Browse Source

Iniciando consulta cadastr

tags/0.1.5
Danimar Ribeiro 11 years ago
parent
commit
19c9bc5c0a
  1. 53
      pytrustnfe/servicos/Comunicacao.py
  2. 14
      pytrustnfe/servicos/NfeConsultaCadastro.py
  3. 27
      pytrustnfe/test/test_comunicacao.py
  4. 33
      pytrustnfe/test/test_consulta_cadastro.py
  5. 15
      pytrustnfe/utils.py

53
pytrustnfe/servicos/Comunicacao.py

@ -7,20 +7,36 @@ Created on Jun 14, 2015
from lxml import objectify from lxml import objectify
from uuid import uuid4 from uuid import uuid4
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import tostring
from pytrustnfe.HttpClient import HttpClient from pytrustnfe.HttpClient import HttpClient
from pytrustnfe.Certificado import converte_pfx_pem from pytrustnfe.Certificado import converte_pfx_pem
from pytrustnfe.Strings import CONSULTA_CADASTRO_COMPLETA from pytrustnfe.Strings import CONSULTA_CADASTRO_COMPLETA
class Comunicacao(object): class Comunicacao(object):
url = '' url = ''
web_service = ''
web_service = ''
metodo = ''
tag_retorno = ''
def __init__(self, certificado, senha): def __init__(self, certificado, senha):
self.certificado = certificado self.certificado = certificado
self.senha = senha self.senha = senha
def _soap_xml(self, body):
return '<?xml version="1.0" encoding="utf-8"?>'\
'<soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope">'\
'<soap:Header>'\
'<nfeCabecMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/' + self.tag_retorno + '">'\
'<cUF>42</cUF><versaoDados>2.00</versaoDados>'\
'</nfeCabecMsg>'\
'</soap:Header>'\
'<soap:Body>'\
'<nfeDadosMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/' + self.tag_retorno + '">'\
+ body + '</nfeDadosMsg>'\
'</soap:Body>'\
'</soap:Envelope>'
def _preparar_temp_pem(self): def _preparar_temp_pem(self):
chave_temp = '/tmp/' + uuid4().hex chave_temp = '/tmp/' + uuid4().hex
@ -42,36 +58,21 @@ class Comunicacao(object):
assert self.web_service != '', "Web service não especificado" assert self.web_service != '', "Web service não especificado"
assert self.certificado != '', "Certificado não configurado" assert self.certificado != '', "Certificado não configurado"
assert self.senha != '', "Senha não configurada" assert self.senha != '', "Senha não configurada"
assert self.metodo != '', "Método não configurado"
assert self.tag_retorno != '', "Tag de retorno não configurado"
def _executar_consulta(self, xmlEnviar): def _executar_consulta(self, xmlEnviar):
self._validar_dados() self._validar_dados()
chave, certificado = self._preparar_temp_pem() chave, certificado = self._preparar_temp_pem()
client = HttpClient(self.url, chave, certificado) client = HttpClient(self.url, chave, certificado)
xml_retorno = client.post_xml(self.web_service, xmlEnviar)
obj = objectify.fromstring(xml_retorno)
soap_xml = self._soap_xml(xmlEnviar)
xml_retorno = client.post_xml(self.web_service, soap_xml)
tree = ET.fromstring(xml_retorno)
node = tree.find(self.tag_retorno)
node = tostring(node)
obj = objectify.fromstring(node)
return xml_retorno, obj return xml_retorno, obj
def consulta_cadastro(self, obj_consulta):
chave, certificado = self._preparar_temp_pem()
client = HttpClient('nfe.fazenda.sp.gov.br', chave, certificado)
xml_retorno = client.post_xml('/ws/cadconsultacadastro2.asmx', CONSULTA_CADASTRO_COMPLETA)
obj = objectify.fromstring(xml_retorno)
return xml_retorno, obj
def envio_nfe(self):
chave, certificado = self._preparar_temp_pem()
c = HttpClient('cad.sefazrs.rs.gov.br', chave, certificado)
xml_retorno = c.post_xml('/ws/cadconsultacadastro/cadconsultacadastro2.asmx', '')
obj = objectify.fromstring(xml_retorno)
return xml_retorno, obj

14
pytrustnfe/servicos/NfeConsultaCadastro.py

@ -5,18 +5,26 @@ Created on 21/06/2015
@author: danimar @author: danimar
''' '''
from pytrustnfe.servicos.Comunicacao import Comunicacao from pytrustnfe.servicos.Comunicacao import Comunicacao
from pytrustnfe.xml import DynamicXml
from pytrustnfe.xml.DynamicXml import DynamicXml
class NfeConsultaCadastro(Comunicacao): class NfeConsultaCadastro(Comunicacao):
def consultar_cadastro(self, cadastro):
def __init__(self, certificado, senha):
super(NfeConsultaCadastro, self).__init__(certificado, senha)
self.metodo = 'CadConsultaCadastro2'
self.tag_retorno = 'consultaCadastro2Result'
def consultar_cadastro(self, cadastro, estado):
xml = None xml = None
if isinstance(cadastro, DynamicXml): if isinstance(cadastro, DynamicXml):
xml = cadastro.render() xml = cadastro.render()
if isinstance(cadastro, basestring): if isinstance(cadastro, basestring):
xml = cadastro xml = cadastro
assert xml is not None, "Objeto cadastro deve ser do tipo DynamicXml ou string" assert xml is not None, "Objeto cadastro deve ser do tipo DynamicXml ou string"
self.web_service = '/ws/cadconsultacadastro/cadconsultacadastro2.asmx'
self.url = 'cad.svrs.rs.gov.br'
return self._executar_consulta(xml) return self._executar_consulta(xml)

27
pytrustnfe/test/test_comunicacao.py

@ -16,31 +16,6 @@ class test_comunicacao(unittest.TestCase):
caminho = os.path.dirname(__file__) caminho = os.path.dirname(__file__)
def test_consulta_cadastro(self):
try:
dir_pfx = '/home/danimar/Desktop/isotelha.pfx' #Hack
com = Comunicacao(dir_pfx, 'iso@#telha')
xml, objeto = com.consulta_cadastro(None)
print xml
print objeto
except Exception as e:
print(str(e))
#Teste temporario
def test_envio_without_mock(self):
try:
dir_pfx = '/home/danimar/Desktop/isotelha.pfx' #Hack
com = Comunicacao(dir_pfx, 'iso@#telha')
xml, objeto = com.envio_nfe()
print xml
print objeto
except Exception as e:
print(str(e))
def test_envio_nfe(self): def test_envio_nfe(self):
dir_pfx = os.path.join(self.caminho, 'teste.pfx') dir_pfx = os.path.join(self.caminho, 'teste.pfx')
@ -54,7 +29,7 @@ class test_comunicacao(unittest.TestCase):
conn.getresponse.return_value = retorno conn.getresponse.return_value = retorno
com = Comunicacao(dir_pfx, '123456') com = Comunicacao(dir_pfx, '123456')
xml, objeto = com.envio_nfe()
xml, objeto = com._executar_consulta('')
self.assertEqual(xml, XML_RETORNO, 'Envio de NF-e com problemas - xml de retorno inválido') self.assertEqual(xml, XML_RETORNO, 'Envio de NF-e com problemas - xml de retorno inválido')
self.assertEqual(objeto.cUF, 42, 'Envio de NF-e com problemas - objeto de retorno inválido') self.assertEqual(objeto.cUF, 42, 'Envio de NF-e com problemas - objeto de retorno inválido')

33
pytrustnfe/test/test_consulta_cadastro.py

@ -0,0 +1,33 @@
#coding=utf-8
'''
Created on 22/06/2015
@author: danimar
'''
import unittest
from pytrustnfe.servicos.NfeConsultaCadastro import NfeConsultaCadastro
from pytrustnfe.xml.DynamicXml import DynamicXml
class Test(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
c = DynamicXml('ConsCad')
c(xmlns="http://www.portalfiscal.inf.br/nfe", versao="2.00")
c.infCons.xServ = 'CONS-CAD'
c.infCons.UF = 'SC'
c.infCons.CNPJ = '82951310000156'
self.objeto_consulta = c
def test_consulta_cadastro(self):
try:
dir_pfx = '/home/danimar/projetos/isotelha.pfx' #Hack
com = NfeConsultaCadastro(dir_pfx, 'iso@#telha')
xml, objeto = com.consultar_cadastro(self.objeto_consulta, 'SC')
print xml
print objeto
except Exception as e:
print(str(e))

15
pytrustnfe/utils.py

@ -0,0 +1,15 @@
# coding=utf-8
'''
Created on 22/06/2015
@author: danimar
'''
from datetime import date, datetime
def date_tostring(data):
assert isinstance(data, date), "Objeto date requerido"
return data.strftime("%d-%m-%y")
def datetime_tostring(data):
assert isinstance(data, datetime), "Objeto datetime requerido"
return data.strftime("%d-%m-%y %H:%M:%S")
Loading…
Cancel
Save