Browse Source

Passando os parametros para o gerar soap xml

tags/0.1.5
Danimar Ribeiro 9 years ago
parent
commit
e080c25553
  1. 12
      pytrustnfe/Servidores.py
  2. 7
      pytrustnfe/client.py
  3. 41
      pytrustnfe/nfe/__init__.py
  4. 61
      pytrustnfe/nfe/comunicacao.py
  5. 2
      pytrustnfe/utils.py

12
pytrustnfe/Servidores.py

@ -54,8 +54,16 @@ SIGLA_ESTADO = {
def localizar_url(servico, estado, ambiente=2):
sigla = SIGLA_ESTADO[estado]
return ESTADO_WS[sigla][ambiente]['servidor'],
ESTADO_WS[sigla][ambiente][servico]
dominio = ESTADO_WS[sigla][ambiente]['servidor']
complemento = ESTADO_WS[sigla][ambiente][servico]
if sigla == 'RS' and servico == WS_NFE_CONSULTA_CADASTRO:
dominio = 'cad.sefazrs.rs.gov.br'
if sigla in ('AC', 'RN', 'PB', 'SC') and \
servico == WS_NFE_CONSULTA_CADASTRO:
dominio = 'cad.svrs.rs.gov.br'
return "https://%s/%s" % (dominio, complemento)
METODO_WS = {

7
pytrustnfe/client.py

@ -42,12 +42,13 @@ class HttpClient(object):
def _headers(self, action):
return {
u'Content-type': u'application/soap+xml; charset=utf-8; action="http://www.portalfiscal.inf.br/nfe/wsdl/%s' % action,
u'Content-type': u'application/soap+xml; charset=utf-8; action="http://www.portalfiscal.inf.br/nfe/wsdl/%s"' % action,
u'Accept': u'application/soap+xml; charset=utf-8'
}
def post_soap(self, xml_soap, action):
def post_soap(self, xml_soap, cabecalho):
header = self._headers(cabecalho.soap_action)
res = requests.post(self.url, data=xml_soap,
cert=(self.cert_path, self.key_path),
verify=False, headers=self._headers(action))
verify=False, headers=header)
return res.text

41
pytrustnfe/nfe/__init__.py

@ -10,11 +10,17 @@ from pytrustnfe.xml import render_xml
from pytrustnfe.utils import CabecalhoSoap
from pytrustnfe.utils import gerar_chave, ChaveNFe
from pytrustnfe.Servidores import localizar_url
import re
def _build_header(**kwargs):
vals = {'estado': kwargs['estado'], 'soap_action': ''}
def _build_header(method, **kwargs):
action = {
'NfeAutorizacao': ('NfeAutorizacao', '3.10'),
'NfeRetAutorizacao': ('NfeRetAutorizacao', '3.10'),
'NfeConsultaCadastro': ('CadConsultaCadastro2', '2.00'),
}
vals = {'estado': kwargs['estado'],
'soap_action': action[method][0],
'versao': action[method][1]}
return CabecalhoSoap(**vals)
@ -41,7 +47,6 @@ def _send(certificado, method, sign, **kwargs):
path = os.path.join(os.path.dirname(__file__), 'templates')
xml_send = render_xml(path, '%s.xml' % method, **kwargs)
if sign:
xml_send = '<!DOCTYPE NFe [<!ATTLIST infNFe Id ID #IMPLIED>]>' + \
xml_send
@ -55,8 +60,8 @@ def _send(certificado, method, sign, **kwargs):
print xml_send
xml_send = xml_send.replace('\n', '')
url = localizar_url(method, kwargs['estado'])
cabecalho = _build_header(**kwargs)
url = localizar_url(method, kwargs['estado'], kwargs['ambiente'])
cabecalho = _build_header(method, **kwargs)
response, obj = executar_consulta(certificado, url, cabecalho, xml_send)
return {
@ -68,48 +73,48 @@ def _send(certificado, method, sign, **kwargs):
def autorizar_nfe(certificado, **kwargs): # Assinar
_generate_nfe_id(**kwargs)
_send(certificado, 'NfeAutorizacao', True, **kwargs)
return _send(certificado, 'NfeAutorizacao', True, **kwargs)
def retorno_autorizar_nfe(certificado, **kwargs):
_send(certificado, 'NfeRetAutorizacao', False, **kwargs)
return _send(certificado, 'NfeRetAutorizacao', False, **kwargs)
def recepcao_evento_cancelamento(certificado, **kwargs): # Assinar
_send(certificado, 'RecepcaoEventoCancelamento', True, **kwargs)
return _send(certificado, 'RecepcaoEventoCancelamento', True, **kwargs)
def inutilizar_nfe(certificado, **kwargs): # Assinar
_send(certificado, 'NfeInutilizacao', True, **kwargs)
return _send(certificado, 'NfeInutilizacao', True, **kwargs)
def consultar_protocolo_nfe(certificado, **kwargs):
_send(certificado, 'NfeConsultaProtocolo', True, **kwargs)
return _send(certificado, 'NfeConsultaProtocolo', True, **kwargs)
def nfe_status_servico(certificado, **kwargs):
_send(certificado, 'NfeStatusServico', False, **kwargs)
return _send(certificado, 'NfeStatusServico', False, **kwargs)
def consulta_cadastro(certificado, **kwargs):
_send(certificado, 'NfeConsultaCadastro', False, **kwargs)
return _send(certificado, 'NfeConsultaCadastro', False, **kwargs)
def recepcao_evento_carta_correcao(certificado, **kwargs): # Assinar
_send(certificado, 'RecepcaoEventoCarta', True, **kwargs)
return _send(certificado, 'RecepcaoEventoCarta', True, **kwargs)
def recepcao_evento_manifesto(certificado, **kwargs): # Assinar
_send(certificado, 'RecepcaoEventoManifesto', True, **kwargs)
return _send(certificado, 'RecepcaoEventoManifesto', True, **kwargs)
def recepcao_evento_epec(certificado, **kwargs): # Assinar
_send(certificado, 'RecepcaoEventoEPEC', True, **kwargs)
return _send(certificado, 'RecepcaoEventoEPEC', True, **kwargs)
def consulta_nfe_destinada(certificado, **kwargs):
_send(certificado, 'NfeConsultaDest', False, **kwargs)
return _send(certificado, 'NfeConsultaDest', False, **kwargs)
def download_nfe(certificado, **kwargs):
_send(certificado, 'NfeDownloadNF', False, **kwargs)
return _send(certificado, 'NfeDownloadNF', False, **kwargs)

61
pytrustnfe/nfe/comunicacao.py

@ -15,62 +15,23 @@ soap_body_path = './soap:Envelope/soap:Body'
soap_fault_path = './soap:Envelope/soap:Body/soap:Fault'
def _soap_xml(body):
def _soap_xml(body, cabecalho):
xml = '<?xml version="1.0" encoding="utf-8"?>'
xml += '<soap12:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:soap12="http://www.w3.org/2003/05/soap-envelope"><soap12:Header>'
xml += '<nfeCabecMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/NfeAutorizacao">'
xml += '<cUF>43</cUF><versaoDados>3.10</versaoDados></nfeCabecMsg></soap12:Header><soap12:Body>'
xml += '<nfeDadosMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/NfeAutorizacao">'
xml += '<soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope"><soap:Header>'
xml += '<nfeCabecMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/'+ cabecalho.soap_action + '">'
xml += '<cUF>' + cabecalho.estado + '</cUF><versaoDados>' + cabecalho.versao + '</versaoDados></nfeCabecMsg></soap:Header><soap:Body>'
xml += '<nfeDadosMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/' + cabecalho.soap_action + '">'
xml += body
xml += '</nfeDadosMsg></soap12:Body></soap12:Envelope>'
xml += '</nfeDadosMsg></soap:Body></soap:Envelope>'
return xml.rstrip('\n')
def executar_consulta(certificado, url, cabecalho, xmlEnviar):
cert, key = extract_cert_and_key_from_pfx(certificado.pfx, certificado.password)
cert, key = extract_cert_and_key_from_pfx(
certificado.pfx, certificado.password)
cert_path, key_path = save_cert_key(cert, key)
url = 'https://nfe-homologacao.sefazrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx'
web_service = 'NfeAutorizacao/nfeAutorizacaoLote'
client = HttpClient(url, cert_path, key_path)
xmlEnviar = xmlEnviar.replace('<?xml version="1.0"?>', '')
xml_enviar = _soap_xml(xmlEnviar)
xml_retorno = client.post_soap(xml_enviar, web_service)
return sanitize_response(xml_retorno)
class Comunicacao(object):
url = ''
web_service = ''
metodo = ''
tag_retorno = ''
def __init__(self, cert, key):
self.cert = cert
self.key = key
def _preparar_temp_pem(self):
cert_path = '/tmp/' + uuid4().hex
key_path = '/tmp/' + uuid4().hex
arq_temp = open(cert_path, 'w')
arq_temp.write(self.cert)
arq_temp.close()
arq_temp = open(key_path, 'w')
arq_temp.write(self.key)
arq_temp.close()
return cert_path, key_path
def _validar_dados(self):
assert self.url != '', "Url servidor não configurada"
assert self.metodo != '', "Método não configurado"
def _executar_consulta(self, xmlEnviar):
cert_path, key_path = self._preparar_temp_pem()
client = HttpClient(self.url, cert_path, key_path)
soap_xml = self._soap_xml(xmlEnviar)
xml_retorno = client.post_xml(self.web_service, soap_xml)
return sanitize_response(xml_retorno)
xml_enviar = _soap_xml(xmlEnviar, cabecalho)
xml_retorno = client.post_soap(xml_enviar, cabecalho)
return sanitize_response(xml_retorno)

2
pytrustnfe/utils.py

@ -5,9 +5,11 @@
from datetime import date, datetime
class CabecalhoSoap(object):
def __init__(self, **kwargs):
self.versao = kwargs.pop('versao', '')
self.estado = kwargs.pop('estado', '')
self.soap_action = kwargs.pop('soap_action', '')

Loading…
Cancel
Save