Browse Source

Trabalhando na comunicação

tags/0.1
Marinho Brandão 16 years ago
parent
commit
cd47ed7a31
  1. 9
      pynfe/processamento/assinatura.py
  2. 21
      pynfe/processamento/comunicacao.py
  3. 6
      pynfe/utils/__init__.py
  4. 49
      run_fake_soap_server.py
  5. 7
      tests/03-processamento-04-comunicacao.txt

9
pynfe/processamento/assinatura.py

@ -2,9 +2,7 @@
import xmlsec, libxml2 # FIXME: verificar ambiguidade de dependencias: lxml e libxml2
from geraldo.utils import memoize
from pynfe.utils import etree, StringIO
from pynfe.utils import etree, StringIO, extrair_tag
from pynfe.utils.flags import NAMESPACE_NFE, NAMESPACE_SIG
class Assinatura(object):
@ -52,11 +50,6 @@ class Assinatura(object):
def verificar_objetos(self, objetos):
pass
@memoize
def extrair_tag(root):
return root.tag.split('}')[-1]
class AssinaturaA1(Assinatura):
"""Classe abstrata responsavel por efetuar a assinatura do certificado
digital no XML informado."""

21
pynfe/processamento/comunicacao.py

@ -5,6 +5,7 @@ from httplib import HTTPSConnection, HTTPResponse
from pynfe.utils import etree, StringIO, so_numeros
from pynfe.utils.flags import NAMESPACE_NFE, NAMESPACE_SOAP, VERSAO_PADRAO
from pynfe.utils.flags import CODIGOS_ESTADOS, VERSAO_PADRAO
from assinatura import AssinaturaA1
class Comunicacao(object):
u"""Classe abstrata responsavel por definir os metodos e logica das classes
@ -27,6 +28,7 @@ class ComunicacaoSefaz(Comunicacao):
u"""Classe de comunicação que segue o padrão definido para as SEFAZ dos Estados."""
_versao = VERSAO_PADRAO
_assinatura = AssinaturaA1
def transmitir(self, nota_fiscal):
pass
@ -46,8 +48,8 @@ class ComunicacaoSefaz(Comunicacao):
# Monta XML para envio da requisição
xml = self._construir_xml_soap(
metodo='CadConsultaCadastro', # FIXME
tag_metodo='consultaCadastro', # FIXME
metodo='nfeRecepcao2', # FIXME
tag_metodo='nfeStatusServicoNF2', # FIXME
cabecalho=self._cabecalho_soap(),
dados=dados,
)
@ -64,8 +66,8 @@ class ComunicacaoSefaz(Comunicacao):
#post = '/nfeweb/services/cadconsultacadastro.asmx'
post = '/nfeweb/services/nfeconsulta.asmx'
def inutilizar_faixa_numeracao(self, numero_inicial, numero_final, emitente, ano=None,
serie='1', justificativa=''):
def inutilizar_faixa_numeracao(self, numero_inicial, numero_final, emitente, certificado,
senha, ano=None, serie='1', justificativa=''):
post = '/nfeweb/services/nfestatusservico.asmx'
# Valores default
@ -87,7 +89,7 @@ class ComunicacaoSefaz(Comunicacao):
# Monta XML do corpo da requisição # FIXME
raiz = etree.Element('inutNFe', xmlns="http://www.portalfiscal.inf.br/nfe", versao="1.07")
inf_inut = etree.SubElement(raiz, 'infInut', Id=id_unico) # FIXME
inf_inut = etree.SubElement(raiz, 'infInut', Id=id_unico)
etree.SubElement(inf_inut, 'tpAmb').text = str(self._ambiente)
etree.SubElement(inf_inut, 'xServ').text = 'INUTILIZAR'
etree.SubElement(inf_inut, 'cUF').text = uf
@ -98,13 +100,16 @@ class ComunicacaoSefaz(Comunicacao):
etree.SubElement(inf_inut, 'nNFIni').text = str(numero_inicial)
etree.SubElement(inf_inut, 'nNFFin').text = str(numero_final)
etree.SubElement(inf_inut, 'xJust').text = justificativa
dados = etree.tostring(raiz, encoding='utf-8', xml_declaration=True)
#dados = etree.tostring(raiz, encoding='utf-8', xml_declaration=True)
# Efetua assinatura
assinatura = self._assinatura(certificado, senha)
dados = assinatura.assinar_etree(etree.ElementTree(raiz), retorna_xml=True)
# Monta XML para envio da requisição
xml = self._construir_xml_soap(
metodo='CadConsultaCadastro', # FIXME
tag_metodo='consultaCadastro', # FIXME
metodo='nfeRecepcao2', # XXX
tag_metodo='nfeInutilizacaoNF', # XXX
cabecalho=self._cabecalho_soap(),
dados=dados,
)

6
pynfe/utils/__init__.py

@ -64,4 +64,8 @@ def obter_municipio_por_codigo(codigo, uf):
municipios = carregar_arquivo_municipios(uf)
return municipios[codigo]
@memoize
def extrair_tag(root):
return root.tag.split('}')[-1]

49
run_fake_soap_server.py

@ -2,7 +2,7 @@
"""Este script deve ser executado com Python 2.6+ e OpenSSL"""
import os
import os, datetime
CUR_DIR = os.path.dirname(os.path.abspath(__file__))
@ -16,6 +16,9 @@ import tornado.ioloop
import tornado.web
import tornado.options
from pynfe.utils import etree, StringIO, extrair_tag
from pynfe.utils.flags import CODIGOS_ESTADOS
#class ServidorNFEFalso(SimpleWSGISoapApp):
# @soapmethod(String, Integer, _returns=Array(String))
# def ping(self, nome, vezes):
@ -23,8 +26,50 @@ import tornado.options
# return ret
class HandlerStatusServico(tornado.web.RequestHandler):
sigla_servidor = 'GO'
def post(self):
self.write('<x/>')
# Obtem o body da request
xml = self.request.body
# Transforma em etree
raiz = etree.parse(StringIO(xml))
# Extrai a tag do método da request
tag = extrair_tag(raiz.getroot().getchildren()[0].getchildren()[0])
# Chama o método respectivo para a tag
print 'Metodo:', tag
getattr(self, tag)(raiz)
def nfeStatusServicoNF2(self, raiz):
data_hora = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
ret = etree.Element('retConsStatServ')
etree.SubElement(ret, 'versao').text = '1.00' # FIXME
etree.SubElement(ret, 'tbAmb').text = '2' # Homologação
etree.SubElement(ret, 'verAplic').text = self.sigla_servidor
etree.SubElement(ret, 'cStat').text = '1' # FIXME
etree.SubElement(ret, 'xMotivo').text = 'Servico em funcionamento normal' # FIXME
etree.SubElement(ret, 'cUF').text = CODIGOS_ESTADOS[self.sigla_servidor]
etree.SubElement(ret, 'dhRecbto').text = data_hora
etree.SubElement(ret, 'tMed').text = '10'
etree.SubElement(ret, 'dhRetorno').text = data_hora
etree.SubElement(ret, 'xObs').text = 'Nenhuma informacao adicional'
xml = etree.tostring(ret, encoding='utf-8', xml_declaration=True)
self.write(xml)
def nfeInutilizacaoNF(self, raiz):
data_hora = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
ret = etree.Element('retInutNFe')
etree.SubElement(ret, 'versao').text = '1.00' # FIXME
xml_dados = raiz.getroot().getchildren()[0].getchildren()[0].getchildren()[1].text
xml = etree.tostring(ret, encoding='utf-8', xml_declaration=True)
self.write(xml)
if __name__ == '__main__':
porta = 8080

7
tests/03-processamento-04-comunicacao.txt

@ -36,6 +36,11 @@ Instancia do emitente (auxiliar para este teste)
... endereco_telefone='6242421212',
... )
Instancia do certificado
>>> from pynfe.entidades import CertificadoA1
>>> certificado = CertificadoA1(caminho_arquivo='tests/certificado.pfx')
Verifica o status do servico
>>> comunicacao.status_servico()
@ -60,6 +65,8 @@ Consulta de cadastro (???)
Inulitilizacao de faixa de numeracao
>>> comunicacao.inutilizar_faixa_numeracao(
... certificado=certificado,
... senha='associacao',
... numero_inicial=10,
... numero_final=20,
... emitente=emitente,

Loading…
Cancel
Save