From 79a93393405e5617b36f27a4f9fa52460f9260ef Mon Sep 17 00:00:00 2001 From: Danimar Ribeiro Date: Sun, 8 May 2016 01:58:09 -0300 Subject: [PATCH] =?UTF-8?q?Refatorando=20c=C3=B3digo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pytrustnfe/Certificado.py | 17 ++++---- pytrustnfe/HttpClient.py | 22 +++++----- pytrustnfe/Servidores.py | 18 ++++----- pytrustnfe/servicos/Assinatura.py | 66 ++++++++++++++++-------------- pytrustnfe/servicos/Comunicacao.py | 74 +++++++++++++++------------------- pytrustnfe/servicos/NFeAutorizacao.py | 48 ---------------------- pytrustnfe/servicos/nfe_autorizacao.py | 44 ++++++++++++++++++++ pytrustnfe/utils.py | 2 +- setup.py | 29 +++++++------ 9 files changed, 159 insertions(+), 161 deletions(-) delete mode 100644 pytrustnfe/servicos/NFeAutorizacao.py create mode 100644 pytrustnfe/servicos/nfe_autorizacao.py diff --git a/pytrustnfe/Certificado.py b/pytrustnfe/Certificado.py index 3d55fa2..1d37302 100644 --- a/pytrustnfe/Certificado.py +++ b/pytrustnfe/Certificado.py @@ -1,4 +1,4 @@ -#coding=utf-8 +# coding=utf-8 ''' Created on Jun 16, 2015 @@ -14,11 +14,14 @@ def converte_pfx_pem(caminho, senha): stream = open(caminho, 'rb').read() try: certificado = crypto.load_pkcs12(stream, senha) - - privada = crypto.dump_privatekey(crypto.FILETYPE_PEM, certificado.get_privatekey()) - certificado = crypto.dump_certificate(crypto.FILETYPE_PEM, certificado.get_certificate()) + + privada = crypto.dump_privatekey(crypto.FILETYPE_PEM, + certificado.get_privatekey()) + certificado = crypto.dump_certificate(crypto.FILETYPE_PEM, + certificado.get_certificate()) except Exception as e: - if len(e.message) == 1 and len(e.message[0])==3 and e.message[0][2] == 'mac verify failure': - raise Exception('Senha inválida') + if len(e.message) == 1 and len(e.message[0]) == 3 and \ + e.message[0][2] == 'mac verify failure': + raise Exception('Senha inválida') raise - return privada, certificado \ No newline at end of file + return privada, certificado diff --git a/pytrustnfe/HttpClient.py b/pytrustnfe/HttpClient.py index 61f8742..80f17eb 100644 --- a/pytrustnfe/HttpClient.py +++ b/pytrustnfe/HttpClient.py @@ -1,4 +1,4 @@ -#coding=utf-8 +# coding=utf-8 ''' Created on Jun 16, 2015 @@ -6,24 +6,25 @@ Created on Jun 16, 2015 ''' from httplib import HTTPSConnection + class HttpClient(object): - + def __init__(self, url, chave_pem, certificado_pem): self.url = url self.chave_pem = chave_pem self.certificado_pem = certificado_pem - + def _headers(self): - return { + return { u'Content-type': u'application/soap+xml; charset=utf-8', u'Accept': u'application/soap+xml; charset=utf-8' } - + def post_xml(self, post, xml): - - conexao = HTTPSConnection(self.url, '443', key_file=self.chave_pem, + + conexao = HTTPSConnection(self.url, '443', key_file=self.chave_pem, cert_file=self.certificado_pem) - + try: conexao.request(u'POST', post, xml, self._headers()) response = conexao.getresponse() @@ -31,9 +32,6 @@ class HttpClient(object): return response.read() return response.read() except Exception as e: - print str(e) + print(str(e)) finally: conexao.close() - - - \ No newline at end of file diff --git a/pytrustnfe/Servidores.py b/pytrustnfe/Servidores.py index d9f6117..5b2b763 100644 --- a/pytrustnfe/Servidores.py +++ b/pytrustnfe/Servidores.py @@ -9,33 +9,33 @@ def localizar_url(servico, estado): METODO_WS = { - WS_NFE_AUTORIZACAO: { + WS_NFE_AUTORIZACAO:{ 'webservice': 'NfeAutorizacao', - 'metodo' : 'NfeAutorizacao', + 'metodo': 'NfeAutorizacao', }, WS_NFE_CONSULTA_AUTORIZACAO: { 'webservice': 'NfeRetAutorizacao', - 'metodo' : 'NfeRetAutorizacao', + 'metodo': 'NfeRetAutorizacao', }, WS_NFE_INUTILIZACAO: { 'webservice': 'NfeInutilizacao2', - 'metodo' : 'nfeInutilizacaoNF2', + 'metodo': 'nfeInutilizacaoNF2', }, WS_NFE_CONSULTA: { 'webservice': 'NfeConsulta2', - 'metodo' : 'nfeConsultaNF2', + 'metodo': 'nfeConsultaNF2', }, WS_NFE_SITUACAO: { 'webservice': 'NfeStatusServico2', - 'metodo' : 'nfeStatusServicoNF2', + 'metodo': 'nfeStatusServicoNF2', }, WS_NFE_CONSULTA_CADASTRO: { 'webservice': 'CadConsultaCadastro2', - 'metodo' : 'consultaCadastro2', + 'metodo': 'consultaCadastro2', }, WS_NFE_RECEPCAO_EVENTO: { 'webservice': 'RecepcaoEvento', - 'metodo' : 'nfeRecepcaoEvento', + 'metodo': 'nfeRecepcaoEvento', }, WS_NFE_DOWNLOAD: { 'webservice': 'NfeDownloadNF', @@ -529,4 +529,4 @@ ESTADO_WS_CONTINGENCIA = { 'SE': SVC_AN, 'SP': SVC_AN, 'TO': SVC_AN, -} \ No newline at end of file +} diff --git a/pytrustnfe/servicos/Assinatura.py b/pytrustnfe/servicos/Assinatura.py index 41204df..92df75b 100644 --- a/pytrustnfe/servicos/Assinatura.py +++ b/pytrustnfe/servicos/Assinatura.py @@ -14,65 +14,71 @@ class Assinatura(object): def __init__(self, arquivo, senha): self.arquivo = arquivo self.senha = senha - - def _checar_certificado(self): + + def _checar_certificado(self): if not os.path.isfile(self.arquivo): raise Exception('Caminho do certificado não existe.') - + def _inicializar_cripto(self): libxml2.initParser() libxml2.substituteEntitiesDefault(1) - + xmlsec.init() xmlsec.cryptoAppInit(None) xmlsec.cryptoInit() - + def _finalizar_cripto(self): xmlsec.cryptoShutdown() xmlsec.cryptoAppShutdown() xmlsec.shutdown() - + libxml2.cleanupParser() - - + + def assina_xml(self, xml): self._checar_certificado() self._inicializar_cripto() try: - doc_xml = libxml2.parseMemory(xml.encode('utf-8'), len(xml.encode('utf-8'))) - - signNode = xmlsec.TmplSignature(doc_xml, xmlsec.transformInclC14NId(), - xmlsec.transformRsaSha1Id(), None) - - - doc_xml.getRootElement().addChild(signNode) - refNode = signNode.addReference(xmlsec.transformSha1Id(), - None, '#NFe43150602261542000143550010000000761792265342', None) - + doc_xml = libxml2.parseMemory(xml.encode('utf-8'), + len(xml.encode('utf-8'))) + + signNode = xmlsec.TmplSignature(doc_xml, + xmlsec.transformInclC14NId(), + xmlsec.transformRsaSha1Id(), None) + + doc_xml.getRootElement().addChild(signNode) + refNode = signNode.addReference( + xmlsec.transformSha1Id(), + None, '#NFe43150602261542000143550010000000761792265342', None) + refNode.addTransform(xmlsec.transformEnvelopedId()) refNode.addTransform(xmlsec.transformInclC14NId()) keyInfoNode = signNode.ensureKeyInfo() - keyInfoNode.addX509Data() - - dsig_ctx = xmlsec.DSigCtx() - chave = xmlsec.cryptoAppKeyLoad(filename=str(self.arquivo), format=xmlsec.KeyDataFormatPkcs12, - pwd=str(self.senha), pwdCallback=None, pwdCallbackCtx=None) - + keyInfoNode.addX509Data() + + dsig_ctx = xmlsec.DSigCtx() + chave = xmlsec.cryptoAppKeyLoad( + filename=str(self.arquivo), + format=xmlsec.KeyDataFormatPkcs12, + pwd=str(self.senha), pwdCallback=None, pwdCallbackCtx=None) + dsig_ctx.signKey = chave dsig_ctx.sign(signNode) - + status = dsig_ctx.status dsig_ctx.destroy() if status != xmlsec.DSigStatusSucceeded: - raise RuntimeError('Erro ao realizar a assinatura do arquivo; status: "' + str(status) + '"') + raise RuntimeError( + 'Erro ao realizar a assinatura do arquivo; status: "' + + str(status) + '"') xpath = doc_xml.xpathNewContext() xpath.xpathRegisterNs('sig', NAMESPACE_SIG) - certificados = xpath.xpathEval('//sig:X509Data/sig:X509Certificate') - for i in range(len(certificados)-1): - certificados[i].unlinkNode() - certificados[i].freeNode() + certs = xpath.xpathEval('//sig:X509Data/sig:X509Certificate') + for i in range(len(certs)-1): + certs[i].unlinkNode() + certs[i].freeNode() xml = doc_xml.serialize() return xml diff --git a/pytrustnfe/servicos/Comunicacao.py b/pytrustnfe/servicos/Comunicacao.py index e82879b..d2745c9 100644 --- a/pytrustnfe/servicos/Comunicacao.py +++ b/pytrustnfe/servicos/Comunicacao.py @@ -1,4 +1,4 @@ -#coding=utf-8 +# coding=utf-8 ''' Created on Jun 14, 2015 @@ -7,16 +7,13 @@ Created on Jun 14, 2015 from lxml import objectify from uuid import uuid4 -import xml.etree.ElementTree as ET -from xml.etree.ElementTree import tostring +from pytrustnfe.xml.DynamicXml import DynamicXml from pytrustnfe.HttpClient import HttpClient from pytrustnfe.Certificado import converte_pfx_pem from xml.dom.minidom import parseString -from pytrustnfe.Strings import CONSULTA_CADASTRO_COMPLETA - -common_namespaces = { 'soap': 'http://www.w3.org/2003/05/soap-envelope' } +common_namespaces = {'soap': 'http://www.w3.org/2003/05/soap-envelope'} soap_body_path = './soap:Envelope/soap:Body' soap_fault_path = './soap:Envelope/soap:Body/soap:Fault' @@ -24,43 +21,43 @@ soap_fault_path = './soap:Envelope/soap:Body/soap:Fault' class Comunicacao(object): url = '' - web_service = '' + web_service = '' metodo = '' - tag_retorno = '' - + tag_retorno = '' + def __init__(self, certificado, senha): self.certificado = certificado - self.senha = senha - + self.senha = senha + def _soap_xml(self, body): - return ''\ - ''\ - ''\ - ''\ - '422.00'\ - ''\ - ''\ - ''\ - ''\ - + body + ''\ - ''\ - '' - + xml = ''' + + +422.00 + + + +' + body + xml += '' + def _preparar_temp_pem(self): chave_temp = '/tmp/' + uuid4().hex certificado_temp = '/tmp/' + uuid4().hex - + chave, certificado = converte_pfx_pem(self.certificado, self.senha) arq_temp = open(chave_temp, 'w') arq_temp.write(chave) arq_temp.close() - + arq_temp = open(certificado_temp, 'w') arq_temp.write(certificado) arq_temp.close() - + return chave_temp, certificado_temp - + def _validar_dados(self): assert self.url != '', "Url servidor não configurada" assert self.web_service != '', "Web service não especificado" @@ -68,7 +65,7 @@ class Comunicacao(object): assert self.senha != '', "Senha não configurada" assert self.metodo != '', "Método não configurado" assert self.tag_retorno != '', "Tag de retorno não configurado" - + def _validar_xml(self, obj): xml = None if isinstance(obj, DynamicXml): @@ -77,28 +74,23 @@ class Comunicacao(object): xml = obj assert xml is not None, "Objeto deve ser do tipo DynamicXml ou string" return xml - + def _executar_consulta(self, xmlEnviar): self._validar_dados() chave, certificado = self._preparar_temp_pem() - + client = HttpClient(self.url, chave, certificado) soap_xml = self._soap_xml(xmlEnviar) xml_retorno = client.post_xml(self.web_service, soap_xml) - dom = parseString(xml_retorno) - nodes = dom.getElementsByTagNameNS(common_namespaces['soap'],'Fault') - if len(nodes) > 0: + nodes = dom.getElementsByTagNameNS(common_namespaces['soap'], 'Fault') + if len(nodes) > 0: return nodes[0].toxml(), None - - nodes = dom.getElementsByTagName(self.tag_retorno) + + nodes = dom.getElementsByTagName(self.tag_retorno) if len(nodes) > 0: obj = objectify.fromstring(nodes[0].toxml()) return nodes[0].toxml(), obj - + return xml_retorno, objectify.fromstring(xml_retorno) - - - - \ No newline at end of file diff --git a/pytrustnfe/servicos/NFeAutorizacao.py b/pytrustnfe/servicos/NFeAutorizacao.py deleted file mode 100644 index 25a25b4..0000000 --- a/pytrustnfe/servicos/NFeAutorizacao.py +++ /dev/null @@ -1,48 +0,0 @@ -#coding=utf-8 -''' -Created on 21/06/2015 - -@author: danimar -''' -from pytrustnfe.servicos.Comunicacao import Comunicacao -from pytrustnfe.xml import DynamicXml -from pytrustnfe import utils - - -class NfeAutorizacao(Comunicacao): - - def __init__(self, certificado, senha): - Comunicacao.__init__(self, certificado, senha) - - def autorizar_nfe(self, nfe): - xml = self._validar_xml(nfe) - - self.metodo = 'NFeAutorizacao' - self.tag_retorno = 'retEnviNFe' - self.web_service = 'ws/NfeAutorizacao/NFeAutorizacao.asmx' - self.url = 'nfe.sefazrs.rs.gov.br' - - return self._executar_consulta(xml) - - def autorizar_nfe_e_recibo(self, nfe): - xml = self._validar_xml(nfe) - - self.metodo = 'NFeAutorizacao' - self.tag_retorno = 'retEnviNFe' - self.web_service = 'ws/NfeAutorizacao/NFeAutorizacao.asmx' - self.url = 'nfe.sefazrs.rs.gov.br' - - xml_recibo, recibo = self._executar_consulta(xml) - - consulta_recibo = utils.gerar_consulta_recibo(recibo) - xml = self._validar_xml(nfe) - - self.metodo = 'NFeRetAutorizacao' - self.tag_retorno = 'retConsReciNFe' - self.web_service = 'ws/NfeRetAutorizacao/NFeRetAutorizacao.asmx' - self.url = 'nfe.sefazrs.rs.gov.br' - - return self._executar_consulta(xml) - - - \ No newline at end of file diff --git a/pytrustnfe/servicos/nfe_autorizacao.py b/pytrustnfe/servicos/nfe_autorizacao.py new file mode 100644 index 0000000..2e6aed4 --- /dev/null +++ b/pytrustnfe/servicos/nfe_autorizacao.py @@ -0,0 +1,44 @@ +# coding=utf-8 +''' +Created on 21/06/2015 + +@author: danimar +''' +from pytrustnfe.servicos.Comunicacao import Comunicacao +from pytrustnfe import utils + + +class NfeAutorizacao(Comunicacao): + + def __init__(self, certificado, senha): + Comunicacao.__init__(self, certificado, senha) + + def autorizar_nfe(self, nfe): + xml = self._validar_xml(nfe) + + self.metodo = 'NFeAutorizacao' + self.tag_retorno = 'retEnviNFe' + self.web_service = 'ws/NfeAutorizacao/NFeAutorizacao.asmx' + self.url = 'nfe.sefazrs.rs.gov.br' + + return self._executar_consulta(xml) + + def autorizar_nfe_e_recibo(self, nfe): + xml = self._validar_xml(nfe) + + self.metodo = 'NFeAutorizacao' + self.tag_retorno = 'retEnviNFe' + self.web_service = 'ws/NfeAutorizacao/NFeAutorizacao.asmx' + self.url = 'nfe.sefazrs.rs.gov.br' + + xml_recibo, recibo = self._executar_consulta(xml) + + consulta_recibo = utils.gerar_consulta_recibo(recibo) + xml = self._validar_xml(nfe) + + self.metodo = 'NFeRetAutorizacao' + self.tag_retorno = 'retConsReciNFe' + self.web_service = 'ws/NfeRetAutorizacao/NFeRetAutorizacao.asmx' + self.url = 'nfe.sefazrs.rs.gov.br' + + return self._executar_consulta(xml), consulta_recibo diff --git a/pytrustnfe/utils.py b/pytrustnfe/utils.py index b142c2f..8adafdd 100644 --- a/pytrustnfe/utils.py +++ b/pytrustnfe/utils.py @@ -19,7 +19,7 @@ def datetime_tostring(data): return data.strftime("%d-%m-%y %H:%M:%S") -def gerar_consulta_recibo(recibo): +def gerar_consulta_recibo(recibo): c = DynamicXml('consReciNFe') c(xmlns="http://www.portalfiscal.inf.br/nfe", versao="2.00") c.tpAmb = recibo.tpAmb diff --git a/setup.py b/setup.py index 34537cb..3263d3e 100644 --- a/setup.py +++ b/setup.py @@ -1,28 +1,31 @@ -#coding=utf-8 +# coding=utf-8 from setuptools import setup, find_packages setup( - name = "PyNfeTrust", - version = "0.1", - author = "Danimar Ribeiro", - author_email = 'danimaribeiro@gmail.com', - keywords = ['nfe', 'mdf-e'], + name="PyNfeTrust", + version="0.1", + author="Danimar Ribeiro", + author_email='danimaribeiro@gmail.com', + keywords=['nfe', 'mdf-e'], classifiers=[ 'Development Status :: 1 - alpha', 'Environment :: Plugins', 'Intended Audience :: Developers', - 'License :: OSI Approved :: GNU Lesser General Public License v2 or later (LGPLv2+)', + 'License :: OSI Approved :: GNU Lesser General Public License v2 or \ + later (LGPLv2+)', 'Operating System :: OS Independent', 'Programming Language :: Python', 'Topic :: Software Development :: Libraries :: Python Modules', ], - packages = find_packages(exclude=['*test*']), - url = 'https://github.com/danimaribeiro/PyNfeTrust', - license = 'LGPL-v2.1+', - description = 'PyNfeTrust é uma biblioteca para envio de NF-e', - long_description = 'PyNfeTrust', + packages=find_packages(exclude=['*test*']), + url='https://github.com/danimaribeiro/PyNfeTrust', + license='LGPL-v2.1+', + description='PyNfeTrust é uma biblioteca para envio de NF-e', + long_description='PyNfeTrust', install_requires=[ - 'PyXMLSec >= 0.3.0' + 'PyXMLSec >= 0.3.0', + 'Jinja2 >= 2.8', + 'signxml >= 1.0.0', ], test_suite='nose.collector', tests_require=[