From 8022a445a865d214e261f8301c87e0d622c07765 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marinho=20Brand=C3=A3o?= Date: Mon, 18 Jan 2010 19:09:47 -0200 Subject: [PATCH] =?UTF-8?q?Trabalhando=20no=20modulo=20de=20comunica=C3=A7?= =?UTF-8?q?=C3=A3o?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pynfe/entidades/certificado.py | 35 +++++++++++- pynfe/processamento/assinatura.py | 36 ++---------- pynfe/processamento/comunicacao.py | 92 +++++++++++++++++++++++++++++-- pynfe/processamento/serializacao.py | 23 +------- pynfe/utils/__init__.py | 26 +++++++++ pynfe/utils/flags.py | 4 ++ tests/03-processamento-03-assinatura.txt | 2 +- tests/03-processamento-04-comunicacao.txt | 18 ++++++ tests/04-servidor-soap.txt | 17 ++---- 9 files changed, 180 insertions(+), 73 deletions(-) diff --git a/pynfe/entidades/certificado.py b/pynfe/entidades/certificado.py index 9c34a5c..60be860 100644 --- a/pynfe/entidades/certificado.py +++ b/pynfe/entidades/certificado.py @@ -1,6 +1,10 @@ -# -*- coding; utf-8 -*- +# -*- coding: utf-8 -*- +import os + from base import Entidade +from OpenSSL import crypto + class Certificado(Entidade): u"""Classe abstrata responsavel por definir o modelo padrao para as demais classes de certificados digitais. @@ -15,13 +19,40 @@ class Certificado(Entidade): return super(Certificado, cls).__new__(cls, *args, **kwargs) class CertificadoA1(Certificado): - """Implementa a entidade do certificado eCNPJ A1, suportado pelo OpenSSL, + u"""Implementa a entidade do certificado eCNPJ A1, suportado pelo OpenSSL, e amplamente utilizado.""" caminho_arquivo = None conteudo_x509 = None + pasta_temporaria = '/tmp/' + arquivo_chave = 'key.pem' + arquivo_cert = 'cert.pem' def __init__(self, caminho_arquivo=None, conteudo_x509=None): self.caminho_arquivo = caminho_arquivo or self.caminho_arquivo self.conteudo_x509 = conteudo_x509 or self.conteudo_x509 + + def separar_arquivo(self, senha, caminho_chave=None, caminho_cert=None): + u"""Separa o arquivo de certificado em dois: de chave e de certificado, + em arquivos temporários separados""" + + caminho_chave = caminho_chave or os.path.join(self.pasta_temporaria, self.arquivo_chave) + caminho_cert = caminho_cert or os.path.join(self.pasta_temporaria, self.arquivo_cert) + + # Lendo o arquivo pfx no formato pkcs12 como binario + pkcs12 = crypto.load_pkcs12(file(self.caminho_arquivo, 'rb').read(), senha) + + # Retorna a string decodificado da chave privada + key_str = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkcs12.get_privatekey()) + + # Retorna a string decodificado do certificado + cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, pkcs12.get_certificate()) + + # Gravando a string no dicso + file(caminho_cert, 'wb').write(cert_str) + + # Gravando a string no dicso + file(caminho_chave, 'wb').write(key_str) + + return caminho_chave, caminho_cert diff --git a/pynfe/processamento/assinatura.py b/pynfe/processamento/assinatura.py index 5b5c0b9..27bb5ad 100644 --- a/pynfe/processamento/assinatura.py +++ b/pynfe/processamento/assinatura.py @@ -1,37 +1,11 @@ # -*- coding: utf-8 -*- -try: - from cStringIO import StringIO -except ImportError: - from StringIO import StringIO - -try: - from lxml import etree -except ImportError: - try: - # Python 2.5 - cElementTree - import xml.etree.cElementTree as etree - except ImportError: - try: - # Python 2.5 - ElementTree - import xml.etree.ElementTree as etree - except ImportError: - try: - # Instalacao normal do cElementTree - import cElementTree as etree - except ImportError: - try: - # Instalacao normal do ElementTree - import elementtree.ElementTree as etree - except ImportError: - raise Exception('Falhou ao importar lxml/ElementTree') - import xmlsec, libxml2 # FIXME: verificar ambiguidade de dependencias: lxml e libxml2 from geraldo.utils import memoize -NAMESPACE_NFE = u'http://www.portalfiscal.inf.br/nfe' -NAMESPACE_SIG = u'http://www.w3.org/2000/09/xmldsig#' +from pynfe.utils import etree, StringIO +from pynfe.utils.flags import NAMESPACE_NFE, NAMESPACE_SIG class Assinatura(object): """Classe abstrata responsavel por definir os metodos e logica das classes @@ -113,11 +87,10 @@ class AssinaturaA1(Assinatura): # Tag de assinatura if raiz.getroot().find('Signature') is None: signature = etree.Element( - 'Signature', + '{%s}Signature'%NAMESPACE_SIG, URI=raiz.getroot().getchildren()[0].attrib['Id'], - xmlns=NAMESPACE_SIG, + nsmap={'sig': NAMESPACE_SIG}, ) - signature.text = '' raiz.getroot().insert(0, signature) # Acrescenta a tag de doctype (como o lxml nao suporta alteracao do doctype, @@ -202,6 +175,7 @@ class AssinaturaA1(Assinatura): return resultado def _antes_de_assinar_ou_verificar(self, raiz): + raise Exception(dir(raiz)) # Converte etree para string xml = etree.tostring(raiz, xml_declaration=True, encoding='utf-8') diff --git a/pynfe/processamento/comunicacao.py b/pynfe/processamento/comunicacao.py index 031be29..27bfbec 100644 --- a/pynfe/processamento/comunicacao.py +++ b/pynfe/processamento/comunicacao.py @@ -1,19 +1,28 @@ # -*- coding: utf-8 -*- +from httplib import HTTPSConnection, HTTPResponse + +from pynfe.utils import etree, StringIO +from pynfe.utils.flags import NAMESPACE_NFE, NAMESPACE_SOAP + class Comunicacao(object): u"""Classe abstrata responsavel por definir os metodos e logica das classes de comunicação com os webservices da NF-e.""" servidor = None - porta = None + certificado = None + certificado_senha = None - def __init__(self, servidor, porta): + def __init__(self, servidor, certificado, certificado_senha): self.servidor = servidor - self.porta = porta + self.certificado = certificado + self.certificado_senha = certificado_senha class ComunicacaoSefaz(Comunicacao): u"""Classe de comunicação que segue o padrão definido para as SEFAZ dos Estados.""" + _versao = '1.01' + def transmitir(self, nota_fiscal): pass @@ -24,11 +33,84 @@ class ComunicacaoSefaz(Comunicacao): pass def status_servico(self): - pass + post = '/nfeweb/services/nfestatusservico.asmx' + + # Monta XML do corpo da requisição # FIXME + raiz = etree.Element('teste') + dados = etree.tostring(raiz) + + # Monta XML para envio da requisição + xml = self._construir_xml_soap( + metodo='CadConsultaCadastro', + tag_metodo='consultaCadastro', + cabecalho=self._cabecalho_soap(), + dados=dados, + ) + + # Chama método que efetua a requisição POST no servidor SOAP + retorno = self._post(post, xml, self._post_header()) + + # Transforma o retorno em etree + try: + retorno = etree.parse(StringIO(retorno)) + return retorno + except TypeError: + pass def consultar_cadastro(self, instancia): - pass + #post = '/nfeweb/services/cadconsultacadastro.asmx' + post = '/nfeweb/services/nfeconsulta.asmx' def inutilizar_faixa_numeracao(self, faixa): pass + def _cabecalho_soap(self): + u"""Monta o XML do cabeçalho da requisição SOAP""" + + raiz = etree.Element('cabecMsg', xmlns=NAMESPACE_NFE, versao="1.02") + etree.SubElement(raiz, 'versaoDados').text = self._versao + + return etree.tostring(raiz, encoding='utf-8', xml_declaration=True) + + def _construir_xml_soap(self, metodo, tag_metodo, cabecalho, dados): + u"""Mota o XML para o envio via SOAP""" + + raiz = etree.Element('{%s}Envelope'%NAMESPACE_SOAP, nsmap={'soap': NAMESPACE_SOAP}) + + body = etree.SubElement(raiz, '{%s}Body'%NAMESPACE_SOAP) + met = etree.SubElement( + body, tag_metodo, xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/%s"%metodo, + ) + + etree.SubElement(met, 'nfeCabecMsg').text = cabecalho + etree.SubElement(met, 'nfeDadosMsg').text = dados + + return etree.tostring(raiz, encoding='utf-8', xml_declaration=True) + + def _post_header(self): + u"""Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP""" + return { + u'content-type': u'application/soap+xml; charset=utf-8', + u'Accept': u'application/soap+xml; charset=utf-8', + } + + def _post(self, post, xml, header): + # Separa arquivos de certificado para chave e certificado (sozinho) + caminho_chave, caminho_cert = self.certificado.separar_arquivo(senha=self.certificado_senha) + + # Abre a conexão HTTPS + con = HTTPSConnection(self.servidor, key_file=caminho_chave, cert_file=caminho_cert) + + try: + #con.set_debuglevel(100) + + con.request(u'POST', post, xml, header) + + resp = con.getresponse() + + # Tudo certo! + if resp.status == 200: + return resp.read() + finally: + con.close() + diff --git a/pynfe/processamento/serializacao.py b/pynfe/processamento/serializacao.py index 22040c3..f519d4d 100644 --- a/pynfe/processamento/serializacao.py +++ b/pynfe/processamento/serializacao.py @@ -4,30 +4,9 @@ try: except: from sets import Set as set -try: - from lxml import etree -except ImportError: - try: - # Python 2.5 - cElementTree - import xml.etree.cElementTree as etree - except ImportError: - try: - # Python 2.5 - ElementTree - import xml.etree.ElementTree as etree - except ImportError: - try: - # Instalacao normal do cElementTree - import cElementTree as etree - except ImportError: - try: - # Instalacao normal do ElementTree - import elementtree.ElementTree as etree - except ImportError: - raise Exception('Falhou ao importar lxml/ElementTree') - from pynfe.entidades import Emitente, Cliente, Produto, Transportadora, NotaFiscal from pynfe.excecoes import NenhumObjetoEncontrado, MuitosObjetosEncontrados -from pynfe.utils import so_numeros, obter_municipio_por_codigo, obter_pais_por_codigo +from pynfe.utils import etree, so_numeros, obter_municipio_por_codigo, obter_pais_por_codigo from pynfe.utils.flags import CODIGOS_ESTADOS class Serializacao(object): diff --git a/pynfe/utils/__init__.py b/pynfe/utils/__init__.py index ef67cbd..b11ab9f 100644 --- a/pynfe/utils/__init__.py +++ b/pynfe/utils/__init__.py @@ -1,5 +1,31 @@ import os +try: + from lxml import etree +except ImportError: + try: + # Python 2.5 - cElementTree + import xml.etree.cElementTree as etree + except ImportError: + try: + # Python 2.5 - ElementTree + import xml.etree.ElementTree as etree + except ImportError: + try: + # Instalacao normal do cElementTree + import cElementTree as etree + except ImportError: + try: + # Instalacao normal do ElementTree + import elementtree.ElementTree as etree + except ImportError: + raise Exception('Falhou ao importar lxml/ElementTree') + +try: + from cStringIO import StringIO +except ImportError: + from StringIO import StringIO + import flags from geraldo.utils import memoize diff --git a/pynfe/utils/flags.py b/pynfe/utils/flags.py index f7054ba..77b2acb 100644 --- a/pynfe/utils/flags.py +++ b/pynfe/utils/flags.py @@ -1,5 +1,9 @@ # -*- coding: utf-8 -*- +NAMESPACE_NFE = 'http://www.portalfiscal.inf.br/nfe' +NAMESPACE_SIG = 'http://www.w3.org/2000/09/xmldsig#' +NAMESPACE_SOAP = 'http://www.w3.org/2003/05/soap-envelope' + TIPOS_DOCUMENTO = ( 'CNPJ', 'CPF', diff --git a/tests/03-processamento-03-assinatura.txt b/tests/03-processamento-03-assinatura.txt index 407288a..b564a24 100644 --- a/tests/03-processamento-03-assinatura.txt +++ b/tests/03-processamento-03-assinatura.txt @@ -6,7 +6,7 @@ Carregando Certificado Digital tipo A1 >>> from pynfe.entidades import CertificadoA1 - >>> certificado = CertificadoA1(caminho_arquivo='tests/certificado.pem') + >>> certificado = CertificadoA1(caminho_arquivo='tests/certificado.pfx') Assinando NF-e -------------- diff --git a/tests/03-processamento-04-comunicacao.txt b/tests/03-processamento-04-comunicacao.txt index 864a922..55c781d 100644 --- a/tests/03-processamento-04-comunicacao.txt +++ b/tests/03-processamento-04-comunicacao.txt @@ -3,3 +3,21 @@ PROCESSAMENTO - COMUNICACAO >>> from pynfe.processamento import ComunicacaoSefaz +Carregando certificado digital tipo A1 + + >>> from pynfe.entidades import CertificadoA1 + >>> certificado = CertificadoA1(caminho_arquivo='tests/certificado.pfx') + +Instancia de comunicacao + + >>> comunicacao = ComunicacaoSefaz( + ... #servidor='localhost:8080', + ... servidor='homologacao.nfe.fazenda.sp.gov.br', + ... certificado=certificado, + ... certificado_senha='associacao', + ... ) + +Verifica o status do servico + + >>> comunicacao.status_servico() + diff --git a/tests/04-servidor-soap.txt b/tests/04-servidor-soap.txt index d7d4cfd..6a202b3 100644 --- a/tests/04-servidor-soap.txt +++ b/tests/04-servidor-soap.txt @@ -7,16 +7,9 @@ responstas em formato WSDL, de forma a simular o servidor da SEFAZ. Usando suds - >>> from suds.client import Client - >>> client = Client('http://localhost:8080/ServidorNFEFalso?wsdl', cache=None) - >>> client.set_options(retxml=True) - >>> print client.service.ping('mario', 5) - -Usando soaplib - - >>> #from run_fake_soap_server import ServidorNFEFalso - >>> #from soaplib.client import ServiceClient, make_service_client - >>> #client = make_service_client('http://localhost:8080/', ServidorNFEFalso()) - >>> #print client.ping('Brasil', 5) - ['Brasil', 'Brasil', 'Brasil', 'Brasil', 'Brasil'] + >>> #from suds.client import Client + >>> #client = Client('http://localhost:8080/ServidorNFEFalso?wsdl', cache=None) + >>> #client.set_options(retxml=True) + >>> #bool(client.service.ping('mario', 5)) + True