From 2b46639c48cf5b55bfec58b2a51073d006ee50d0 Mon Sep 17 00:00:00 2001 From: Danimar Ribeiro Date: Mon, 22 Aug 2016 13:44:08 -0300 Subject: [PATCH] =?UTF-8?q?Corrigindo=20imports=20e=20removendo=20c=C3=B3d?= =?UTF-8?q?igo=20desnecess=C3=A1rio?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pytrustnfe/nfe/__init__.py | 70 +++++++++++-------------------------------- pytrustnfe/nfe/assinatura.py | 42 ++------------------------ pytrustnfe/nfe/comunicacao.py | 18 +++++------ 3 files changed, 28 insertions(+), 102 deletions(-) diff --git a/pytrustnfe/nfe/__init__.py b/pytrustnfe/nfe/__init__.py index 59787f2..21d5912 100644 --- a/pytrustnfe/nfe/__init__.py +++ b/pytrustnfe/nfe/__init__.py @@ -4,10 +4,8 @@ import os -from lxml import etree -from .comunicacao import executar_consulta, Comunicacao -from .assinatura import sign_xml, Assinatura -from pytrustnfe import utils +from .comunicacao import executar_consulta +from .assinatura import Assinatura from pytrustnfe.xml import render_xml from pytrustnfe.utils import CabecalhoSoap @@ -31,81 +29,49 @@ def _send(certificado, method, **kwargs): return executar_consulta(certificado, cabecalho, xml_signed) - -def autorizar_nfe(certificado, **kwargs): # Assinar +def autorizar_nfe(certificado, **kwargs): # Assinar _send(certificado, 'NfeAutorizacao', **kwargs) + def retorno_autorizar_nfe(certificado, **kwargs): _send(certificado, 'NfeRetAutorizacao', **kwargs) + def recepcao_evento_cancelamento(certificado, **kwargs): # Assinar _send(certificado, 'RecepcaoEventoCancelamento', **kwargs) -def inutilizar_nfe(certificado, **kwargs): # Assinar + +def inutilizar_nfe(certificado, **kwargs): # Assinar _send(certificado, 'NfeInutilizacao', **kwargs) + def consultar_protocolo_nfe(certificado, **kwargs): _send(certificado, 'NfeConsultaProtocolo', **kwargs) + def nfe_status_servico(certificado, **kwargs): _send(certificado, 'NfeStatusServico.', **kwargs) + def consulta_cadastro(certificado, **kwargs): _send(certificado, 'NfeConsultaCadastro.', **kwargs) -def recepcao_evento_carta_correcao(certificado, **kwargs): # Assinar + +def recepcao_evento_carta_correcao(certificado, **kwargs): # Assinar _send(certificado, 'RecepcaoEventoCarta.', **kwargs) -def recepcao_evento_manifesto(certificado, **kwargs): # Assinar + +def recepcao_evento_manifesto(certificado, **kwargs): # Assinar _send(certificado, 'RecepcaoEventoManifesto', **kwargs) -def recepcao_evento_epec(certificado, **kwargs): # Assinar + +def recepcao_evento_epec(certificado, **kwargs): # Assinar _send(certificado, 'RecepcaoEventoEPEC', **kwargs) + def consulta_nfe_destinada(certificado, **kwargs): _send(certificado, 'NfeConsultaDest', **kwargs) + def download_nfe(certificado, **kwargs): _send(certificado, 'NfeDownloadNF', **kwargs) - - -class NFe(Comunicacao): - - def __init__(self, cert, key): - Comunicacao.__init__(self, cert, key) - - - - def consultar_cadastro(self, cadastro, estado): - self.url = 'https://cad.sefazrs.rs.gov.br/ws/cadconsultacadastro/cadconsultacadastro2.asmx' - self.metodo = 'NfeConsultaCadastro' - - path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'xml') - xml = render_xml(path, 'consultar_cadastro.xml', **cadastro) - - xml_response, obj = self._executar_consulta(xml) - - return { - 'sent_xml': xml, - 'received_xml': xml_response, - 'object': obj.Body.nfeAutorizacaoLoteResult - } - - - def autorizar_nfe(self, nfe, nfe_id): - self.url = 'https://nfe-homologacao.sefazrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx' - self.metodo = 'NfeAutorizacao/nfeAutorizacaoLote' - - path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'xml') - xml = render_xml(path, 'nfeEnv.xml', **nfe) - - xmlElem = etree.fromstring(xml) - xml_signed = sign_xml(xmlElem, self.cert, self.key, '#%s' % nfe_id) - - xml_response, obj = self._executar_consulta(xml_signed) - - return { - 'sent_xml': xml_signed, - 'received_xml': xml_response, - 'object': obj.Body.nfeAutorizacaoLoteResult - } diff --git a/pytrustnfe/nfe/assinatura.py b/pytrustnfe/nfe/assinatura.py index eaf6b35..98b3a40 100644 --- a/pytrustnfe/nfe/assinatura.py +++ b/pytrustnfe/nfe/assinatura.py @@ -6,48 +6,9 @@ import xmlsec import libxml2 import os.path -from signxml import XMLSigner -from signxml import methods -from lxml import etree -from OpenSSL import crypto - NAMESPACE_SIG = 'http://www.w3.org/2000/09/xmldsig#' -def extract_cert_and_key_from_pfx(pfx, password): - pfx = crypto.load_pkcs12(pfx, password) - # PEM formatted private key - key = crypto.dump_privatekey(crypto.FILETYPE_PEM, - pfx.get_privatekey()) - # PEM formatted certificate - cert = crypto.dump_certificate(crypto.FILETYPE_PEM, - pfx.get_certificate()) - return cert, key - - -def recursively_empty(e): - if e.text: - return False - return all((recursively_empty(c) for c in e.iterchildren())) - - -def sign_xml(xml, cert, key): - parser = etree.XMLParser(remove_blank_text=True, remove_comments=True) - elem = etree.fromstring(xml, parser=parser) - - signer = XMLSigner( - digest_algorithm=u'sha1', signature_algorithm="rsa-sha1", - method=methods.enveloping, - c14n_algorithm='http://www.w3.org/TR/2001/REC-xml-c14n-20010315') - ns = {} - ns[None] = signer.namespaces['ds'] - signer.namespaces = ns - signed_root = signer.sign(elem, key=str(key), cert=cert) - - return etree.tostring(signed_root) - - - class Assinatura(object): def __init__(self, arquivo, senha): @@ -80,7 +41,8 @@ class Assinatura(object): doc_xml = libxml2.parseMemory( xml, len(xml)) - signNode = xmlsec.TmplSignature(doc_xml, xmlsec.transformInclC14NId(), + signNode = xmlsec.TmplSignature(doc_xml, + xmlsec.transformInclC14NId(), xmlsec.transformRsaSha1Id(), None) doc_xml.getRootElement().addChild(signNode) diff --git a/pytrustnfe/nfe/comunicacao.py b/pytrustnfe/nfe/comunicacao.py index c3beaae..8a076d0 100644 --- a/pytrustnfe/nfe/comunicacao.py +++ b/pytrustnfe/nfe/comunicacao.py @@ -3,10 +3,9 @@ # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). -from lxml import objectify from uuid import uuid4 from pytrustnfe.client import HttpClient -from pytrustnfe.Certificado import converte_pfx_pem +from pytrustnfe.certificado import save_cert_key from ..xml import sanitize_response @@ -15,13 +14,14 @@ common_namespaces = {'soap': 'http://www.w3.org/2003/05/soap-envelope'} soap_body_path = './soap:Envelope/soap:Body' soap_fault_path = './soap:Envelope/soap:Body/soap:Fault' -def executar_consulta(cerficado, cabecalho, xmlEnviar): - cert_path, key_path = self._preparar_temp_pem() - client = HttpClient(self.url, cert_path, key_path) - soap_xml = self._soap_xml(xmlEnviar) - xml_retorno = client.post_xml(self.web_service, soap_xml) - +def executar_consulta(cerficado, cabecalho, xmlEnviar): + cert_path, key_path = save_cert_key() + url = '' + web_service = '' + client = HttpClient(url, cert_path, key_path) + xml_retorno = client.post_xml(web_service, xmlEnviar) + return sanitize_response(xml_retorno) @@ -63,9 +63,7 @@ class Comunicacao(object): assert self.url != '', "Url servidor não configurada" assert self.metodo != '', "Método não configurado" - def _executar_consulta(self, xmlEnviar): - self._validar_dados() cert_path, key_path = self._preparar_temp_pem() client = HttpClient(self.url, cert_path, key_path)