Browse Source

tive um failed merge aqui

tags/0.1
Italo Maia 16 years ago
parent
commit
06633e87e9
  1. 3
      README
  2. 2
      pynfe/entidades/base.py
  3. 35
      pynfe/entidades/certificado.py
  4. 4
      pynfe/processamento/__init__.py
  5. 92
      pynfe/processamento/assinatura.py
  6. 113
      pynfe/processamento/comunicacao.py
  7. 29
      pynfe/processamento/serializacao.py
  8. 26
      pynfe/utils/__init__.py
  9. 6
      pynfe/utils/flags.py
  10. 11
      run_fake_soap_server.py
  11. 18
      tests/01-basico.txt
  12. 4
      tests/03-processamento-01-serializacao-xml.txt
  13. 6
      tests/03-processamento-03-assinatura.txt
  14. 23
      tests/03-processamento-04-comunicacao.txt
  15. 12
      tests/04-servidor-soap.txt
  16. BIN
      tests/certificado.pfx

3
README

@ -83,3 +83,6 @@ Referências
- Lista de codigos para campo EX TIPI
- http://www.fisconet.com.br/ipi/tipi/04.htm
- Certificado para testes
- http://nf-eletronica.com/blog/?p=133

2
pynfe/entidades/base.py

@ -11,7 +11,7 @@ class Entidade(object):
# Adiciona o objeto à fonte de dados informada
if not self._fonte_dados:
from fontes_dados import _fonte_dados
from fonte_dados import _fonte_dados
self._fonte_dados = _fonte_dados
self._fonte_dados.adicionar_objeto(self)

35
pynfe/entidades/certificado.py

@ -1,6 +1,10 @@
# -*- coding; utf-8 -*-
# -*- coding: utf-8 -*-
import os
from base import Entidade
from OpenSSL import crypto
class Certificado(Entidade):
u"""Classe abstrata responsavel por definir o modelo padrao para as demais
classes de certificados digitais.
@ -15,13 +19,40 @@ class Certificado(Entidade):
return super(Certificado, cls).__new__(cls, *args, **kwargs)
class CertificadoA1(Certificado):
"""Implementa a entidade do certificado eCNPJ A1, suportado pelo OpenSSL,
u"""Implementa a entidade do certificado eCNPJ A1, suportado pelo OpenSSL,
e amplamente utilizado."""
caminho_arquivo = None
conteudo_x509 = None
pasta_temporaria = '/tmp/'
arquivo_chave = 'key.pem'
arquivo_cert = 'cert.pem'
def __init__(self, caminho_arquivo=None, conteudo_x509=None):
self.caminho_arquivo = caminho_arquivo or self.caminho_arquivo
self.conteudo_x509 = conteudo_x509 or self.conteudo_x509
def separar_arquivo(self, senha, caminho_chave=None, caminho_cert=None):
u"""Separa o arquivo de certificado em dois: de chave e de certificado,
em arquivos temporários separados"""
caminho_chave = caminho_chave or os.path.join(self.pasta_temporaria, self.arquivo_chave)
caminho_cert = caminho_cert or os.path.join(self.pasta_temporaria, self.arquivo_cert)
# Lendo o arquivo pfx no formato pkcs12 como binario
pkcs12 = crypto.load_pkcs12(file(self.caminho_arquivo, 'rb').read(), senha)
# Retorna a string decodificado da chave privada
key_str = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkcs12.get_privatekey())
# Retorna a string decodificado do certificado
cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, pkcs12.get_certificate())
# Gravando a string no dicso
file(caminho_cert, 'wb').write(cert_str)
# Gravando a string no dicso
file(caminho_chave, 'wb').write(key_str)
return caminho_chave, caminho_cert

4
pynfe/processamento/__init__.py

@ -1,6 +1,6 @@
from interfaces import InterfaceXML
from serializacao import SerializacaoXML
from validacao import Validacao
from assinatura import AssinaturaA1
from comunicacao import Comunicacao
from comunicacao import ComunicacaoSefaz
from danfe import DANFE

92
pynfe/processamento/assinatura.py

@ -1,37 +1,11 @@
# -*- coding: utf-8 -*-
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
try:
from lxml import etree
except ImportError:
try:
# Python 2.5 - cElementTree
import xml.etree.cElementTree as etree
except ImportError:
try:
# Python 2.5 - ElementTree
import xml.etree.ElementTree as etree
except ImportError:
try:
# Instalacao normal do cElementTree
import cElementTree as etree
except ImportError:
try:
# Instalacao normal do ElementTree
import elementtree.ElementTree as etree
except ImportError:
raise Exception('Falhou ao importar lxml/ElementTree')
import xmlsec, libxml2 # FIXME: verificar ambiguidade de dependencias: lxml e libxml2
from geraldo.utils import memoize
NAMESPACE_NFE = u'http://www.portalfiscal.inf.br/nfe'
NAMESPACE_SIG = u'http://www.w3.org/2000/09/xmldsig#'
from pynfe.utils import etree, StringIO
from pynfe.utils.flags import NAMESPACE_NFE, NAMESPACE_SIG
class Assinatura(object):
"""Classe abstrata responsavel por definir os metodos e logica das classes
@ -54,11 +28,9 @@ class Assinatura(object):
def assinar_etree(self, raiz):
u"""Efetua a assinatura numa instancia da biblioteca lxml.etree.
Este metodo de assinatura será utilizado internamente pelos demais,
sendo que eles convertem para uma instancia lxml.etree para somente
depois efetivar a assinatura.
TODO: Verificar o funcionamento da PyXMLSec antes de efetivar isso."""
pass
@ -87,16 +59,28 @@ class AssinaturaA1(Assinatura):
"""Classe abstrata responsavel por efetuar a assinatura do certificado
digital no XML informado."""
def assinar_arquivo(self, caminho_arquivo):
def assinar_arquivo(self, caminho_arquivo, salva=True):
# Carrega o XML do arquivo
raiz = etree.parse(caminho_arquivo)
return self.assinar_etree(raiz)
# Efetua a assinatura
xml = self.assinar_etree(raiz, retorna_xml=True)
# Grava XML assinado no arquivo
if salva:
fp = file(caminho_arquivo, 'w')
fp.write(xml)
fp.close()
return xml
def assinar_xml(self, xml):
raiz = etree.parse(StringIO(xml))
return self.assinar_etree(raiz)
def assinar_etree(self, raiz):
# Efetua a assinatura
return self.assinar_etree(raiz, retorna_xml=True)
def assinar_etree(self, raiz, retorna_xml=False):
# Extrai a tag do elemento raiz
tipo = extrair_tag(raiz.getroot())
@ -113,11 +97,28 @@ class AssinaturaA1(Assinatura):
# Tag de assinatura
if raiz.getroot().find('Signature') is None:
signature = etree.Element(
'Signature',
'{%s}Signature'%NAMESPACE_SIG,
URI=raiz.getroot().getchildren()[0].attrib['Id'],
xmlns=NAMESPACE_SIG,
nsmap={'sig': NAMESPACE_SIG},
)
signature.text = ''
signed_info = etree.SubElement(signature, '{%s}SignedInfo'%NAMESPACE_SIG)
etree.SubElement(signed_info, 'CanonicalizationMethod', Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315")
etree.SubElement(signed_info, 'SignatureMethod', Algorithm="http://www.w3.org/2000/09/xmldsig#rsa-sha1")
reference = etree.SubElement(signed_info, '{%s}Reference'%NAMESPACE_SIG, URI=raiz.getroot().getchildren()[0].attrib['Id'])
transforms = etree.SubElement(reference, 'Transforms', URI=raiz.getroot().getchildren()[0].attrib['Id'])
etree.SubElement(transforms, 'Transform', Algorithm="http://www.w3.org/2000/09/xmldsig#enveloped-signature")
etree.SubElement(transforms, 'Transform', Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315")
etree.SubElement(reference, '{%s}DigestMethod'%NAMESPACE_SIG, Algorithm="http://www.w3.org/2000/09/xmldsig#sha1")
digest_value = etree.SubElement(reference, '{%s}DigestValue'%NAMESPACE_SIG)
signature_value = etree.SubElement(signature, '{%s}SignatureValue'%NAMESPACE_SIG)
key_info = etree.SubElement(signature, '{%s}KeyInfo'%NAMESPACE_SIG)
x509_data = etree.SubElement(key_info, '{%s}X509Data'%NAMESPACE_SIG)
x509_certificate = etree.SubElement(x509_data, '{%s}X509Certificate'%NAMESPACE_SIG)
raiz.getroot().insert(0, signature)
# Acrescenta a tag de doctype (como o lxml nao suporta alteracao do doctype,
@ -135,21 +136,25 @@ class AssinaturaA1(Assinatura):
assinador.sign(noh_assinatura)
# Coloca na instância Signature os valores calculados
doc.Signature.DigestValue = ctxt.xpathEval(u'//sig:DigestValue')[0].content.replace(u'\n', u'')
doc.Signature.SignatureValue = ctxt.xpathEval(u'//sig:SignatureValue')[0].content.replace(u'\n', u'')
digest_value.text = ctxt.xpathEval(u'//sig:DigestValue')[0].content.replace(u'\n', u'')
signature_value.text = ctxt.xpathEval(u'//sig:SignatureValue')[0].content.replace(u'\n', u'')
# Provavelmente retornarão vários certificados, já que o xmlsec inclui a cadeia inteira
certificados = ctxt.xpathEval(u'//sig:X509Data/sig:X509Certificate')
doc.Signature.X509Certificate = certificados[len(certificados)-1].content.replace(u'\n', u'')
x509_certificate.text = certificados[len(certificados)-1].content.replace(u'\n', u'')
resultado = assinador.status == xmlsec.DSigStatusSucceeded
# Gera o XML para retornar
xml = doc_xml.serialize()
# Limpa objetos da memoria e desativa funções criptográficas
self._depois_de_assinar_ou_verificar(doc_xml, ctxt, assinador)
#print etree.tostring(raiz, pretty_print=True, xml_declaration=True, encoding='utf-8')
return resultado
if retorna_xml:
return xml
else:
return etree.parse(StringIO(xml))
def _ativar_funcoes_criptograficas(self):
# FIXME: descobrir forma de evitar o uso do libxml2 neste processo
@ -244,4 +249,3 @@ class AssinaturaA1(Assinatura):
# E, por fim, desativa todas as funções ativadas anteriormente
self._desativar_funcoes_criptograficas()

113
pynfe/processamento/comunicacao.py

@ -1,3 +1,116 @@
# -*- coding: utf-8 -*-
from httplib import HTTPSConnection, HTTPResponse
from pynfe.utils import etree, StringIO
from pynfe.utils.flags import NAMESPACE_NFE, NAMESPACE_SOAP, VERSAO_PADRAO
class Comunicacao(object):
u"""Classe abstrata responsavel por definir os metodos e logica das classes
de comunicação com os webservices da NF-e."""
servidor = None
certificado = None
certificado_senha = None
def __init__(self, servidor, certificado, certificado_senha):
self.servidor = servidor
self.certificado = certificado
self.certificado_senha = certificado_senha
class ComunicacaoSefaz(Comunicacao):
u"""Classe de comunicação que segue o padrão definido para as SEFAZ dos Estados."""
_versao = VERSAO_PADRAO
def transmitir(self, nota_fiscal):
pass
def cancelar(self, nota_fiscal):
pass
def situacao_nfe(self, nota_fiscal):
pass
def status_servico(self):
post = '/nfeweb/services/nfestatusservico.asmx'
# Monta XML do corpo da requisição # FIXME
raiz = etree.Element('teste')
dados = etree.tostring(raiz)
# Monta XML para envio da requisição
xml = self._construir_xml_soap(
metodo='CadConsultaCadastro',
tag_metodo='consultaCadastro',
cabecalho=self._cabecalho_soap(),
dados=dados,
)
# Chama método que efetua a requisição POST no servidor SOAP
retorno = self._post(post, xml, self._post_header())
# Transforma o retorno em etree
try:
retorno = etree.parse(StringIO(retorno))
return retorno
except TypeError:
pass
def consultar_cadastro(self, instancia):
#post = '/nfeweb/services/cadconsultacadastro.asmx'
post = '/nfeweb/services/nfeconsulta.asmx'
def inutilizar_faixa_numeracao(self, faixa):
pass
def _cabecalho_soap(self):
u"""Monta o XML do cabeçalho da requisição SOAP"""
raiz = etree.Element('cabecMsg', xmlns=NAMESPACE_NFE, versao="1.02")
etree.SubElement(raiz, 'versaoDados').text = self._versao
return etree.tostring(raiz, encoding='utf-8', xml_declaration=True)
def _construir_xml_soap(self, metodo, tag_metodo, cabecalho, dados):
u"""Mota o XML para o envio via SOAP"""
raiz = etree.Element('{%s}Envelope'%NAMESPACE_SOAP, nsmap={'soap': NAMESPACE_SOAP})
body = etree.SubElement(raiz, '{%s}Body'%NAMESPACE_SOAP)
met = etree.SubElement(
body, tag_metodo, xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/%s"%metodo,
)
etree.SubElement(met, 'nfeCabecMsg').text = cabecalho
etree.SubElement(met, 'nfeDadosMsg').text = dados
return etree.tostring(raiz, encoding='utf-8', xml_declaration=True)
def _post_header(self):
u"""Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP"""
return {
u'content-type': u'application/soap+xml; charset=utf-8',
u'Accept': u'application/soap+xml; charset=utf-8',
}
def _post(self, post, xml, header):
# Separa arquivos de certificado para chave e certificado (sozinho)
caminho_chave, caminho_cert = self.certificado.separar_arquivo(senha=self.certificado_senha)
# Abre a conexão HTTPS
con = HTTPSConnection(self.servidor, key_file=caminho_chave, cert_file=caminho_cert)
try:
#con.set_debuglevel(100)
con.request(u'POST', post, xml, header)
resp = con.getresponse()
# Tudo certo!
if resp.status == 200:
return resp.read()
finally:
con.close()

29
pynfe/processamento/serializacao.py

@ -4,31 +4,10 @@ try:
except:
from sets import Set as set
try:
from lxml import etree
except ImportError:
try:
# Python 2.5 - cElementTree
import xml.etree.cElementTree as etree
except ImportError:
try:
# Python 2.5 - ElementTree
import xml.etree.ElementTree as etree
except ImportError:
try:
# Instalacao normal do cElementTree
import cElementTree as etree
except ImportError:
try:
# Instalacao normal do ElementTree
import elementtree.ElementTree as etree
except ImportError:
raise Exception('Falhou ao importar lxml/ElementTree')
from pynfe.entidades import Emitente, Cliente, Produto, Transportadora, NotaFiscal
from pynfe.excecoes import NenhumObjetoEncontrado, MuitosObjetosEncontrados
from pynfe.utils import so_numeros, obter_municipio_por_codigo, obter_pais_por_codigo
from pynfe.utils.flags import CODIGOS_ESTADOS
from pynfe.utils import etree, so_numeros, obter_municipio_por_codigo, obter_pais_por_codigo
from pynfe.utils.flags import CODIGOS_ESTADOS, VERSAO_PADRAO
class Serializacao(object):
"""Classe abstrata responsavel por fornecer as funcionalidades basicas para
@ -64,6 +43,8 @@ class Serializacao(object):
raise Exception('Metodo nao implementado')
class SerializacaoXML(Serializacao):
_versao = VERSAO_PADRAO
def exportar(self, destino=None, retorna_string=False, **kwargs):
"""Gera o(s) arquivo(s) de Nofa Fiscal eletronica no padrao oficial da SEFAZ
e Receita Federal, para ser(em) enviado(s) para o webservice ou para ser(em)
@ -241,7 +222,7 @@ class SerializacaoXML(Serializacao):
return raiz
def _serializar_notas_fiscal(self, nota_fiscal, tag_raiz='infNFe', retorna_string=True):
raiz = etree.Element(tag_raiz, versao="2.00")
raiz = etree.Element(tag_raiz, versao=self._versao)
# Dados da Nota Fiscal
ide = etree.SubElement(raiz, 'ide')

26
pynfe/utils/__init__.py

@ -1,5 +1,31 @@
import os
try:
from lxml import etree
except ImportError:
try:
# Python 2.5 - cElementTree
import xml.etree.cElementTree as etree
except ImportError:
try:
# Python 2.5 - ElementTree
import xml.etree.ElementTree as etree
except ImportError:
try:
# Instalacao normal do cElementTree
import cElementTree as etree
except ImportError:
try:
# Instalacao normal do ElementTree
import elementtree.ElementTree as etree
except ImportError:
raise Exception('Falhou ao importar lxml/ElementTree')
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
import flags
from geraldo.utils import memoize

6
pynfe/utils/flags.py

@ -1,5 +1,11 @@
# -*- coding: utf-8 -*-
NAMESPACE_NFE = 'http://www.portalfiscal.inf.br/nfe'
NAMESPACE_SIG = 'http://www.w3.org/2000/09/xmldsig#'
NAMESPACE_SOAP = 'http://www.w3.org/2003/05/soap-envelope'
VERSAO_PADRAO = '1.01'
TIPOS_DOCUMENTO = (
'CNPJ',
'CPF',

11
run_fake_soap_server.py

@ -1,12 +1,13 @@
from soaplib.wsgi_soap import SimpleWSGISoapApp
from soaplib.service import soapmethod
class ServidorNFEFalso(SimpleWSGISoapApp):
from soaplib.serializers.primitive import String, Integer, Array, Null
@soapmethod(String, Integer, _returns=String)
def ping(self, palavra, vezes):
return ','.join([palavra for i in range(vezes)])
class ServidorNFEFalso(SimpleWSGISoapApp):
@soapmethod(String, Integer, _returns=Array(String))
def ping(self, nome, vezes):
ret = [nome for i in range(vezes)]
print ret
return ret
if __name__ == '__main__':
porta = 8080

18
tests/01-basico.txt

@ -91,8 +91,8 @@ Os pacotes da biblioteca sao:
>>> from pynfe import processamento
>>> set([attr for attr in dir(processamento) if not attr.startswith('__')]) == set([
... 'AssinaturaA1', 'Comunicacao', 'DANFE', 'InterfaceXML', 'Validacao',
... 'assinatura', 'comunicacao', 'danfe', 'interfaces', 'validacao'])
... 'AssinaturaA1', 'ComunicacaoSefaz', 'DANFE', 'SerializacaoXML', 'Validacao',
... 'assinatura', 'comunicacao', 'danfe', 'serializacao', 'validacao'])
True
Ha ainda uma pasta dentro da pasta 'pynfe', chamada 'data', que deve
@ -113,28 +113,28 @@ Geracao e importacao de XML
As objetos Python devem ser traduzidos para arquivos XML e o caminho
inverso, de introspecao, tambem deve ser feito, atraves de uma fabrica.
>>> from pynfe.processamento.interfaces import Interface
>>> from pynfe.processamento.serializacao import Serializacao
>>> bool(Interface.exportar)
>>> bool(Serializacao.exportar)
True
>>> bool(Interface.importar)
>>> bool(Serializacao.importar)
True
A classe basica de Interface eh abstrata, ou seja, nao pode ser
A classe basica de Serializacao eh abstrata, ou seja, nao pode ser
instanciada diretamente.
>>> lista_de_nfs = []
>>> try:
... Interface(lista_de_nfs)
... Serializacao(lista_de_nfs)
... except Exception, e:
... print e.message
Esta classe nao pode ser instanciada diretamente!
Classe de interface especifica para XML, usando lxml
Classe de serializacao especifica para XML, usando lxml
>>> from pynfe.processamento.interfaces import InterfaceXML
>>> from pynfe.processamento.serializacao import SerializacaoXML
Esse procedimento sera feito por padrao atraves da lxml, devido ao seu
desempenho, mas pode ser extendido para outros formatos, dependendo do

4
tests/03-processamento-01-serializacao-xml.txt

@ -8,7 +8,7 @@ Populando fonte de dados
>>> from decimal import Decimal
>>> from pynfe.entidades import Emitente, Cliente, NotaFiscal, Transportadora
>>> from pynfe.entidades.notafiscal import NotaFiscalEntregaRetirada
>>> from pynfe.entidades.fontes_dados import _fonte_dados
>>> from pynfe.entidades.fonte_dados import _fonte_dados
>>> from pynfe.utils.flags import CODIGO_BRASIL
Popula dependentes da NF
@ -268,7 +268,7 @@ Serializando por partes
NFe52100112345678000190550010000000011518005123
>>> print serializador._serializar_notas_fiscal(nota_fiscal)
<infNFe versao="2.00" Id="NFe52100112345678000190550010000000011518005123">
<infNFe versao="1.01" Id="NFe52100112345678000190550010000000011518005123">
<ide>
<cUF>52</cUF>
<cNF>51800512</cNF>

6
tests/03-processamento-03-assinatura.txt

@ -6,7 +6,7 @@ Carregando Certificado Digital tipo A1
>>> from pynfe.entidades import CertificadoA1
>>> certificado = CertificadoA1(caminho_arquivo='tests/certificado.pem')
>>> certificado = CertificadoA1(caminho_arquivo='tests/certificado.pfx')
Assinando NF-e
--------------
@ -15,7 +15,7 @@ Assinando NF-e
Na hora de assinar, selecionar um Certificado Digital
>>> assinatura = AssinaturaA1(certificado, 'senha')
>>> assinatura = AssinaturaA1(certificado, senha='associacao')
TODO: A senha deveria ser criptografada de forma a evitar que alguem entre nesse
processo e a capture.
@ -24,7 +24,7 @@ A assinatura deve ser feita em quatro tipos diferentes de origem do XML:
- Arquivo
>>> assinatura.assinar_arquivo('tests/saida/nfe-1.xml')
>>> bool(assinatura.assinar_arquivo('tests/saida/nfe-1.xml'))
True
- String de XML

23
tests/03-processamento-04-comunicacao.txt

@ -0,0 +1,23 @@
PROCESSAMENTO - COMUNICACAO
===========================
>>> from pynfe.processamento import ComunicacaoSefaz
Carregando certificado digital tipo A1
>>> from pynfe.entidades import CertificadoA1
>>> certificado = CertificadoA1(caminho_arquivo='tests/certificado.pfx')
Instancia de comunicacao
>>> comunicacao = ComunicacaoSefaz(
... #servidor='localhost:8080',
... servidor='homologacao.nfe.fazenda.sp.gov.br',
... certificado=certificado,
... certificado_senha='associacao',
... )
Verifica o status do servico
>>> comunicacao.status_servico()

12
tests/04-servidor-soap.txt

@ -5,9 +5,11 @@ Este teste vai verificar um servidor, executado atraves do comando
'run_fake_soap_server.py', para enviar requisicoes SOAP e esperar as
responstas em formato WSDL, de forma a simular o servidor da SEFAZ.
>>> from suds.client import Client
>>> url = 'http://localhost:8080/ServidorNFEFalso?wsdl'
>>> #client = Client(url)
>>> #print client
>>> #print client.service.ping('mario', 5)
Usando suds
>>> #from suds.client import Client
>>> #client = Client('http://localhost:8080/ServidorNFEFalso?wsdl', cache=None)
>>> #client.set_options(retxml=True)
>>> #bool(client.service.ping('mario', 5))
True

BIN
tests/certificado.pfx

Loading…
Cancel
Save