Browse Source

Merge 265bd35392 into 9fb73479e9

pull/6/merge
Danimar Ribeiro 9 years ago
committed by GitHub
parent
commit
1544b11867
  1. 3
      .travis.yml
  2. 2
      pytrustnfe/client.py
  3. 2
      pytrustnfe/nfse/paulistana/__init__.py
  4. 56
      pytrustnfe/suds_requests.py
  5. 6
      pytrustnfe/test/test_assinatura.py
  6. 16
      pytrustnfe/test/test_certificado.py
  7. 11
      pytrustnfe/test/test_xml_serializacao.py
  8. 6
      pytrustnfe/xml/__init__.py
  9. 6
      requirements.txt

3
.travis.yml

@ -1,6 +1,9 @@
language: python
python:
- "2.7"
- "3.3"
- "3.4"
- "3.5"
virtual_env:
system_site_packages: true
install:

2
pytrustnfe/client.py

@ -4,7 +4,7 @@
import requests
import suds.client
import suds_requests
from . import suds_requests
def get_authenticated_client(base_url, cert, key):

2
pytrustnfe/nfse/paulistana/__init__.py

@ -48,7 +48,7 @@ def _send(certificado, method, **kwargs):
try:
response = getattr(client.service, method)(1, xml_send)
except suds.WebFault, e:
except suds.WebFault as e:
return {
'sent_xml': xml_send,
'received_xml': e.fault.faultstring,

56
pytrustnfe/suds_requests.py

@ -0,0 +1,56 @@
import functools
import requests
import suds.transport as transport
import traceback
from six import StringIO
__all__ = ['RequestsTransport']
def handle_errors(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
try:
return f(*args, **kwargs)
except requests.HTTPError as e:
buf = StringIO(e.response.content)
raise transport.TransportError(
'Error in requests\n' + traceback.format_exc(),
e.response.status_code,
buf,
)
except requests.RequestException:
raise transport.TransportError(
'Error in requests\n' + traceback.format_exc(),
000,
)
return wrapper
class RequestsTransport(transport.Transport):
def __init__(self, session=None):
transport.Transport.__init__(self)
self._session = session or requests.Session()
@handle_errors
def open(self, request):
resp = self._session.get(request.url)
resp.raise_for_status()
return StringIO(resp.content)
@handle_errors
def send(self, request):
resp = self._session.post(
request.url,
data=request.message,
headers=request.headers,
)
if resp.headers.get('content-type') not in ('text/xml',
'application/soap+xml'):
resp.raise_for_status()
return transport.Reply(
resp.status_code,
resp.headers,
resp.content,
)

6
pytrustnfe/test/test_assinatura.py

@ -32,21 +32,21 @@ class test_assinatura(unittest.TestCase):
caminho = os.path.dirname(__file__)
def test_assinar_xml_senha_invalida(self):
pfx = open(os.path.join(self.caminho, 'teste.pfx')).read()
pfx = open(os.path.join(self.caminho, 'teste.pfx'), 'rb').read()
signer = Assinatura(pfx, '123')
self.assertRaises(Exception, signer.assina_xml, signer,
etree.fromstring(XML_ASSINAR),
'NFe43150602261542000143550010000000761792265342')
def test_assinar_xml_invalido(self):
pfx = open(os.path.join(self.caminho, 'teste.pfx')).read()
pfx = open(os.path.join(self.caminho, 'teste.pfx'), 'rb').read()
signer = Assinatura(pfx, '123456')
self.assertRaises(Exception, signer.assina_xml, signer,
etree.fromstring(XML_ERRADO),
'NFe43150602261542000143550010000000761792265342')
def test_assinar_xml_valido(self):
pfx = open(os.path.join(self.caminho, 'teste.pfx')).read()
pfx = open(os.path.join(self.caminho, 'teste.pfx'), 'rb').read()
signer = Assinatura(pfx, '123456')
xml = signer.assina_xml(
etree.fromstring(XML_ASSINAR),

16
pytrustnfe/test/test_certificado.py

@ -11,7 +11,7 @@ from pytrustnfe.certificado import Certificado
from pytrustnfe.certificado import save_cert_key
from pytrustnfe.certificado import extract_cert_and_key_from_pfx
CHAVE = '-----BEGIN PRIVATE KEY-----\n' \
CHAVE = b'-----BEGIN PRIVATE KEY-----\n' \
'MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAJONRp6l1y2ojgv8\n' \
'tP3AOLW0vjWQqiPseBLM7YAxbzz5R7LYlWHC0ZJ4uIvd4Cvc6AuoNJoeuhzFcwHx\n' \
'PL0TcFuW+5up1ktUohwaJ+/zKrMODCKt0gvif302yqasMnwLh9mGZQIkLkHPOX8p\n' \
@ -28,7 +28,7 @@ CHAVE = '-----BEGIN PRIVATE KEY-----\n' \
'fMw/Bh2wC5kj\n'\
'-----END PRIVATE KEY-----\n'
CERTIFICADO = '-----BEGIN CERTIFICATE-----\n'\
CERTIFICADO = b'-----BEGIN CERTIFICATE-----\n'\
'MIICMTCCAZqgAwIBAgIQfYOsIEVuAJ1FwwcTrY0t1DANBgkqhkiG9w0BAQUFADBX\n'\
'MVUwUwYDVQQDHkwAewA1ADkARgAxAEUANAA2ADEALQBEAEQARQA1AC0ANABEADIA\n'\
'RgAtAEEAMAAxAEEALQA4ADMAMwAyADIAQQA5AEUAQgA4ADMAOAB9MB4XDTE1MDYx\n'\
@ -49,25 +49,25 @@ class test_assinatura(unittest.TestCase):
caminho = os.path.dirname(__file__)
def test_preparar_pfx(self):
dir_pfx = open(os.path.join(self.caminho, 'teste.pfx'), 'r').read()
dir_pfx = open(os.path.join(self.caminho, 'teste.pfx'), 'rb').read()
cert, key = extract_cert_and_key_from_pfx(dir_pfx, '123456')
self.assertEqual(key, CHAVE, 'Chave gerada inválida')
self.assertEqual(cert, CERTIFICADO, 'Certificado inválido')
def test_save_pfx(self):
pfx_source = open(os.path.join(self.caminho, 'teste.pfx'), 'r').read()
pfx_source = open(os.path.join(self.caminho, 'teste.pfx'), 'rb').read()
pfx = Certificado(pfx_source, '123')
path = pfx.save_pfx()
saved = open(path, 'r').read()
saved = open(path, 'rb').read()
self.assertEqual(pfx_source, saved,
'Arquivo pfx salvo não bate com arquivo lido')
def test_save_cert_and_key(self):
dir_pfx = open(os.path.join(self.caminho, 'teste.pfx'), 'r').read()
dir_pfx = open(os.path.join(self.caminho, 'teste.pfx'), 'rb').read()
cert, key = extract_cert_and_key_from_pfx(dir_pfx, '123456')
cert_path, key_path = save_cert_key(cert, key)
cert_saved = open(cert_path, 'r').read()
key_saved = open(key_path, 'r').read()
cert_saved = open(cert_path, 'rb').read()
key_saved = open(key_path, 'rb').read()
self.assertEqual(
cert, cert_saved, 'Certificado não corresponde ao original')
self.assertEqual(key, key_saved, 'Chave não corresponde ao original')

11
pytrustnfe/test/test_xml_serializacao.py

@ -1,5 +1,6 @@
# coding=utf-8
import codecs
import os.path
import unittest
from lxml import etree
@ -15,19 +16,21 @@ class test_xml_serializacao(unittest.TestCase):
tag2='ola', tag3='comovai')
result = open(os.path.join(path, 'jinja_result.xml'), 'r').read()
self.assertEqual(xml + '\n', result)
self.assertEqual(xml + u'\n', result)
def test_serializacao_remove_empty(self):
path = os.path.join(os.path.dirname(__file__), 'XMLs')
xmlElem = render_xml(path, 'jinja_template.xml', True, tag1='oi',
tag2='ola', tag3='comovai')
xml = etree.tostring(xmlElem)
xml = etree.tostring(xmlElem, encoding="unicode")
result = open(os.path.join(path, 'jinja_remove_empty.xml'), 'r').read()
self.assertEqual(xml + '\n', result)
self.assertEqual(xml + u'\n', result)
def test_sanitize_response(self):
path = os.path.join(os.path.dirname(__file__), 'XMLs')
xml_to_clear = open(os.path.join(path, 'jinja_result.xml'), 'r').read()
f = codecs.open(os.path.join(path, 'jinja_result.xml'), encoding='utf-8')
xml_to_clear = f.read()
xml, obj = sanitize_response(xml_to_clear)
self.assertEqual(xml, xml_to_clear)

6
pytrustnfe/xml/__init__.py

@ -37,13 +37,11 @@ def render_xml(path, template_name, remove_empty, **nfe):
if recursively_empty(elem):
parent.remove(elem)
return root
return etree.tostring(root)
return etree.tostring(root, encoding="unicode")
def sanitize_response(response):
response = unicode(response)
response = unicodedata.normalize('NFKD', response).encode('ascii',
'ignore')
response = unicodedata.normalize('NFKD', response)
tree = etree.fromstring(response)
# Remove namespaces inuteis na resposta

6
requirements.txt

@ -3,11 +3,9 @@ lxml
nose
mock
coveralls
http://xmlsoft.org/sources/python/libxml2-python-2.6.21.tar.gz
https://github.com/odoo-brazil/pyxmlsec/archive/master.zip
Jinja2
signxml
pypandoc
suds
suds_requests
suds-jurko
reportlab
six
Loading…
Cancel
Save