Browse Source

Corrigindo imports e removendo código desnecessário

tags/0.1.5
Danimar Ribeiro 9 years ago
parent
commit
2b46639c48
  1. 60
      pytrustnfe/nfe/__init__.py
  2. 42
      pytrustnfe/nfe/assinatura.py
  3. 16
      pytrustnfe/nfe/comunicacao.py

60
pytrustnfe/nfe/__init__.py

@ -4,10 +4,8 @@
import os import os
from lxml import etree
from .comunicacao import executar_consulta, Comunicacao
from .assinatura import sign_xml, Assinatura
from pytrustnfe import utils
from .comunicacao import executar_consulta
from .assinatura import Assinatura
from pytrustnfe.xml import render_xml from pytrustnfe.xml import render_xml
from pytrustnfe.utils import CabecalhoSoap from pytrustnfe.utils import CabecalhoSoap
@ -31,81 +29,49 @@ def _send(certificado, method, **kwargs):
return executar_consulta(certificado, cabecalho, xml_signed) return executar_consulta(certificado, cabecalho, xml_signed)
def autorizar_nfe(certificado, **kwargs): # Assinar def autorizar_nfe(certificado, **kwargs): # Assinar
_send(certificado, 'NfeAutorizacao', **kwargs) _send(certificado, 'NfeAutorizacao', **kwargs)
def retorno_autorizar_nfe(certificado, **kwargs): def retorno_autorizar_nfe(certificado, **kwargs):
_send(certificado, 'NfeRetAutorizacao', **kwargs) _send(certificado, 'NfeRetAutorizacao', **kwargs)
def recepcao_evento_cancelamento(certificado, **kwargs): # Assinar def recepcao_evento_cancelamento(certificado, **kwargs): # Assinar
_send(certificado, 'RecepcaoEventoCancelamento', **kwargs) _send(certificado, 'RecepcaoEventoCancelamento', **kwargs)
def inutilizar_nfe(certificado, **kwargs): # Assinar def inutilizar_nfe(certificado, **kwargs): # Assinar
_send(certificado, 'NfeInutilizacao', **kwargs) _send(certificado, 'NfeInutilizacao', **kwargs)
def consultar_protocolo_nfe(certificado, **kwargs): def consultar_protocolo_nfe(certificado, **kwargs):
_send(certificado, 'NfeConsultaProtocolo', **kwargs) _send(certificado, 'NfeConsultaProtocolo', **kwargs)
def nfe_status_servico(certificado, **kwargs): def nfe_status_servico(certificado, **kwargs):
_send(certificado, 'NfeStatusServico.', **kwargs) _send(certificado, 'NfeStatusServico.', **kwargs)
def consulta_cadastro(certificado, **kwargs): def consulta_cadastro(certificado, **kwargs):
_send(certificado, 'NfeConsultaCadastro.', **kwargs) _send(certificado, 'NfeConsultaCadastro.', **kwargs)
def recepcao_evento_carta_correcao(certificado, **kwargs): # Assinar def recepcao_evento_carta_correcao(certificado, **kwargs): # Assinar
_send(certificado, 'RecepcaoEventoCarta.', **kwargs) _send(certificado, 'RecepcaoEventoCarta.', **kwargs)
def recepcao_evento_manifesto(certificado, **kwargs): # Assinar def recepcao_evento_manifesto(certificado, **kwargs): # Assinar
_send(certificado, 'RecepcaoEventoManifesto', **kwargs) _send(certificado, 'RecepcaoEventoManifesto', **kwargs)
def recepcao_evento_epec(certificado, **kwargs): # Assinar def recepcao_evento_epec(certificado, **kwargs): # Assinar
_send(certificado, 'RecepcaoEventoEPEC', **kwargs) _send(certificado, 'RecepcaoEventoEPEC', **kwargs)
def consulta_nfe_destinada(certificado, **kwargs): def consulta_nfe_destinada(certificado, **kwargs):
_send(certificado, 'NfeConsultaDest', **kwargs) _send(certificado, 'NfeConsultaDest', **kwargs)
def download_nfe(certificado, **kwargs): def download_nfe(certificado, **kwargs):
_send(certificado, 'NfeDownloadNF', **kwargs) _send(certificado, 'NfeDownloadNF', **kwargs)
class NFe(Comunicacao):
def __init__(self, cert, key):
Comunicacao.__init__(self, cert, key)
def consultar_cadastro(self, cadastro, estado):
self.url = 'https://cad.sefazrs.rs.gov.br/ws/cadconsultacadastro/cadconsultacadastro2.asmx'
self.metodo = 'NfeConsultaCadastro'
path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'xml')
xml = render_xml(path, 'consultar_cadastro.xml', **cadastro)
xml_response, obj = self._executar_consulta(xml)
return {
'sent_xml': xml,
'received_xml': xml_response,
'object': obj.Body.nfeAutorizacaoLoteResult
}
def autorizar_nfe(self, nfe, nfe_id):
self.url = 'https://nfe-homologacao.sefazrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx'
self.metodo = 'NfeAutorizacao/nfeAutorizacaoLote'
path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'xml')
xml = render_xml(path, 'nfeEnv.xml', **nfe)
xmlElem = etree.fromstring(xml)
xml_signed = sign_xml(xmlElem, self.cert, self.key, '#%s' % nfe_id)
xml_response, obj = self._executar_consulta(xml_signed)
return {
'sent_xml': xml_signed,
'received_xml': xml_response,
'object': obj.Body.nfeAutorizacaoLoteResult
}

42
pytrustnfe/nfe/assinatura.py

@ -6,48 +6,9 @@ import xmlsec
import libxml2 import libxml2
import os.path import os.path
from signxml import XMLSigner
from signxml import methods
from lxml import etree
from OpenSSL import crypto
NAMESPACE_SIG = 'http://www.w3.org/2000/09/xmldsig#' NAMESPACE_SIG = 'http://www.w3.org/2000/09/xmldsig#'
def extract_cert_and_key_from_pfx(pfx, password):
pfx = crypto.load_pkcs12(pfx, password)
# PEM formatted private key
key = crypto.dump_privatekey(crypto.FILETYPE_PEM,
pfx.get_privatekey())
# PEM formatted certificate
cert = crypto.dump_certificate(crypto.FILETYPE_PEM,
pfx.get_certificate())
return cert, key
def recursively_empty(e):
if e.text:
return False
return all((recursively_empty(c) for c in e.iterchildren()))
def sign_xml(xml, cert, key):
parser = etree.XMLParser(remove_blank_text=True, remove_comments=True)
elem = etree.fromstring(xml, parser=parser)
signer = XMLSigner(
digest_algorithm=u'sha1', signature_algorithm="rsa-sha1",
method=methods.enveloping,
c14n_algorithm='http://www.w3.org/TR/2001/REC-xml-c14n-20010315')
ns = {}
ns[None] = signer.namespaces['ds']
signer.namespaces = ns
signed_root = signer.sign(elem, key=str(key), cert=cert)
return etree.tostring(signed_root)
class Assinatura(object): class Assinatura(object):
def __init__(self, arquivo, senha): def __init__(self, arquivo, senha):
@ -80,7 +41,8 @@ class Assinatura(object):
doc_xml = libxml2.parseMemory( doc_xml = libxml2.parseMemory(
xml, len(xml)) xml, len(xml))
signNode = xmlsec.TmplSignature(doc_xml, xmlsec.transformInclC14NId(),
signNode = xmlsec.TmplSignature(doc_xml,
xmlsec.transformInclC14NId(),
xmlsec.transformRsaSha1Id(), None) xmlsec.transformRsaSha1Id(), None)
doc_xml.getRootElement().addChild(signNode) doc_xml.getRootElement().addChild(signNode)

16
pytrustnfe/nfe/comunicacao.py

@ -3,10 +3,9 @@
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from lxml import objectify
from uuid import uuid4 from uuid import uuid4
from pytrustnfe.client import HttpClient from pytrustnfe.client import HttpClient
from pytrustnfe.Certificado import converte_pfx_pem
from pytrustnfe.certificado import save_cert_key
from ..xml import sanitize_response from ..xml import sanitize_response
@ -15,12 +14,13 @@ common_namespaces = {'soap': 'http://www.w3.org/2003/05/soap-envelope'}
soap_body_path = './soap:Envelope/soap:Body' soap_body_path = './soap:Envelope/soap:Body'
soap_fault_path = './soap:Envelope/soap:Body/soap:Fault' soap_fault_path = './soap:Envelope/soap:Body/soap:Fault'
def executar_consulta(cerficado, cabecalho, xmlEnviar):
cert_path, key_path = self._preparar_temp_pem()
client = HttpClient(self.url, cert_path, key_path)
soap_xml = self._soap_xml(xmlEnviar)
xml_retorno = client.post_xml(self.web_service, soap_xml)
def executar_consulta(cerficado, cabecalho, xmlEnviar):
cert_path, key_path = save_cert_key()
url = ''
web_service = ''
client = HttpClient(url, cert_path, key_path)
xml_retorno = client.post_xml(web_service, xmlEnviar)
return sanitize_response(xml_retorno) return sanitize_response(xml_retorno)
@ -63,9 +63,7 @@ class Comunicacao(object):
assert self.url != '', "Url servidor não configurada" assert self.url != '', "Url servidor não configurada"
assert self.metodo != '', "Método não configurado" assert self.metodo != '', "Método não configurado"
def _executar_consulta(self, xmlEnviar): def _executar_consulta(self, xmlEnviar):
self._validar_dados()
cert_path, key_path = self._preparar_temp_pem() cert_path, key_path = self._preparar_temp_pem()
client = HttpClient(self.url, cert_path, key_path) client = HttpClient(self.url, cert_path, key_path)

Loading…
Cancel
Save