diff --git a/pynfe/processamento/comunicacao.py b/pynfe/processamento/comunicacao.py
index fbeb511..73f8d41 100644
--- a/pynfe/processamento/comunicacao.py
+++ b/pynfe/processamento/comunicacao.py
@@ -1,20 +1,16 @@
# -*- coding: utf-8 -*-
+
+from __future__ import division, print_function, unicode_literals
+
import re
-import ssl
-import datetime
import requests
-from pynfe.utils import etree, so_numeros
+from pynfe.utils import etree, StringIO
from pynfe.utils.flags import (
- NAMESPACE_NFE,
NAMESPACE_XSD,
NAMESPACE_XSI,
- VERSAO_PADRAO,
NAMESPACE_SOAP,
CODIGOS_ESTADOS,
- NAMESPACE_BETHA,
- NAMESPACE_METODO
)
-from pynfe.utils.webservices import NFE, NFCE, NFSE
from pynfe.entidades.certificado import CertificadoA1
from .assinatura import AssinaturaA1
@@ -41,451 +37,99 @@ class Comunicacao(object):
class ComunicacaoSefaz(Comunicacao):
"""Classe de comunicação que segue o padrão definido para as SEFAZ dos Estados."""
- _versao = VERSAO_PADRAO
+ _versao = False
_assinatura = AssinaturaA1
+ _namespace = False
+ _header = False
+ _envio_mensagem = False
+ _namespace_metodo = False
+ _accept = False
+ _soap_action = False
+ _ws_url = False
+ _namespace_soap = NAMESPACE_SOAP
+ _namespace_xsi = NAMESPACE_XSI
+ _namespace_xsd = NAMESPACE_XSD
+ _soap_version = 'soap'
+
+ def _cabecalho_soap(self, metodo):
+ """Monta o XML do cabeçalho da requisição SOAP"""
+
+ raiz = etree.Element(
+ self._header,
+ xmlns=self._namespace_metodo + metodo
+ )
+ etree.SubElement(raiz, 'versaoDados').text = '3.00'
+ # MDFE_WS_METODO[metodo]['versao']
- def autorizacao(self, modelo, nota_fiscal, id_lote=1, ind_sinc=1):
- """
- Método para realizar autorização da nota de acordo com o modelo
- :param modelo: Modelo
- :param nota_fiscal: XML assinado
- :param id_lote: Id do lote - numero autoincremental gerado pelo sistema
- :param ind_sinc: Indicador de sincrono e assincrono, 0 para assincrono, 1 para sincrono
- :return: Uma tupla que em caso de sucesso, retorna xml com nfe e protocolo de autorização. Caso contrário,
- envia todo o soap de resposta da Sefaz para decisão do usuário.
- """
- # url do serviço
- url = self._get_url(modelo=modelo, consulta='AUTORIZACAO')
-
- # Monta XML do corpo da requisição
- raiz = etree.Element('enviNFe', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO)
- etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema
- etree.SubElement(raiz, 'indSinc').text = str(ind_sinc) # 0 para assincrono, 1 para sincrono
- raiz.append(nota_fiscal)
-
- # Monta XML para envio da requisição
- xml = self._construir_xml_soap('NFeAutorizacao4', raiz)
- # Faz request no Servidor da Sefaz
- retorno = self._post(url, xml)
-
- # Em caso de sucesso, retorna xml com nfe e protocolo de autorização.
- # Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário.
- # import pdb
- # pdb.set_trace()
- if retorno.status_code == 200:
- # namespace
- ns = {'ns': 'http://www.portalfiscal.inf.br/nfe'}
- if ind_sinc == 1:
- # Procuta status no xml
- try:
- prot = etree.fromstring(retorno.text)
- except ValueError:
- # em SP retorno.text apresenta erro
- prot = etree.fromstring(retorno.content)
- try:
- # Protocolo com envio OK
- inf_prot = prot[0][0] # root protNFe
- lote_status = inf_prot.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text
- # Lote processado
- if lote_status == '104':
- prot_nfe = inf_prot.xpath("ns:retEnviNFe/ns:protNFe", namespaces=ns)[0]
- status = prot_nfe.xpath('ns:infProt/ns:cStat', namespaces=ns)[0].text
- # autorizado usa da NF-e
- # retorna xml final (NFe+protNFe)
- if status == '100':
- raiz = etree.Element('nfeProc', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO)
- raiz.append(nota_fiscal)
- raiz.append(prot_nfe)
- return 0, raiz
- except IndexError:
- # Protocolo com algum erro no Envio
- print(retorno.text)
- else:
- # Retorna id do protocolo para posterior consulta em caso de sucesso.
- try:
- rec = etree.fromstring(retorno.text)
- except ValueError:
- # em SP retorno.text apresenta erro
- rec = etree.fromstring(retorno.content)
- rec = rec[0][0]
- status = rec.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text
- # Lote Recebido com Sucesso!
- if status == '103':
- nrec = rec.xpath("ns:retEnviNFe/ns:infRec/ns:nRec", namespaces=ns)[0].text
- return 0, nrec, nota_fiscal
- return 1, retorno, nota_fiscal
-
- def consulta_recibo(self, modelo, numero):
- """
- Este método oferece a consulta do resultado do processamento de um lote de NF-e.
- O aplicativo do Contribuinte deve ser construído de forma a aguardar um tempo mínimo de
- 15 segundos entre o envio do Lote de NF-e para processamento e a consulta do resultado
- deste processamento, evitando a obtenção desnecessária do status de erro 105 - "Lote em
- Processamento".
- :param modelo: Modelo da nota
- :param numero: Número da nota
- :return:
- """
-
- # url do serviço
- url = self._get_url(modelo=modelo, consulta='RECIBO')
-
- # Monta XML do corpo da requisição
- raiz = etree.Element('consReciNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
- etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
- etree.SubElement(raiz, 'nRec').text = numero
-
- # Monta XML para envio da requisição
- xml = self._construir_xml_soap('NFeRetAutorizacao4', raiz)
- return self._post(url, xml)
-
- def consulta_nota(self, modelo, chave):
- """
- Este método oferece a consulta da situação da NF-e/NFC-e na Base de Dados do Portal
- da Secretaria de Fazenda Estadual.
- :param modelo: Modelo da nota
- :param chave: Chave da nota
- :return:
- """
- # url do serviço
- url = self._get_url(modelo=modelo, consulta='CHAVE')
- # Monta XML do corpo da requisição
- raiz = etree.Element('consSitNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
- etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
- etree.SubElement(raiz, 'xServ').text = 'CONSULTAR'
- etree.SubElement(raiz, 'chNFe').text = chave
- # Monta XML para envio da requisição
- xml = self._construir_xml_soap('NFeConsultaProtocolo4', raiz)
- return self._post(url, xml)
-
- def consulta_notas_cnpj(self, cnpj, nsu=0):
- """
- “Serviço de Consulta da Relação de Documentos Destinados” para um determinado CNPJ de
- destinatário informado na NF-e.
- :param cnpj: CNPJ
- :param nsu: NSU
- :return:
- """
-
- # url do serviço
- url = self._get_url_an(consulta='DESTINADAS')
-
- # Monta XML do corpo da requisição
- raiz = etree.Element('consNFeDest', versao='1.01', xmlns=NAMESPACE_NFE)
- etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
- etree.SubElement(raiz, 'xServ').text = 'CONSULTAR NFE DEST'
- etree.SubElement(raiz, 'CNPJ').text = cnpj
-
- # Indicador de NF-e consultada:
- # 0 = Todas as NF-e;
- # 1 = Somente as NF-e que ainda não tiveram manifestação do destinatário (Desconhecimento da
- # operação, Operação não Realizada ou Confirmação da Operação);
- # 2 = Idem anterior, incluindo as NF-e que também não tiveram a Ciência da Operação.
- etree.SubElement(raiz, 'indNFe').text = '0'
-
- # Indicador do Emissor da NF-e:
- # 0 = Todos os Emitentes / Remetentes;
- # 1 = Somente as NF-e emitidas por emissores / remetentes que não tenham o mesmo CNPJ-Base do
- # destinatário (para excluir as notas fiscais de transferência entre filiais).
- etree.SubElement(raiz, 'indEmi').text = '0'
-
- # Último NSU recebido pela Empresa. Caso seja informado com zero, ou com um NSU muito antigo, a consulta
- # retornará unicamente as notas fiscais que tenham sido recepcionadas nos últimos 15 dias.
- etree.SubElement(raiz, 'ultNSU').text = str(nsu)
-
- # Monta XML para envio da requisição
- xml = self._construir_xml_soap('NfeConsultaDest', raiz)
- return self._post(url, xml)
-
- def consulta_distribuicao(self, cnpj, nsu=0):
- pass
-
- def consulta_cadastro(self, modelo, cnpj):
- """
- Consulta de cadastro
- :param modelo: Modelo da nota
- :param cnpj: CNPJ da empresa
- :return:
- """
- # UF que utilizam a SVRS - Sefaz Virtual do RS: Para serviço de Consulta Cadastro: AC, RN, PB, SC
- lista_svrs = ['AC', 'RN', 'PB', 'SC', 'PI']
-
- # RS implementa um método diferente na consulta de cadastro
- if self.uf.upper() == 'RS':
- url = NFE['RS']['CADASTRO']
- elif self.uf.upper() in lista_svrs:
- url = NFE['SVRS']['CADASTRO']
- elif self.uf.upper() == 'SVC-RS':
- url = NFE['SVC-RS']['CADASTRO']
- else:
- url = self._get_url(modelo=modelo, consulta='CADASTRO')
-
- raiz = etree.Element('ConsCad', versao='2.00', xmlns=NAMESPACE_NFE)
- info = etree.SubElement(raiz, 'infCons')
- etree.SubElement(info, 'xServ').text = 'CONS-CAD'
- etree.SubElement(info, 'UF').text = self.uf.upper()
- etree.SubElement(info, 'CNPJ').text = cnpj
- # etree.SubElement(info, 'CPF').text = cpf
-
- # Monta XML para envio da requisição
- xml = self._construir_xml_soap('CadConsultaCadastro4', raiz)
- # Chama método que efetua a requisição POST no servidor SOAP
- return self._post(url, xml)
-
- def evento(self, modelo, evento, id_lote=1):
- """
- Envia um evento de nota fiscal (cancelamento e carta de correção)
- :param modelo: Modelo da nota
- :param evento: Eventro
- :param id_lote: Id do lote
- :return:
- """
-
- # url do serviço
- try:
- # manifestacao url é do AN
- if evento[0][5].text.startswith('2'):
- url = self._get_url_an(consulta='EVENTOS')
- else:
- url = self._get_url(modelo=modelo, consulta='EVENTOS')
- except Exception:
- url = self._get_url(modelo=modelo, consulta='EVENTOS')
-
- # Monta XML do corpo da requisição
- raiz = etree.Element('envEvento', versao='1.00', xmlns=NAMESPACE_NFE)
- etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema
- raiz.append(evento)
- xml = self._construir_xml_soap('NFeRecepcaoEvento4', raiz)
- return self._post(url, xml)
-
- def status_servico(self, modelo):
- """
- Verifica status do servidor da receita.
- :param modelo: modelo é a string com tipo de serviço que deseja consultar, Ex: nfe ou nfce
- :return:
- """
- url = self._get_url(modelo, 'STATUS')
- # Monta XML do corpo da requisição
- raiz = etree.Element('consStatServ', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
- etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()]
- etree.SubElement(raiz, 'xServ').text = 'STATUS'
- xml = self._construir_xml_soap('NFeStatusServico4', raiz)
- return self._post(url, xml)
-
- def download(self, cnpj, chave):
- """
- Metodo para download de NFe por parte de destinatário.
- O certificado digital deve ser o mesmo do destinatário da Nfe.
- NT 2012/002
- :param cnpj: CNPJ da empresa
- :param chave: Chave
- :return:
- """
-
- # url do serviço
- url = self._get_url_an(consulta='DOWNLOAD')
-
- # Monta XML do corpo da requisição
- raiz = etree.Element('downloadNFe', versao='1.00', xmlns=NAMESPACE_NFE)
- etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
- etree.SubElement(raiz, 'xServ').text = 'DOWNLOAD NFE'
- etree.SubElement(raiz, 'CNPJ').text = str(cnpj)
- etree.SubElement(raiz, 'chNFe').text = str(chave)
-
- # Monta XML para envio da requisição
- xml = self._construir_xml_soap('NfeDownloadNF', raiz)
- return self._post(url, xml)
-
- def inutilizacao(self, modelo, cnpj, numero_inicial, numero_final, justificativa='', ano=None, serie='1'):
- """
- Serviço destinado ao atendimento de solicitações de inutilização de numeração.
- :param modelo: Modelo da nota
- :param cnpj: CNPJda empresa
- :param numero_inicial: Número inicial
- :param numero_final: Número final
- :param justificativa: Justificativa
- :param ano: Ano
- :param serie: Série
- :return:
- """
-
- # url do servico
- url = self._get_url(modelo=modelo, consulta='INUTILIZACAO')
-
- # Valores default
- ano = str(ano or datetime.date.today().year)[-2:]
- uf = CODIGOS_ESTADOS[self.uf.upper()]
- cnpj = so_numeros(cnpj)
-
- # Identificador da TAG a ser assinada formada com Código da UF + Ano (2 posições) +
- # CNPJ + modelo + série + nro inicial e nro final precedida do literal “ID”
- id_unico = 'ID%(uf)s%(ano)s%(cnpj)s%(modelo)s%(serie)s%(num_ini)s%(num_fin)s' % {
- 'uf': uf,
- 'ano': ano,
- 'cnpj': cnpj,
- 'modelo': '55',
- 'serie': serie.zfill(3),
- 'num_ini': str(numero_inicial).zfill(9),
- 'num_fin': str(numero_final).zfill(9),
- }
-
- # Monta XML do corpo da requisição # FIXME
- raiz = etree.Element('inutNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
- inf_inut = etree.SubElement(raiz, 'infInut', Id=id_unico)
- etree.SubElement(inf_inut, 'tpAmb').text = str(self._ambiente)
- etree.SubElement(inf_inut, 'xServ').text = 'INUTILIZAR'
- etree.SubElement(inf_inut, 'cUF').text = uf
- etree.SubElement(inf_inut, 'ano').text = ano
- etree.SubElement(inf_inut, 'CNPJ').text = cnpj
- etree.SubElement(inf_inut, 'mod').text = '55' if modelo == 'nfe' else '65' # 55=NF-e; 65=NFC-e
- etree.SubElement(inf_inut, 'serie').text = serie
- etree.SubElement(inf_inut, 'nNFIni').text = str(numero_inicial)
- etree.SubElement(inf_inut, 'nNFFin').text = str(numero_final)
- etree.SubElement(inf_inut, 'xJust').text = justificativa
-
- # assinatura
- a1 = AssinaturaA1(self.certificado, self.certificado_senha)
- xml = a1.assinar(raiz)
-
- # Monta XML para envio da requisição
- xml = self._construir_xml_soap('NFeInutilizacao4', xml)
- # Faz request no Servidor da Sefaz e retorna resposta
- return self._post(url, xml)
-
- def _get_url_an(self, consulta):
- # producao
- if self._ambiente == 1:
- if consulta == 'DISTRIBUICAO':
- ambiente = 'https://www1.'
- else:
- ambiente = 'https://www.'
- # homologacao
- else:
- ambiente = 'https://hom.'
-
- self.url = ambiente + NFE['AN'][consulta]
- return self.url
-
- def _get_url(self, modelo, consulta):
- """ Retorna a url para comunicação com o webservice """
- # estado que implementam webservices proprios
- lista = ['PR', 'MS', 'SP', 'AM', 'CE', 'BA', 'GO', 'MG', 'MT', 'PE', 'RS']
- if self.uf.upper() in lista:
- if self._ambiente == 1:
- ambiente = 'HTTPS'
- else:
- ambiente = 'HOMOLOGACAO'
- if modelo == 'nfe':
- # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
- self.url = NFE[self.uf.upper()][ambiente] + NFE[self.uf.upper()][consulta]
- elif modelo == 'nfce':
- # PE é o unico UF que possiu NFE proprio e SVRS para NFCe
- if self.uf.upper() == 'PE':
- self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
- else:
- # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
- self.url = NFCE[self.uf.upper()][ambiente] + NFCE[self.uf.upper()][consulta]
- else:
- raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
- # Estados que utilizam outros ambientes
- else:
- lista_svrs = ['AC', 'RN', 'PB', 'SC', 'SE', 'PI']
- lista_svan = ['MA','PA']
- if self.uf.upper() in lista_svrs:
- if self._ambiente == 1:
- ambiente = 'HTTPS'
- else:
- ambiente = 'HOMOLOGACAO'
- if modelo == 'nfe':
- # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
- self.url = NFE['SVRS'][ambiente] + NFE['SVRS'][consulta]
- elif modelo == 'nfce':
- # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
- self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
- else:
- raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
- elif self.uf.upper() in lista_svan:
- if self._ambiente == 1:
- ambiente = 'HTTPS'
- else:
- ambiente = 'HOMOLOGACAO'
- if modelo == 'nfe':
- # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
- self.url = NFE['SVAN'][ambiente] + NFE['SVAN'][consulta]
- elif modelo == 'nfce':
- # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
- self.url = NFCE['SVAN'][ambiente] + NFCE['SVAN'][consulta]
- else:
- raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
- return self.url
+ return raiz
- def _get_url_uf(self, modelo, consulta):
- """ Estados que implementam url diferente do padrão nacional"""
- # estados que implementam webservice SVRS
- svrs = ['AC', 'AL', 'AP', 'DF', 'ES', 'PB', 'RJ', 'RN', 'RO', 'RR', 'SC', 'SE', 'TO', 'PI']
- svan = ['MA', 'PA']
- # SVRS
- if self.uf.upper() in svrs:
- if self._ambiente == 1:
- ambiente = 'HTTPS'
- else:
- ambiente = 'HOMOLOGACAO'
- if modelo == 'nfe':
- # nfe Ex: https://nfe.svrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx
- # https://nfe-homologacao.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx
- self.url = NFE['SVRS'][ambiente] + NFE['SVRS'][consulta]
- elif modelo == 'nfce':
- # nfce Ex: https://nfce.svrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx
- # https://nfce-homologacao.svrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx
- self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
- else:
- # TODO implementar outros tipos de notas como NFS-e
- pass
- # SVAN
- else:
- if self.uf.upper() in svan:
- if self._ambiente == 1:
- ambiente = 'HTTPS'
- else:
- ambiente = 'HOMOLOGACAO'
- if modelo == 'nfe':
- # nfe Ex: https://nfe.svrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx
- # https://nfe-homologacao.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx
- self.url = NFE['SVAN'][ambiente] + NFE['SVAN'][consulta]
- elif modelo == 'nfce':
- # TODO não existe SVAN para nfce
- pass
- else:
- # TODO implementar outros tipos de notas como NFS-e
- pass
- return self.url
+ def _get_url_webservice_metodo(self, ws_metodo):
+ url = (
+ 'https://' +
+ self._ws_url[self._ambiente]['servidor'] +
+ '/' +
+ self._ws_url[self._ambiente][ws_metodo]
+ )
+ webservice = self._ws_metodo[ws_metodo]['webservice']
+ metodo = self._ws_metodo[ws_metodo]['metodo']
+ return url, webservice, metodo
def _construir_xml_soap(self, metodo, dados):
"""Mota o XML para o envio via SOAP"""
- raiz = etree.Element('{%s}Envelope' % NAMESPACE_SOAP, nsmap={
- 'xsi': NAMESPACE_XSI, 'xsd': NAMESPACE_XSD,'soap': NAMESPACE_SOAP})
- body = etree.SubElement(raiz, '{%s}Body' % NAMESPACE_SOAP)
- a = etree.SubElement(body, 'nfeDadosMsg', xmlns=NAMESPACE_METODO+metodo)
+
+ raiz = etree.Element(
+ '{%s}Envelope' % self._namespace_soap,
+ nsmap={
+ 'xsi': self._namespace_xsi,
+ 'xsd': self._namespace_xsd,
+ self._soap_version: self._namespace_soap,
+ })
+
+ if self._header:
+ cabecalho = self._cabecalho_soap(metodo)
+ c = etree.SubElement(raiz, '{%s}Header' % self._namespace_soap)
+ c.append(cabecalho)
+
+ body = etree.SubElement(raiz, '{%s}Body' % self._namespace_soap)
+
+ a = etree.SubElement(
+ body,
+ self._envio_mensagem,
+ xmlns=self._namespace_metodo+metodo
+ )
a.append(dados)
return raiz
- def _post_header(self):
+ def _construir_etree_ds(self, ds):
+ output = StringIO()
+ ds.export(
+ output,
+ 0,
+ pretty_print=False,
+ namespacedef_='xmlns="' + self._namespace + '"'
+ )
+ contents = output.getvalue()
+ output.close()
+ return etree.fromstring(contents)
+
+ def _post_header(self, soap_webservice_method=False):
"""Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP"""
+ header = {
+ b'content-type': b'text/xml; charset=utf-8;',
+ }
+
# PE é a únca UF que exige SOAPAction no header
- if self.uf.upper() == 'PE':
- return {
- 'content-type': 'application/soap+xml; charset=utf-8;',
- 'Accept': 'application/soap+xml; charset=utf-8;',
- 'SOAPAction': ''
- }
- else:
- return {
- 'content-type': 'application/soap+xml; charset=utf-8;',
- 'Accept': 'application/soap+xml; charset=utf-8;'
- }
+ if soap_webservice_method:
+ header[b'SOAPAction'] = \
+ (self._namespace_metodo + soap_webservice_method).encode('utf-8')
- def _post(self, url, xml):
+ if self._accept:
+ header[b'Accept'] = b'application/soap+xml; charset=utf-8;'
+
+ return header
+
+ def _post(self, url, xml, soap_webservice_method=False):
certificado_a1 = CertificadoA1(self.certificado)
chave, cert = certificado_a1.separar_arquivo(self.certificado_senha, caminho=True)
chave_cert = (cert, chave)
@@ -501,235 +145,16 @@ class ComunicacaoSefaz(Comunicacao):
)
xml = xml_declaration + xml
# Faz o request com o servidor
- result = requests.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False)
+ result = requests.post(
+ url.encode('utf-8'),
+ xml.encode('utf-8'),
+ headers=self._post_header(soap_webservice_method),
+ cert=chave_cert,
+ verify=False
+ )
result.encoding = 'utf-8'
return result
except requests.exceptions.RequestException as e:
raise e
finally:
certificado_a1.excluir()
-
-
-class ComunicacaoNfse(Comunicacao):
- """ Classe de comunicação que segue o padrão definido para as SEFAZ dos Municípios. """
-
- _versao = ''
- _namespace = ''
-
- def __init__(self, certificado, certificado_senha, autorizador, homologacao=False):
- self.certificado = certificado
- self.certificado_senha = certificado_senha
- self._ambiente = 2 if homologacao else 1
- self.autorizador = autorizador.upper()
- if self.autorizador == 'GINFES':
- self._namespace = 'http://www.ginfes.com.br/cabecalho_v03.xsd'
- self._versao = '3'
- elif self.autorizador == 'BETHA':
- self._namespace = NAMESPACE_BETHA
- self._versao = '2.02'
- else:
- raise Exception('Autorizador não encontrado!')
-
- def autorizacao(self, nota):
- # url do serviço
- url = self._get_url()
- if self.autorizador == 'BETHA':
- # xml
- xml = etree.tostring(nota, encoding='unicode', pretty_print=False)
- # comunica via wsdl
- return self._post(url, xml, 'gerar')
- else:
- raise Exception('Este método só esta implementado no autorizador betha.')
-
- def enviar_lote(self, xml):
- # url do serviço
- url = self._get_url()
- if self.autorizador == 'GINFES':
- # xml
- xml = '' + xml
- # comunica via wsdl
- return self._post_https(url, xml, 'enviar_lote')
- else:
- raise Exception('Este método só esta implementado no autorizador ginfes.')
-
- def consultar(self, xml):
- # url do serviço
- url = self._get_url()
- if self.autorizador == 'GINFES':
- # xml
- xml = '' + xml
- # comunica via wsdl
- return self._post_https(url, xml, 'consulta')
- else:
- raise Exception('Este método só esta implementado no autorizador ginfes.')
-
- def consultar_rps(self, xml):
- # url do serviço
- url = self._get_url()
- if self.autorizador == 'BETHA':
- # comunica via wsdl
- return self._post(url, xml, 'consultaRps')
- elif self.autorizador == 'GINFES':
- return self._post_https(url, xml, 'consultaRps')
- # TODO outros autorizadres
- else:
- raise Exception('Autorizador não encontrado!')
-
- def consultar_faixa(self, xml):
- # url do serviço
- url = self._get_url()
- if self.autorizador == 'BETHA':
- # comunica via wsdl
- return self._post(url, xml, 'consultaFaixa')
- else:
- raise Exception('Este método só esta implementado no autorizador betha.')
-
- def consultar_lote(self, xml):
- # url do serviço
- url = self._get_url()
- if self.autorizador == 'GINFES':
- # xml
- xml = '' + xml
- # comunica via wsdl
- return self._post_https(url, xml, 'consulta_lote')
- else:
- raise Exception('Este método só esta implementado no autorizador ginfes.')
-
- def consultar_situacao_lote(self, xml):
- # url do serviço
- url = self._get_url()
- if self.autorizador == 'GINFES':
- # comunica via wsdl
- return self._post_https(url, xml, 'consulta_situacao_lote')
- else:
- raise Exception('Este método só esta implementado no autorizador ginfes.')
-
- def cancelar(self, xml):
- # url do serviço
- url = self._get_url()
- # Betha
- if self.autorizador == 'BETHA':
- # comunica via wsdl
- return self._post(url, xml, 'cancelar')
- # Ginfes
- elif self.autorizador == 'GINFES':
- # comunica via wsdl com certificado
- return self._post_https(url, xml, 'cancelar')
- # TODO outros autorizadres
- else:
- raise Exception('Autorizador não encontrado!')
-
- def _cabecalho(self, retorna_string=True):
- """ Monta o XML do cabeçalho da requisição wsdl
- Namespaces padrão homologação (Ginfes) """
-
- xml_declaration = ''
- # cabecalho = '3'
- # cabecalho
- raiz = etree.Element('{%s}cabecalho'%self._namespace, nsmap={'ns2':self._namespace, 'xsi':NAMESPACE_XSI}, versao=self._versao)
- etree.SubElement(raiz, 'versaoDados').text = self._versao
-
- if retorna_string:
- cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n','')
- cabecalho = xml_declaration + cabecalho
- return cabecalho
- else:
- return raiz
-
- def _cabecalho2(self, retorna_string=True):
- """ Monta o XML do cabeçalho da requisição wsdl
- Namespaces que funcionaram em produção (Ginfes)"""
-
- xml_declaration = ''
-
- # cabecalho
- raiz = etree.Element('cabecalho', xmlns=self._namespace, versao=self._versao)
- etree.SubElement(raiz, 'versaoDados').text = self._versao
-
- if retorna_string:
- cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n', '')
- cabecalho = xml_declaration + cabecalho
- return cabecalho
- else:
- return raiz
-
- def _cabecalho_ginfes(self):
- """ Retorna o XML do cabeçalho gerado pelo xsd"""
- from pynfe.processamento.autorizador_nfse import SerializacaoGinfes
- return SerializacaoGinfes().cabecalho()
-
- def _get_url(self):
- """ Retorna a url para comunicação com o webservice """
- if self._ambiente == 1:
- ambiente = 'HTTPS'
- else:
- ambiente = 'HOMOLOGACAO'
- if self.autorizador in NFSE:
- self.url = NFSE[self.autorizador][ambiente]
- else:
- raise Exception('Autorizador nao encontrado!')
- return self.url
-
- def _post(self, url, xml, metodo):
- """ Comunicação wsdl (http) sem certificado digital """
- # cabecalho
- cabecalho = self._cabecalho()
- # comunicacao wsdl
- try:
- from suds.client import Client
- cliente = Client(url)
- # gerar nfse
- if metodo == 'gerar':
- return cliente.service.GerarNfse(cabecalho, xml)
- elif metodo == 'consultaRps':
- return cliente.service.ConsultarNfsePorRps(cabecalho, xml)
- elif metodo == 'consultaFaixa':
- return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
- elif metodo == 'cancelar':
- return cliente.service.CancelarNfse(cabecalho, xml)
- # TODO outros metodos
- else:
- raise Exception('Método não implementado no autorizador.')
- except Exception as e:
- raise e
-
- def _post_https(self, url, xml, metodo):
- """ Comunicação wsdl (https) utilizando certificado do usuário """
- # cabecalho
- cabecalho = self._cabecalho()
- # comunicacao wsdl
- try:
- from suds.client import Client
- from pynfe.utils.https_nfse import HttpAuthenticated
-
- certificadoA1 = CertificadoA1(self.certificado)
- chave, cert = certificadoA1.separar_arquivo(self.certificado_senha, caminho=True)
-
- cliente = Client(url, transport = HttpAuthenticated(key=chave, cert=cert, endereco=url))
-
- # gerar nfse
- if metodo == 'gerar':
- return cliente.service.GerarNfse(cabecalho, xml)
- elif metodo == 'enviar_lote':
- return cliente.service.RecepcionarLoteRpsV3(cabecalho, xml)
- elif metodo == 'consulta':
- return cliente.service.ConsultarNfseV3(cabecalho, xml)
- elif metodo == 'consulta_lote':
- return cliente.service.ConsultarLoteRpsV3(cabecalho, xml)
- elif metodo == 'consulta_situacao_lote':
- return cliente.service.ConsultarSituacaoLoteRpsV3(cabecalho, xml)
- elif metodo == 'consultaRps':
- return cliente.service.ConsultarNfsePorRpsV3(cabecalho, xml)
- elif metodo == 'consultaFaixa':
- return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
- elif metodo == 'cancelar':
- # versão 2
- return cliente.service.CancelarNfse(xml)
- # versão 3
- # return cliente.service.CancelarNfseV3(cabecalho, xml)
- # TODO outros metodos
- else:
- raise Exception('Método não implementado no autorizador.')
- except Exception as e:
- raise e
\ No newline at end of file
diff --git a/pynfe/processamento/mdfe.py b/pynfe/processamento/mdfe.py
new file mode 100644
index 0000000..3b02396
--- /dev/null
+++ b/pynfe/processamento/mdfe.py
@@ -0,0 +1,228 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2018 - TODAY Luis Felipe Mileo - KMEE INFORMATICA LTDA
+# License AGPL-3 - See https://www.gnu.org/licenses/lgpl-3.0.html
+
+from __future__ import division, print_function, unicode_literals
+
+import time
+
+from pynfe.utils.flags import (
+ NAMESPACE_MDFE,
+ MODELO_MDFE,
+ NAMESPACE_MDFE_METODO,
+ NAMESPACE_SOAP,
+ NAMESPACE_XSI,
+ NAMESPACE_XSD,
+)
+from pynfe.utils.webservices import (
+ MDFE_WS_URL,
+ MDFE_WS_METODO,
+ WS_MDFE_CONSULTA,
+ WS_MDFE_STATUS_SERVICO,
+ WS_MDFE_CONSULTA_NAO_ENCERRADOS,
+ WS_MDFE_RECEPCAO,
+ WS_MDFE_RET_RECEPCAO,
+ WS_MDFE_RECEPCAO_EVENTO,
+)
+from pynfe.utils import etree, extrai_id_srtxml
+from .comunicacao import ComunicacaoSefaz
+from .resposta import analisar_retorno
+
+from mdfelib.v3_00 import consStatServMDFe
+from mdfelib.v3_00 import consSitMDFe
+from mdfelib.v3_00 import consMDFeNaoEnc
+from mdfelib.v3_00 import enviMDFe
+from mdfelib.v3_00 import consReciMDFe
+
+MDFE_SITUACAO_JA_ENVIADO = ('100', '101', '132')
+
+
+class ComunicacaoMDFE(ComunicacaoSefaz):
+
+ _modelo = MODELO_MDFE
+ _namespace = NAMESPACE_MDFE
+ _versao = '3.00'
+ _ws_url = MDFE_WS_URL
+ _ws_metodo = MDFE_WS_METODO
+ _header = 'mdfeCabecMsg'
+ _envio_mensagem = 'mdfeDadosMsg'
+ _retorno_mensagem = 'mdfeRecepcaoResult'
+ _namespace_metodo = NAMESPACE_MDFE_METODO
+
+ _accept = True
+ _soap_action = False
+ _namespace_soap = NAMESPACE_SOAP
+ _namespace_xsi = NAMESPACE_XSI
+ _namespace_xsd = NAMESPACE_XSD
+ _soap_version = 'soap12'
+ _edoc_situacao_ja_enviado = MDFE_SITUACAO_JA_ENVIADO
+ _edoc_situacao_arquivo_recebido_com_sucesso = '103'
+ _edoc_situacao_em_processamento = '105'
+ _edoc_situacao_servico_em_operacao = '107'
+
+ consulta_servico_ao_enviar = True
+ maximo_tentativas_consulta_recibo = 5
+
+ def _post_soap(self, classe, ws_metodo, raiz_xml, str_xml=False):
+ url, webservice, metodo = self._get_url_webservice_metodo(
+ ws_metodo
+ )
+ if not str_xml:
+ xml = self._construir_xml_soap(
+ webservice,
+ self._construir_etree_ds(raiz_xml)
+ )
+ else:
+ etree_ds = self._construir_etree_ds(raiz_xml)
+ etree_ds.append(etree.fromstring(str_xml))
+ xml = self._construir_xml_soap(webservice, etree_ds)
+
+ retorno = self._post(
+ url, xml, soap_webservice_method=webservice + b'/' + metodo
+ )
+ return analisar_retorno(ws_metodo, retorno, classe)
+
+ def status_servico(self):
+ raiz = consStatServMDFe.TConsStatServ(
+ versao=self._versao,
+ tpAmb=str(self._ambiente),
+ xServ='STATUS',
+ )
+ raiz.original_tagname_ = 'consStatServMDFe'
+ return self._post_soap(
+ classe=consStatServMDFe,
+ ws_metodo=WS_MDFE_STATUS_SERVICO,
+ raiz_xml=raiz
+ )
+
+ def consulta(self, chave):
+ raiz = consSitMDFe.TConsSitMDFe(
+ versao=self._versao,
+ tpAmb=str(self._ambiente),
+ xServ='CONSULTAR',
+ chMDFe=chave,
+ )
+ raiz.original_tagname_ = 'consSitMDFe'
+ return self._post_soap(
+ classe=consSitMDFe,
+ ws_metodo=WS_MDFE_CONSULTA,
+ raiz_xml=raiz
+ )
+
+ def consulta_nao_encerrados(self, cnpj):
+ raiz = consMDFeNaoEnc.TConsMDFeNaoEnc(
+ versao=self._versao,
+ tpAmb=str(self._ambiente),
+ xServ='CONSULTAR NÃO ENCERRADOS',
+ CNPJ=cnpj,
+ )
+ raiz.original_tagname_ = 'consMDFeNaoEnc'
+
+ return self._post_soap(
+ classe=consMDFeNaoEnc,
+ ws_metodo=WS_MDFE_CONSULTA_NAO_ENCERRADOS,
+ raiz_xml=raiz
+ )
+
+ def autorizacao(self, str_documento_assinado, id_lote='1'):
+ raiz = enviMDFe.TEnviMDFe(
+ versao=self._versao,
+ idLote=id_lote
+ )
+ raiz.original_tagname_ = 'enviMDFe'
+ return self._post_soap(
+ classe=enviMDFe,
+ ws_metodo=WS_MDFE_RECEPCAO,
+ raiz_xml=raiz,
+ str_xml=str_documento_assinado,
+ )
+
+ def consulta_recibo(self, numero):
+ raiz = consReciMDFe.TConsReciMDFe(
+ versao=self._versao,
+ tpAmb=str(self._ambiente),
+ nRec=numero,
+ )
+ raiz.original_tagname_ = 'consReciMDFe'
+ return self._post_soap(
+ classe=consReciMDFe,
+ ws_metodo=WS_MDFE_RET_RECEPCAO,
+ raiz_xml=raiz,
+ )
+
+ def processar_documento(self, edoc):
+
+ if self.consulta_servico_ao_enviar:
+ proc_servico = self.status_servico()
+ yield proc_servico
+ #
+ # Se o serviço não estiver em operação
+ #
+ if not proc_servico.resposta.cStat == \
+ self._edoc_situacao_servico_em_operacao:
+ #
+ # Interrompe todo o processo
+ #
+ return
+ #
+ # Verificar se os documentos já não foram emitados antes
+ #
+ chave = extrai_id_srtxml(edoc)
+ if not chave:
+ #
+ # Interrompe todo o processo se o documento nao tem chave
+ #
+ return
+
+ proc_consulta = self.consulta(chave)
+ yield proc_consulta
+
+ #
+ # Se o documento já constar na SEFAZ (autorizada ou denegada)
+ #
+ if proc_consulta.resposta.cStat in self._edoc_situacao_ja_enviado:
+ #
+ # Interrompe todo o processo
+ #
+ return
+ #
+ # Documento nao foi enviado, entao vamos envia-lo
+ #
+
+ proc_envio = self.autorizacao(edoc)
+ yield proc_envio
+
+ #
+ # Deu errado?
+ #
+ if proc_envio.resposta.cStat != \
+ self._edoc_situacao_arquivo_recebido_com_sucesso:
+ #
+ # Interrompe o processo
+ #
+ return
+
+ #
+ # Aguarda o tempo do processamento antes da consulta
+ #
+ time.sleep(proc_envio.resposta.infRec.tMed * 1.3)
+
+ #
+ # Consulta o recibo do lote, para ver o que aconteceu
+ #
+ proc_recibo = self.consulta_recibo(proc_envio.resposta.infRec.nRec)
+
+ #
+ # Tenta receber o resultado do processamento do lote, caso ainda
+ # esteja em processamento
+ #
+ tentativa = 0
+ while (proc_recibo.resposta.cStat ==
+ self._edoc_situacao_em_processamento and
+ tentativa < self.maximo_tentativas_consulta_recibo):
+ time.sleep(proc_envio.resposta.infRec.tMed * 1.5)
+ tentativa += 1
+ proc_recibo = self.consulta_recibo(
+ proc_envio.resposta.infRec.nRec
+ )
+ yield proc_recibo
diff --git a/pynfe/processamento/nfe.py b/pynfe/processamento/nfe.py
new file mode 100644
index 0000000..0a0f093
--- /dev/null
+++ b/pynfe/processamento/nfe.py
@@ -0,0 +1,470 @@
+# -*- coding: utf-8 -*-
+
+from __future__ import division, print_function, unicode_literals
+
+
+import datetime
+from pynfe.utils import etree, so_numeros
+from pynfe.utils.flags import (
+ NAMESPACE_NFE,
+ VERSAO_PADRAO,
+ CODIGOS_ESTADOS,
+ NAMESPACE_METODO,
+ NAMESPACE_SOAP,
+ NAMESPACE_XSI,
+ NAMESPACE_XSD,
+)
+
+from pynfe.utils.webservices import NFE, NFCE
+from .assinatura import AssinaturaA1
+from .comunicacao import ComunicacaoSefaz
+
+
+class ComunicacaoNFe(ComunicacaoSefaz):
+ """Classe de comunicação que segue o padrão definido para as SEFAZ dos Estados."""
+
+ _versao = VERSAO_PADRAO
+ _assinatura = AssinaturaA1
+ _namespace = NAMESPACE_NFE
+ _header = False
+ _envio_mensagem = 'nfeDadosMsg'
+ _namespace_metodo = NAMESPACE_METODO
+ _accept = False
+ _namespace_soap = NAMESPACE_SOAP
+ _namespace_xsi = NAMESPACE_XSI
+ _namespace_xsd = NAMESPACE_XSD
+ _soap_version = 'soap'
+
+ def autorizacao(self, modelo, nota_fiscal, id_lote=1, ind_sinc=1):
+ """
+ Método para realizar autorização da nota de acordo com o modelo
+ :param modelo: Modelo
+ :param nota_fiscal: XML assinado
+ :param id_lote: Id do lote - numero autoincremental gerado pelo sistema
+ :param ind_sinc: Indicador de sincrono e assincrono, 0 para assincrono, 1 para sincrono
+ :return: Uma tupla que em caso de sucesso, retorna xml com nfe e protocolo de autorização. Caso contrário,
+ envia todo o soap de resposta da Sefaz para decisão do usuário.
+ """
+ # url do serviço
+ url = self._get_url(modelo=modelo, consulta='AUTORIZACAO')
+
+ # Monta XML do corpo da requisição
+ raiz = etree.Element('enviNFe', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO)
+ etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema
+ etree.SubElement(raiz, 'indSinc').text = str(ind_sinc) # 0 para assincrono, 1 para sincrono
+ raiz.append(nota_fiscal)
+
+ # Monta XML para envio da requisição
+ xml = self._construir_xml_soap('NFeAutorizacao4', raiz)
+ # Faz request no Servidor da Sefaz
+ retorno = self._post(url, xml)
+
+ # Em caso de sucesso, retorna xml com nfe e protocolo de autorização.
+ # Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário.
+ # import pdb
+ # pdb.set_trace()
+ if retorno.status_code == 200:
+ # namespace
+ ns = {'ns': 'http://www.portalfiscal.inf.br/nfe'}
+ if ind_sinc == 1:
+ # Procuta status no xml
+ try:
+ prot = etree.fromstring(retorno.text)
+ except ValueError:
+ # em SP retorno.text apresenta erro
+ prot = etree.fromstring(retorno.content)
+ try:
+ # Protocolo com envio OK
+ inf_prot = prot[0][0] # root protNFe
+ lote_status = inf_prot.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text
+ # Lote processado
+ if lote_status == '104':
+ prot_nfe = inf_prot.xpath("ns:retEnviNFe/ns:protNFe", namespaces=ns)[0]
+ status = prot_nfe.xpath('ns:infProt/ns:cStat', namespaces=ns)[0].text
+ # autorizado usa da NF-e
+ # retorna xml final (NFe+protNFe)
+ if status == '100':
+ raiz = etree.Element('nfeProc', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO)
+ raiz.append(nota_fiscal)
+ raiz.append(prot_nfe)
+ return 0, raiz
+ except IndexError:
+ # Protocolo com algum erro no Envio
+ print(retorno.text)
+ else:
+ # Retorna id do protocolo para posterior consulta em caso de sucesso.
+ try:
+ rec = etree.fromstring(retorno.text)
+ except ValueError:
+ # em SP retorno.text apresenta erro
+ rec = etree.fromstring(retorno.content)
+ rec = rec[0][0]
+ status = rec.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text
+ # Lote Recebido com Sucesso!
+ if status == '103':
+ nrec = rec.xpath("ns:retEnviNFe/ns:infRec/ns:nRec", namespaces=ns)[0].text
+ return 0, nrec, nota_fiscal
+ return 1, retorno, nota_fiscal
+
+ def consulta_recibo(self, modelo, numero):
+ """
+ Este método oferece a consulta do resultado do processamento de um lote de NF-e.
+ O aplicativo do Contribuinte deve ser construído de forma a aguardar um tempo mínimo de
+ 15 segundos entre o envio do Lote de NF-e para processamento e a consulta do resultado
+ deste processamento, evitando a obtenção desnecessária do status de erro 105 - "Lote em
+ Processamento".
+ :param modelo: Modelo da nota
+ :param numero: Número da nota
+ :return:
+ """
+
+ # url do serviço
+ url = self._get_url(modelo=modelo, consulta='RECIBO')
+
+ # Monta XML do corpo da requisição
+ raiz = etree.Element('consReciNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
+ etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
+ etree.SubElement(raiz, 'nRec').text = numero
+
+ # Monta XML para envio da requisição
+ xml = self._construir_xml_soap('NFeRetAutorizacao4', raiz)
+ return self._post(url, xml)
+
+ def consulta_nota(self, modelo, chave):
+ """
+ Este método oferece a consulta da situação da NF-e/NFC-e na Base de Dados do Portal
+ da Secretaria de Fazenda Estadual.
+ :param modelo: Modelo da nota
+ :param chave: Chave da nota
+ :return:
+ """
+ # url do serviço
+ url = self._get_url(modelo=modelo, consulta='CHAVE')
+ # Monta XML do corpo da requisição
+ raiz = etree.Element('consSitNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
+ etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
+ etree.SubElement(raiz, 'xServ').text = 'CONSULTAR'
+ etree.SubElement(raiz, 'chNFe').text = chave
+ # Monta XML para envio da requisição
+ xml = self._construir_xml_soap('NFeConsultaProtocolo4', raiz)
+ return self._post(url, xml)
+
+ def consulta_notas_cnpj(self, cnpj, nsu=0):
+ """
+ “Serviço de Consulta da Relação de Documentos Destinados” para um determinado CNPJ de
+ destinatário informado na NF-e.
+ :param cnpj: CNPJ
+ :param nsu: NSU
+ :return:
+ """
+
+ # url do serviço
+ url = self._get_url_an(consulta='DESTINADAS')
+
+ # Monta XML do corpo da requisição
+ raiz = etree.Element('consNFeDest', versao='1.01', xmlns=NAMESPACE_NFE)
+ etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
+ etree.SubElement(raiz, 'xServ').text = 'CONSULTAR NFE DEST'
+ etree.SubElement(raiz, 'CNPJ').text = cnpj
+
+ # Indicador de NF-e consultada:
+ # 0 = Todas as NF-e;
+ # 1 = Somente as NF-e que ainda não tiveram manifestação do destinatário (Desconhecimento da
+ # operação, Operação não Realizada ou Confirmação da Operação);
+ # 2 = Idem anterior, incluindo as NF-e que também não tiveram a Ciência da Operação.
+ etree.SubElement(raiz, 'indNFe').text = '0'
+
+ # Indicador do Emissor da NF-e:
+ # 0 = Todos os Emitentes / Remetentes;
+ # 1 = Somente as NF-e emitidas por emissores / remetentes que não tenham o mesmo CNPJ-Base do
+ # destinatário (para excluir as notas fiscais de transferência entre filiais).
+ etree.SubElement(raiz, 'indEmi').text = '0'
+
+ # Último NSU recebido pela Empresa. Caso seja informado com zero, ou com um NSU muito antigo, a consulta
+ # retornará unicamente as notas fiscais que tenham sido recepcionadas nos últimos 15 dias.
+ etree.SubElement(raiz, 'ultNSU').text = str(nsu)
+
+ # Monta XML para envio da requisição
+ xml = self._construir_xml_soap('NfeConsultaDest', raiz)
+ return self._post(url, xml)
+
+ def consulta_distribuicao(self, cnpj, nsu=0):
+ pass
+
+ def consulta_cadastro(self, modelo, cnpj):
+ """
+ Consulta de cadastro
+ :param modelo: Modelo da nota
+ :param cnpj: CNPJ da empresa
+ :return:
+ """
+ # UF que utilizam a SVRS - Sefaz Virtual do RS: Para serviço de Consulta Cadastro: AC, RN, PB, SC
+ lista_svrs = ['AC', 'RN', 'PB', 'SC', 'PI']
+
+ # RS implementa um método diferente na consulta de cadastro
+ if self.uf.upper() == 'RS':
+ url = NFE['RS']['CADASTRO']
+ elif self.uf.upper() in lista_svrs:
+ url = NFE['SVRS']['CADASTRO']
+ elif self.uf.upper() == 'SVC-RS':
+ url = NFE['SVC-RS']['CADASTRO']
+ else:
+ url = self._get_url(modelo=modelo, consulta='CADASTRO')
+
+ raiz = etree.Element('ConsCad', versao='2.00', xmlns=NAMESPACE_NFE)
+ info = etree.SubElement(raiz, 'infCons')
+ etree.SubElement(info, 'xServ').text = 'CONS-CAD'
+ etree.SubElement(info, 'UF').text = self.uf.upper()
+ etree.SubElement(info, 'CNPJ').text = cnpj
+ # etree.SubElement(info, 'CPF').text = cpf
+
+ # Monta XML para envio da requisição
+ xml = self._construir_xml_soap('CadConsultaCadastro4', raiz)
+ # Chama método que efetua a requisição POST no servidor SOAP
+ return self._post(url, xml)
+
+ def evento(self, modelo, evento, id_lote=1):
+ """
+ Envia um evento de nota fiscal (cancelamento e carta de correção)
+ :param modelo: Modelo da nota
+ :param evento: Eventro
+ :param id_lote: Id do lote
+ :return:
+ """
+
+ # url do serviço
+ try:
+ # manifestacao url é do AN
+ if evento[0][5].text.startswith('2'):
+ url = self._get_url_an(consulta='EVENTOS')
+ else:
+ url = self._get_url(modelo=modelo, consulta='EVENTOS')
+ except Exception:
+ url = self._get_url(modelo=modelo, consulta='EVENTOS')
+
+ # Monta XML do corpo da requisição
+ raiz = etree.Element('envEvento', versao='1.00', xmlns=NAMESPACE_NFE)
+ etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema
+ raiz.append(evento)
+ xml = self._construir_xml_soap('NFeRecepcaoEvento4', raiz)
+ return self._post(url, xml)
+
+ def status_servico(self, modelo):
+ """
+ Verifica status do servidor da receita.
+ :param modelo: modelo é a string com tipo de serviço que deseja consultar, Ex: nfe ou nfce
+ :return:
+ """
+ url = self._get_url(modelo, 'STATUS')
+ # Monta XML do corpo da requisição
+ raiz = etree.Element('consStatServ', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
+ etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
+ etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()]
+ etree.SubElement(raiz, 'xServ').text = 'STATUS'
+ xml = self._construir_xml_soap('NFeStatusServico4', raiz)
+ return self._post(url, xml)
+
+ def download(self, cnpj, chave):
+ """
+ Metodo para download de NFe por parte de destinatário.
+ O certificado digital deve ser o mesmo do destinatário da Nfe.
+ NT 2012/002
+ :param cnpj: CNPJ da empresa
+ :param chave: Chave
+ :return:
+ """
+
+ # url do serviço
+ url = self._get_url_an(consulta='DOWNLOAD')
+
+ # Monta XML do corpo da requisição
+ raiz = etree.Element('downloadNFe', versao='1.00', xmlns=NAMESPACE_NFE)
+ etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
+ etree.SubElement(raiz, 'xServ').text = 'DOWNLOAD NFE'
+ etree.SubElement(raiz, 'CNPJ').text = str(cnpj)
+ etree.SubElement(raiz, 'chNFe').text = str(chave)
+
+ # Monta XML para envio da requisição
+ xml = self._construir_xml_soap('NfeDownloadNF', raiz)
+ return self._post(url, xml)
+
+ def inutilizacao(self, modelo, cnpj, numero_inicial, numero_final, justificativa='', ano=None, serie='1'):
+ """
+ Serviço destinado ao atendimento de solicitações de inutilização de numeração.
+ :param modelo: Modelo da nota
+ :param cnpj: CNPJda empresa
+ :param numero_inicial: Número inicial
+ :param numero_final: Número final
+ :param justificativa: Justificativa
+ :param ano: Ano
+ :param serie: Série
+ :return:
+ """
+
+ # url do servico
+ url = self._get_url(modelo=modelo, consulta='INUTILIZACAO')
+
+ # Valores default
+ ano = str(ano or datetime.date.today().year)[-2:]
+ uf = CODIGOS_ESTADOS[self.uf.upper()]
+ cnpj = so_numeros(cnpj)
+
+ # Identificador da TAG a ser assinada formada com Código da UF + Ano (2 posições) +
+ # CNPJ + modelo + série + nro inicial e nro final precedida do literal “ID”
+ id_unico = 'ID%(uf)s%(ano)s%(cnpj)s%(modelo)s%(serie)s%(num_ini)s%(num_fin)s' % {
+ 'uf': uf,
+ 'ano': ano,
+ 'cnpj': cnpj,
+ 'modelo': '55',
+ 'serie': serie.zfill(3),
+ 'num_ini': str(numero_inicial).zfill(9),
+ 'num_fin': str(numero_final).zfill(9),
+ }
+
+ # Monta XML do corpo da requisição # FIXME
+ raiz = etree.Element('inutNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
+ inf_inut = etree.SubElement(raiz, 'infInut', Id=id_unico)
+ etree.SubElement(inf_inut, 'tpAmb').text = str(self._ambiente)
+ etree.SubElement(inf_inut, 'xServ').text = 'INUTILIZAR'
+ etree.SubElement(inf_inut, 'cUF').text = uf
+ etree.SubElement(inf_inut, 'ano').text = ano
+ etree.SubElement(inf_inut, 'CNPJ').text = cnpj
+ etree.SubElement(inf_inut, 'mod').text = '55' if modelo == 'nfe' else '65' # 55=NF-e; 65=NFC-e
+ etree.SubElement(inf_inut, 'serie').text = serie
+ etree.SubElement(inf_inut, 'nNFIni').text = str(numero_inicial)
+ etree.SubElement(inf_inut, 'nNFFin').text = str(numero_final)
+ etree.SubElement(inf_inut, 'xJust').text = justificativa
+
+ # assinatura
+ a1 = AssinaturaA1(self.certificado, self.certificado_senha)
+ xml = a1.assinar(raiz)
+
+ # Monta XML para envio da requisição
+ xml = self._construir_xml_soap('NFeInutilizacao4', xml)
+ # Faz request no Servidor da Sefaz e retorna resposta
+ return self._post(url, xml)
+
+ def _get_url_an(self, consulta):
+ # producao
+ if self._ambiente == 1:
+ if consulta == 'DISTRIBUICAO':
+ ambiente = 'https://www1.'
+ else:
+ ambiente = 'https://www.'
+ # homologacao
+ else:
+ ambiente = 'https://hom.'
+
+ self.url = ambiente + NFE['AN'][consulta]
+ return self.url
+
+ def _get_url(self, modelo, consulta):
+ """ Retorna a url para comunicação com o webservice """
+ # estado que implementam webservices proprios
+ lista = ['PR', 'MS', 'SP', 'AM', 'CE', 'BA', 'GO', 'MG', 'MT', 'PE', 'RS']
+ if self.uf.upper() in lista:
+ if self._ambiente == 1:
+ ambiente = 'HTTPS'
+ else:
+ ambiente = 'HOMOLOGACAO'
+ if modelo == 'nfe':
+ # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
+ self.url = NFE[self.uf.upper()][ambiente] + NFE[self.uf.upper()][consulta]
+ elif modelo == 'nfce':
+ # PE é o unico UF que possiu NFE proprio e SVRS para NFCe
+ if self.uf.upper() == 'PE':
+ self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
+ else:
+ # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
+ self.url = NFCE[self.uf.upper()][ambiente] + NFCE[self.uf.upper()][consulta]
+ else:
+ raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
+ # Estados que utilizam outros ambientes
+ else:
+ lista_svrs = ['AC', 'RN', 'PB', 'SC', 'SE', 'PI']
+ lista_svan = ['MA','PA']
+ if self.uf.upper() in lista_svrs:
+ if self._ambiente == 1:
+ ambiente = 'HTTPS'
+ else:
+ ambiente = 'HOMOLOGACAO'
+ if modelo == 'nfe':
+ # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
+ self.url = NFE['SVRS'][ambiente] + NFE['SVRS'][consulta]
+ elif modelo == 'nfce':
+ # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
+ self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
+ else:
+ raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
+ elif self.uf.upper() in lista_svan:
+ if self._ambiente == 1:
+ ambiente = 'HTTPS'
+ else:
+ ambiente = 'HOMOLOGACAO'
+ if modelo == 'nfe':
+ # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
+ self.url = NFE['SVAN'][ambiente] + NFE['SVAN'][consulta]
+ elif modelo == 'nfce':
+ # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
+ self.url = NFCE['SVAN'][ambiente] + NFCE['SVAN'][consulta]
+ else:
+ raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
+ return self.url
+
+ def _get_url_uf(self, modelo, consulta):
+ """ Estados que implementam url diferente do padrão nacional"""
+ # estados que implementam webservice SVRS
+ svrs = ['AC', 'AL', 'AP', 'DF', 'ES', 'PB', 'RJ', 'RN', 'RO', 'RR', 'SC', 'SE', 'TO', 'PI']
+ svan = ['MA', 'PA']
+ # SVRS
+ if self.uf.upper() in svrs:
+ if self._ambiente == 1:
+ ambiente = 'HTTPS'
+ else:
+ ambiente = 'HOMOLOGACAO'
+ if modelo == 'nfe':
+ # nfe Ex: https://nfe.svrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx
+ # https://nfe-homologacao.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx
+ self.url = NFE['SVRS'][ambiente] + NFE['SVRS'][consulta]
+ elif modelo == 'nfce':
+ # nfce Ex: https://nfce.svrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx
+ # https://nfce-homologacao.svrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx
+ self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
+ else:
+ # TODO implementar outros tipos de notas como NFS-e
+ pass
+ # SVAN
+ else:
+ if self.uf.upper() in svan:
+ if self._ambiente == 1:
+ ambiente = 'HTTPS'
+ else:
+ ambiente = 'HOMOLOGACAO'
+ if modelo == 'nfe':
+ # nfe Ex: https://nfe.svrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx
+ # https://nfe-homologacao.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx
+ self.url = NFE['SVAN'][ambiente] + NFE['SVAN'][consulta]
+ elif modelo == 'nfce':
+ # TODO não existe SVAN para nfce
+ pass
+ else:
+ # TODO implementar outros tipos de notas como NFS-e
+ pass
+ return self.url
+
+ def _cabecalho_soap(self, metodo):
+ """Monta o XML do cabeçalho da requisição SOAP"""
+
+ raiz = etree.Element('nfeCabecMsg', xmlns=NAMESPACE_METODO+metodo)
+ if metodo == 'RecepcaoEvento':
+ etree.SubElement(raiz, 'versaoDados').text = '1.00'
+ elif metodo == 'NfeConsultaDest':
+ etree.SubElement(raiz, 'versaoDados').text = '1.01'
+ elif metodo == 'NfeDownloadNF':
+ etree.SubElement(raiz, 'versaoDados').text = '1.00'
+ elif metodo == 'CadConsultaCadastro2':
+ etree.SubElement(raiz, 'versaoDados').text = '2.00'
+ else:
+ etree.SubElement(raiz, 'versaoDados').text = VERSAO_PADRAO
+ etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()]
+ return raiz
diff --git a/pynfe/processamento/nfse.py b/pynfe/processamento/nfse.py
new file mode 100644
index 0000000..c5e5fbe
--- /dev/null
+++ b/pynfe/processamento/nfse.py
@@ -0,0 +1,237 @@
+# -*- coding: utf-8 -*-
+
+from __future__ import division, print_function, unicode_literals
+
+from pynfe.utils import etree
+from pynfe.utils.flags import (
+ NAMESPACE_XSI,
+ NAMESPACE_BETHA,
+)
+from pynfe.utils.webservices import NFSE
+from pynfe.entidades.certificado import CertificadoA1
+from .comunicacao import Comunicacao
+
+
+class ComunicacaoNfse(Comunicacao):
+ """ Classe de comunicação que segue o padrão definido para as SEFAZ dos Municípios. """
+
+ _versao = ''
+ _namespace = ''
+
+ def __init__(self, certificado, certificado_senha, autorizador, homologacao=False):
+ self.certificado = certificado
+ self.certificado_senha = certificado_senha
+ self._ambiente = 2 if homologacao else 1
+ self.autorizador = autorizador.upper()
+ if self.autorizador == 'GINFES':
+ self._namespace = 'http://www.ginfes.com.br/cabecalho_v03.xsd'
+ self._versao = '3'
+ elif self.autorizador == 'BETHA':
+ self._namespace = NAMESPACE_BETHA
+ self._versao = '2.02'
+ else:
+ raise Exception('Autorizador não encontrado!')
+
+ def autorizacao(self, nota):
+ # url do serviço
+ url = self._get_url()
+ if self.autorizador == 'BETHA':
+ # xml
+ xml = etree.tostring(nota, encoding='unicode', pretty_print=False)
+ # comunica via wsdl
+ return self._post(url, xml, 'gerar')
+ else:
+ raise Exception('Este método só esta implementado no autorizador betha.')
+
+ def enviar_lote(self, xml):
+ # url do serviço
+ url = self._get_url()
+ if self.autorizador == 'GINFES':
+ # xml
+ xml = '' + xml
+ # comunica via wsdl
+ return self._post_https(url, xml, 'enviar_lote')
+ else:
+ raise Exception('Este método só esta implementado no autorizador ginfes.')
+
+ def consultar(self, xml):
+ # url do serviço
+ url = self._get_url()
+ if self.autorizador == 'GINFES':
+ # xml
+ xml = '' + xml
+ # comunica via wsdl
+ return self._post_https(url, xml, 'consulta')
+ else:
+ raise Exception('Este método só esta implementado no autorizador ginfes.')
+
+ def consultar_rps(self, xml):
+ # url do serviço
+ url = self._get_url()
+ if self.autorizador == 'BETHA':
+ # comunica via wsdl
+ return self._post(url, xml, 'consultaRps')
+ elif self.autorizador == 'GINFES':
+ return self._post_https(url, xml, 'consultaRps')
+ # TODO outros autorizadres
+ else:
+ raise Exception('Autorizador não encontrado!')
+
+ def consultar_faixa(self, xml):
+ # url do serviço
+ url = self._get_url()
+ if self.autorizador == 'BETHA':
+ # comunica via wsdl
+ return self._post(url, xml, 'consultaFaixa')
+ else:
+ raise Exception('Este método só esta implementado no autorizador betha.')
+
+ def consultar_lote(self, xml):
+ # url do serviço
+ url = self._get_url()
+ if self.autorizador == 'GINFES':
+ # xml
+ xml = '' + xml
+ # comunica via wsdl
+ return self._post_https(url, xml, 'consulta_lote')
+ else:
+ raise Exception('Este método só esta implementado no autorizador ginfes.')
+
+ def consultar_situacao_lote(self, xml):
+ # url do serviço
+ url = self._get_url()
+ if self.autorizador == 'GINFES':
+ # comunica via wsdl
+ return self._post_https(url, xml, 'consulta_situacao_lote')
+ else:
+ raise Exception('Este método só esta implementado no autorizador ginfes.')
+
+ def cancelar(self, xml):
+ # url do serviço
+ url = self._get_url()
+ # Betha
+ if self.autorizador == 'BETHA':
+ # comunica via wsdl
+ return self._post(url, xml, 'cancelar')
+ # Ginfes
+ elif self.autorizador == 'GINFES':
+ # comunica via wsdl com certificado
+ return self._post_https(url, xml, 'cancelar')
+ # TODO outros autorizadres
+ else:
+ raise Exception('Autorizador não encontrado!')
+
+ def _cabecalho(self, retorna_string=True):
+ """ Monta o XML do cabeçalho da requisição wsdl
+ Namespaces padrão homologação (Ginfes) """
+
+ xml_declaration = ''
+ # cabecalho = '3'
+ # cabecalho
+ raiz = etree.Element('{%s}cabecalho'%self._namespace, nsmap={'ns2':self._namespace, 'xsi':NAMESPACE_XSI}, versao=self._versao)
+ etree.SubElement(raiz, 'versaoDados').text = self._versao
+
+ if retorna_string:
+ cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n','')
+ cabecalho = xml_declaration + cabecalho
+ return cabecalho
+ else:
+ return raiz
+
+ def _cabecalho2(self, retorna_string=True):
+ """ Monta o XML do cabeçalho da requisição wsdl
+ Namespaces que funcionaram em produção (Ginfes)"""
+
+ xml_declaration = ''
+
+ # cabecalho
+ raiz = etree.Element('cabecalho', xmlns=self._namespace, versao=self._versao)
+ etree.SubElement(raiz, 'versaoDados').text = self._versao
+
+ if retorna_string:
+ cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n', '')
+ cabecalho = xml_declaration + cabecalho
+ return cabecalho
+ else:
+ return raiz
+
+ def _cabecalho_ginfes(self):
+ """ Retorna o XML do cabeçalho gerado pelo xsd"""
+ from pynfe.processamento.autorizador_nfse import SerializacaoGinfes
+ return SerializacaoGinfes().cabecalho()
+
+ def _get_url(self):
+ """ Retorna a url para comunicação com o webservice """
+ if self._ambiente == 1:
+ ambiente = 'HTTPS'
+ else:
+ ambiente = 'HOMOLOGACAO'
+ if self.autorizador in NFSE:
+ self.url = NFSE[self.autorizador][ambiente]
+ else:
+ raise Exception('Autorizador nao encontrado!')
+ return self.url
+
+ def _post(self, url, xml, metodo):
+ """ Comunicação wsdl (http) sem certificado digital """
+ # cabecalho
+ cabecalho = self._cabecalho()
+ # comunicacao wsdl
+ try:
+ from suds.client import Client
+ cliente = Client(url)
+ # gerar nfse
+ if metodo == 'gerar':
+ return cliente.service.GerarNfse(cabecalho, xml)
+ elif metodo == 'consultaRps':
+ return cliente.service.ConsultarNfsePorRps(cabecalho, xml)
+ elif metodo == 'consultaFaixa':
+ return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
+ elif metodo == 'cancelar':
+ return cliente.service.CancelarNfse(cabecalho, xml)
+ # TODO outros metodos
+ else:
+ raise Exception('Método não implementado no autorizador.')
+ except Exception as e:
+ raise e
+
+ def _post_https(self, url, xml, metodo):
+ """ Comunicação wsdl (https) utilizando certificado do usuário """
+ # cabecalho
+ cabecalho = self._cabecalho()
+ # comunicacao wsdl
+ try:
+ from suds.client import Client
+ from pynfe.utils.https_nfse import HttpAuthenticated
+
+ certificadoA1 = CertificadoA1(self.certificado)
+ chave, cert = certificadoA1.separar_arquivo(self.certificado_senha, caminho=True)
+
+ cliente = Client(url, transport = HttpAuthenticated(key=chave, cert=cert, endereco=url))
+
+ # gerar nfse
+ if metodo == 'gerar':
+ return cliente.service.GerarNfse(cabecalho, xml)
+ elif metodo == 'enviar_lote':
+ return cliente.service.RecepcionarLoteRpsV3(cabecalho, xml)
+ elif metodo == 'consulta':
+ return cliente.service.ConsultarNfseV3(cabecalho, xml)
+ elif metodo == 'consulta_lote':
+ return cliente.service.ConsultarLoteRpsV3(cabecalho, xml)
+ elif metodo == 'consulta_situacao_lote':
+ return cliente.service.ConsultarSituacaoLoteRpsV3(cabecalho, xml)
+ elif metodo == 'consultaRps':
+ return cliente.service.ConsultarNfsePorRpsV3(cabecalho, xml)
+ elif metodo == 'consultaFaixa':
+ return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
+ elif metodo == 'cancelar':
+ # versão 2
+ return cliente.service.CancelarNfse(xml)
+ # versão 3
+ # return cliente.service.CancelarNfseV3(cabecalho, xml)
+ # TODO outros metodos
+ else:
+ raise Exception('Método não implementado no autorizador.')
+ except Exception as e:
+ raise e
diff --git a/pynfe/processamento/resposta.py b/pynfe/processamento/resposta.py
new file mode 100644
index 0000000..592ec5f
--- /dev/null
+++ b/pynfe/processamento/resposta.py
@@ -0,0 +1,28 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2018 - TODAY Luis Felipe Mileo - KMEE INFORMATICA LTDA
+# License AGPL-3 - See https://www.gnu.org/licenses/lgpl-3.0.html
+
+import re
+from pynfe.utils import etree
+
+
+class RetornoSoap(object):
+
+ def __init__(self, webservice, retorno, resposta):
+ self.webservice = webservice
+ self.resposta = resposta
+ self.retorno = retorno
+
+
+def analisar_retorno(webservice, retorno, classe_resposta):
+
+ retorno.raise_for_status()
+
+ match = re.search('(.*?)', retorno.text)
+
+ if match:
+ resultado = etree.tostring(etree.fromstring(match.group(1))[0])
+ classe_resposta.Validate_simpletypes_ = False
+ resposta = classe_resposta.parseString(resultado.encode('utf-8'))
+
+ return RetornoSoap(webservice, retorno, resposta)
diff --git a/pynfe/utils/__init__.py b/pynfe/utils/__init__.py
index 2aeeab6..9646891 100644
--- a/pynfe/utils/__init__.py
+++ b/pynfe/utils/__init__.py
@@ -3,13 +3,17 @@
import os
import codecs
from unicodedata import normalize
+import re
try:
from lxml import etree
except ImportError:
raise Exception('Falhou ao importar lxml/ElementTree')
-from io import StringIO
+try:
+ from StringIO import StringIO
+except ImportError:
+ from io import StringIO
try:
from . import flags
@@ -146,3 +150,11 @@ def obter_uf_por_codigo(codigo_uf):
def remover_acentos(txt):
return normalize('NFKD', txt).encode('ASCII','ignore').decode('ASCII')
+
+
+def extrai_id_srtxml(edoc):
+ result = ''
+ match = re.search('Id=[^0-9]+(\d+)"', edoc)
+ if match:
+ result = match.group(1)
+ return result
diff --git a/pynfe/utils/flags.py b/pynfe/utils/flags.py
index 7c1db9e..4390aee 100644
--- a/pynfe/utils/flags.py
+++ b/pynfe/utils/flags.py
@@ -1,5 +1,7 @@
# -*- coding: utf-8 -*-
+from __future__ import division, print_function, unicode_literals
+
NAMESPACE_NFE = 'http://www.portalfiscal.inf.br/nfe'
NAMESPACE_SIG = 'http://www.w3.org/2000/09/xmldsig#'
NAMESPACE_SOAP = 'http://www.w3.org/2003/05/soap-envelope'
@@ -266,3 +268,7 @@ CODIGOS_ESTADOS = {
'DF': '53',
'AN': '91'
}
+
+NAMESPACE_MDFE = 'http://www.portalfiscal.inf.br/mdfe'
+NAMESPACE_MDFE_METODO = 'http://www.portalfiscal.inf.br/mdfe/wsdl/'
+MODELO_MDFE = '58'
diff --git a/pynfe/utils/webservices.py b/pynfe/utils/webservices.py
index 0b3b1cd..ffd5ad8 100644
--- a/pynfe/utils/webservices.py
+++ b/pynfe/utils/webservices.py
@@ -1,8 +1,10 @@
-
+# -*- coding: utf-8 -*-
"""
@author: Junior Tada, Leonardo Tada
"""
+from __future__ import division, print_function, unicode_literals
+
# http://nfce.encat.org/desenvolvedor/qrcode/
# http://nfce.encat.org/consumidor/consulte-sua-nota/ url consulta por chave
# Nfc-e
@@ -483,4 +485,68 @@ NFSE = {
'HTTPS':'https://producao.ginfes.com.br/ServiceGinfesImpl?wsdl',
'HOMOLOGACAO':'https://homologacao.ginfes.com.br/ServiceGinfesImpl?wsdl'
}
-}
\ No newline at end of file
+}
+
+AMBIENTE_PRODUCAO = 1
+AMBIENTE_HOMOLOGACAO = 2
+
+WS_MDFE_RECEPCAO = '1'
+WS_MDFE_RET_RECEPCAO = '2'
+WS_MDFE_RECEPCAO_EVENTO = '3'
+WS_MDFE_CONSULTA = '4'
+WS_MDFE_STATUS_SERVICO = '5'
+WS_MDFE_CONSULTA_NAO_ENCERRADOS = '6'
+
+MDFE_WS_METODO = {
+ WS_MDFE_RECEPCAO: {
+ 'webservice': 'MDFeRecepcao',
+ 'metodo': 'mdfeRecepcaoLote',
+ 'versao': '3.00',
+ },
+ WS_MDFE_RET_RECEPCAO: {
+ 'webservice': 'MDFeRetRecepcao',
+ 'metodo': 'mdfeRetRecepcao',
+ 'versao': '3.00',
+ },
+ WS_MDFE_RECEPCAO_EVENTO: {
+ 'webservice': 'MDFeRecepcaoEvento',
+ 'metodo': 'mdfeRecepcaoEvento',
+ 'versao': '3.00',
+ },
+ WS_MDFE_CONSULTA: {
+ 'webservice': 'MDFeConsulta',
+ 'metodo': 'mdfeConsultaMDF',
+ 'versao': '3.00',
+ },
+ WS_MDFE_STATUS_SERVICO: {
+ 'webservice': 'MDFeStatusServico',
+ 'metodo': 'mdfeStatusServicoMDF',
+ 'versao': '3.00',
+ },
+ WS_MDFE_CONSULTA_NAO_ENCERRADOS: {
+ 'webservice': 'MDFeConsNaoEnc',
+ 'metodo': 'mdfeConsNaoEnc',
+ 'versao': '3.00',
+ },
+}
+
+MDFE_WS_URL = {
+ AMBIENTE_PRODUCAO: {
+ 'servidor': 'mdfe.svrs.rs.gov.br',
+ WS_MDFE_RECEPCAO: 'ws/MDFerecepcao/MDFeRecepcao.asmx',
+ WS_MDFE_RET_RECEPCAO: 'ws/MDFeRetRecepcao/MDFeRetRecepcao.asmx',
+ WS_MDFE_RECEPCAO_EVENTO: 'ws/MDFeRecepcaoEvento/MDFeRecepcaoEvento.asmx',
+ WS_MDFE_CONSULTA: 'ws/MDFeConsulta/MDFeConsulta.asmx',
+ WS_MDFE_STATUS_SERVICO: 'ws/MDFeStatusServico/MDFeStatusServico.asmx',
+ WS_MDFE_CONSULTA_NAO_ENCERRADOS: 'ws/MDFeConsNaoEnc/MDFeConsNaoEnc.asmx',
+ },
+ AMBIENTE_HOMOLOGACAO: {
+ 'servidor': 'mdfe-homologacao.svrs.rs.gov.br',
+ WS_MDFE_RECEPCAO: 'ws/MDFerecepcao/MDFeRecepcao.asmx',
+ WS_MDFE_RET_RECEPCAO: 'ws/MDFeRetRecepcao/MDFeRetRecepcao.asmx',
+ WS_MDFE_RECEPCAO_EVENTO: 'ws/MDFeRecepcaoEvento/MDFeRecepcaoEvento.asmx',
+ WS_MDFE_CONSULTA: 'ws/MDFeConsulta/MDFeConsulta.asmx',
+ WS_MDFE_STATUS_SERVICO: 'ws/MDFeStatusServico/MDFeStatusServico.asmx',
+ WS_MDFE_CONSULTA_NAO_ENCERRADOS: 'ws/MDFeConsNaoEnc/MDFeConsNaoEnc.asmx',
+ },
+}