Danimar Ribeiro 9 years ago
parent
commit
fcf1942308
  1. 5
      pytrustnfe/Servidores.py
  2. 6
      pytrustnfe/client.py
  3. 23
      pytrustnfe/nfe/__init__.py
  4. 52
      pytrustnfe/nfe/assinatura.py
  5. 26
      pytrustnfe/nfe/comunicacao.py
  6. 24
      pytrustnfe/nfe/templates/NfeAutorizacao.xml
  7. 4
      pytrustnfe/utils.py

5
pytrustnfe/Servidores.py

@ -52,9 +52,10 @@ SIGLA_ESTADO = {
}
def localizar_url(servico, estado):
def localizar_url(servico, estado, ambiente=2):
sigla = SIGLA_ESTADO[estado]
return ESTADO_WS[sigla]['servidor'], ESTADO_WS[sigla][servico]
return ESTADO_WS[sigla][ambiente]['servidor'],
ESTADO_WS[sigla][ambiente][servico]
METODO_WS = {

6
pytrustnfe/client.py

@ -43,11 +43,11 @@ class HttpClient(object):
def _headers(self, action):
return {
u'Content-type': u'application/soap+xml; charset=utf-8; action="http://www.portalfiscal.inf.br/nfe/wsdl/%s' % action,
u'Accept': u'application/soap+xml; charset=utf-8'
u'Accept': u'application/soap+xml; charset=utf-8'
}
def post_soap(self, xml_soap, action):
res = requests.post(self.url, data=xml_soap,
res = requests.post(self.url, data=xml_soap,
cert=(self.cert_path, self.key_path),
verify=False, headers=self._headers(action))
return res.text
return res.text

23
pytrustnfe/nfe/__init__.py

@ -10,6 +10,7 @@ from pytrustnfe.xml import render_xml
from pytrustnfe.utils import CabecalhoSoap
from pytrustnfe.utils import gerar_chave, ChaveNFe
from pytrustnfe.Servidores import localizar_url
import re
def _build_header(**kwargs):
@ -30,8 +31,10 @@ def _generate_nfe_id(**kwargs):
'tipo': item['infNFe']['ide']['tpEmis'],
'codigo': item['infNFe']['ide']['cNF'],
}
chave = ChaveNFe(**vals)
item['infNFe']['Id'] = gerar_chave(chave, 'NFe')
chave_nfe = ChaveNFe(**vals)
chave_nfe = gerar_chave(chave_nfe, 'NFe')
item['infNFe']['Id'] = chave_nfe
item['infNFe']['ide']['cDV'] = chave_nfe[len(chave_nfe) - 1:]
def _send(certificado, method, sign, **kwargs):
@ -40,15 +43,27 @@ def _send(certificado, method, sign, **kwargs):
xml_send = render_xml(path, '%s.xml' % method, **kwargs)
if sign:
xml_send = '<!DOCTYPE NFe [<!ATTLIST infNFe Id ID #IMPLIED>]>' + \
xml_send
xml_send = xml_send.replace('\n', '')
pfx_path = certificado.save_pfx()
signer = Assinatura(pfx_path, certificado.password)
xml_send = signer.assina_xml(
xml_send = signer.assina_xml_nota(
xml_send, kwargs['NFes'][0]['infNFe']['Id'])
xml_send = xml_send.replace(
'\n<!DOCTYPE NFe [\n<!ATTLIST infNFe Id ID #IMPLIED>\n]>\n', '')
print xml_send
xml_send = xml_send.replace('\n', '')
url = localizar_url(method, kwargs['estado'])
cabecalho = _build_header(**kwargs)
return executar_consulta(certificado, url, cabecalho, xml_send)
response, obj = executar_consulta(certificado, url, cabecalho, xml_send)
return {
'sent_xml': xml_send,
'received_xml': response,
'object': obj
}
def autorizar_nfe(certificado, **kwargs): # Assinar

52
pytrustnfe/nfe/assinatura.py

@ -38,7 +38,6 @@ class Assinatura(object):
self._checar_certificado()
self._inicializar_cripto()
try:
xml = '<!DOCTYPE NFe [<!ATTLIST infNFe Id ID #IMPLIED>]>' + xml
doc_xml = libxml2.parseMemory(
xml, len(xml))
@ -87,3 +86,54 @@ class Assinatura(object):
finally:
doc_xml.freeDoc()
# self._finalizar_cripto()
def assina_xml_nota(self, xml, reference):
self._checar_certificado()
self._inicializar_cripto()
try:
doc_xml = libxml2.parseMemory(
xml, len(xml))
signNode = xmlsec.TmplSignature(doc_xml,
xmlsec.transformInclC14NId(),
xmlsec.transformRsaSha1Id(), None)
doc_xml.getRootElement().get_last().addChild(signNode)
refNode = signNode.addReference(xmlsec.transformSha1Id(),
None, '#' + str(reference), None)
refNode.addTransform(xmlsec.transformEnvelopedId())
refNode.addTransform(xmlsec.transformInclC14NId())
keyInfoNode = signNode.ensureKeyInfo()
keyInfoNode.addX509Data()
dsig_ctx = xmlsec.DSigCtx()
chave = xmlsec.cryptoAppKeyLoad(filename=str(self.arquivo),
format=xmlsec.KeyDataFormatPkcs12,
pwd=str(self.senha),
pwdCallback=None,
pwdCallbackCtx=None)
dsig_ctx.signKey = chave
dsig_ctx.sign(signNode)
status = dsig_ctx.status
dsig_ctx.destroy()
if status != xmlsec.DSigStatusSucceeded:
raise RuntimeError(
'Erro ao realizar a assinatura do arquivo; status: "' +
str(status) +
'"')
xpath = doc_xml.xpathNewContext()
xpath.xpathRegisterNs('sig', NAMESPACE_SIG)
certificados = xpath.xpathEval(
'//sig:X509Data/sig:X509Certificate')
for i in range(len(certificados) - 1):
certificados[i].unlinkNode()
certificados[i].freeNode()
xml = doc_xml.serialize()
return xml
finally:
doc_xml.freeDoc()
# self._finalizar_cripto()

26
pytrustnfe/nfe/comunicacao.py

@ -15,14 +15,26 @@ soap_body_path = './soap:Envelope/soap:Body'
soap_fault_path = './soap:Envelope/soap:Body/soap:Fault'
def _soap_xml(body):
xml = '<?xml version="1.0" encoding="utf-8"?>'
xml += '<soap12:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:soap12="http://www.w3.org/2003/05/soap-envelope"><soap12:Header>'
xml += '<nfeCabecMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/NfeAutorizacao">'
xml += '<cUF>43</cUF><versaoDados>3.10</versaoDados></nfeCabecMsg></soap12:Header><soap12:Body>'
xml += '<nfeDadosMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/NfeAutorizacao">'
xml += body
xml += '</nfeDadosMsg></soap12:Body></soap12:Envelope>'
return xml.rstrip('\n')
def executar_consulta(certificado, url, cabecalho, xmlEnviar):
cert, key = extract_cert_and_key_from_pfx(certificado.pfx, certificado.password)
cert_path, key_path = save_cert_key(cert, key)
url = 'https://nfe-homologacao.sefazrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx'
web_service = 'NfeAutorizacao/nfeAutorizacaoLote'
client = HttpClient(url, cert_path, key_path)
xml_retorno = client.post_soap(xmlEnviar, web_service)
xmlEnviar = xmlEnviar.replace('<?xml version="1.0"?>', '')
xml_enviar = _soap_xml(xmlEnviar)
xml_retorno = client.post_soap(xml_enviar, web_service)
return sanitize_response(xml_retorno)
@ -36,16 +48,6 @@ class Comunicacao(object):
self.cert = cert
self.key = key
def _soap_xml(self, body):
xml = '<?xml version="1.0" encoding="utf-8"?>'
xml += '<soap12:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:soap12="http://www.w3.org/2003/05/soap-envelope"><soap12:Header>'
xml += '<nfeCabecMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/NfeAutorizacao">'
xml += '<cUF>43</cUF><versaoDados>3.10</versaoDados></nfeCabecMsg></soap12:Header><soap12:Body>'
xml += '<nfeDadosMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/NfeAutorizacao">'
xml += body
xml += '</nfeDadosMsg></soap12:Body></soap12:Envelope>'
return xml.rstrip('\n')
def _preparar_temp_pem(self):
cert_path = '/tmp/' + uuid4().hex
key_path = '/tmp/' + uuid4().hex

24
pytrustnfe/nfe/templates/NfeAutorizacao.xml

@ -62,7 +62,7 @@
{% endif %}
{% if dest.tipo == 'company' -%}
<CNPJ>{{ dest.cnpj_cpf }}</CNPJ>
{% endif %}
{% endif %}
<xNome>{{ dest.xNome }}</xNome>
<enderDest>
<xLgr>{{ dest.enderDest.xLgr }}</xLgr>
@ -104,14 +104,24 @@
{% with imposto = det.imposto %}
<vTotTrib>{{ imposto.vTotTrib }}</vTotTrib>
<ICMS>
{% if imposto.ICMS.CST == '00' -%}
<ICMS00>
<orig>{{ imposto.ICMS.ICMS00.orig }}</orig>
<CST>{{ imposto.ICMS.ICMS00.CST }}</CST>
<modBC>{{ imposto.ICMS.ICMS00.modBC }}</modBC>
<vBC>{{ imposto.ICMS.ICMS00.vBC }}</vBC>
<pICMS>{{ imposto.ICMS.ICMS00.pICMS }}</pICMS>
<vICMS>{{ imposto.ICMS.ICMS00.vICMS }}</vICMS>
<orig>{{ imposto.ICMS.orig }}</orig>
<CST>{{ imposto.ICMS.CST }}</CST>
<modBC>{{ imposto.ICMS.modBC }}</modBC>
<vBC>{{ imposto.ICMS.vBC }}</vBC>
<pICMS>{{ imposto.ICMS.pICMS }}</pICMS>
<vICMS>{{ imposto.ICMS.vICMS }}</vICMS>
</ICMS00>
{% endif %}
{% if imposto.ICMS.CST == '101' -%}
<ICMSSN101>
<orig>{{ imposto.ICMS.orig }}</orig>
<CSOSN>{{ imposto.ICMS.CST }}</CSOSN>
<pCredSN>{{ imposto.ICMS.pCredSN }}</pCredSN>
<vCredICMSSN>{{ imposto.ICMS.vCredICMSSN }}</vCredICMSSN>
</ICMSSN101>
{% endif %}
</ICMS>
<IPI>
<cEnq>{{ imposto.IPI.cEnq }}</cEnq>

4
pytrustnfe/utils.py

@ -6,7 +6,7 @@
from datetime import date, datetime
class CabecalhoSoap(object):
def __init__(self, **kwargs):
self.estado = kwargs.pop('estado', '')
self.soap_action = kwargs.pop('soap_action', '')
@ -53,7 +53,7 @@ def gerar_chave(obj_chave, prefix=None):
assert isinstance(obj_chave, ChaveNFe), "Objeto deve ser do tipo ChaveNFe"
obj_chave.validar()
chave_parcial = "%s%s%s%s%s%09d%d%08d" % (obj_chave.estado, obj_chave.emissao,
chave_parcial = "%s%s%s%s%s%09d%d%s" % (obj_chave.estado, obj_chave.emissao,
obj_chave.cnpj, obj_chave.modelo,
obj_chave.serie.zfill(3), obj_chave.numero,
obj_chave.tipo, obj_chave.codigo)

Loading…
Cancel
Save