Compare commits

...

19 Commits

Author SHA1 Message Date
Junior Tada 29ca7a0a8c
Merge pull request #114 from leogregianin/new_mdfe 4 years ago
leogregianin 33185a4bfd
Campo RENAVAM como não obrigatório no MDFe 4 years ago
Junior Tada 6c2ad867a0
Merge pull request #111 from TadaSoftware/master 5 years ago
Junior Tada 01f53eaee0
Merge pull request #79 from leogregianin/new_mdfe 5 years ago
leogregianin 647fbb0f62 Merge remote-tracking branch 'remotes/upstream/master' into new_mdfe 5 years ago
leogregianin 2ea532c398 Atualização da branch new_mdfe com as últimas alterações da master 5 years ago
Leonardo Gregianin 7cde3cf756 Correção do envio de encerramento na conversão de int para str 5 years ago
Leonardo Gregianin 9cbdc2bba0 fix: Correção na serialização de valores do MDFe para os veículos 5 years ago
Leonardo Gregianin 85a6e6ad24 MDFe: correção na serialização do veículo tração e reboque 5 years ago
Leonardo Gregianin f8e84e51de correção do envio assíncrono 6 years ago
Leonardo Gregianin 8bfe84c8d8 correção no envio do evento "CONSULTA NÃO ENCERRADOS" 6 years ago
Leonardo Gregianin 4b302ac0d7 correção da consulta do status da nfe 6 years ago
Leonardo Gregianin e473ccd3a1 correção da serialização do cancelamento da mdfe 6 years ago
Leonardo Gregianin 4b48fae55f Criação classe de serialização do MDFe 6 years ago
Leonardo Gregianin c50fd8c293 Criação das entidades do Manifesto 6 years ago
Leonardo Gregianin d796de62e1 Criação do método de envio/transmissão de mdfe 6 years ago
Leonardo Gregianin a00071c25d Criação dos status do mdfe 6 years ago
Leonardo Gregianin 1e0719cf86 Adicionada as urls do QRCode e da transmissão síncrona 6 years ago
Leonardo Gregianin 1d38a8cfd6 feat: Criada a comunicação com o webservice do MDF-e. Separação das classes de NFe, NFSe e MDFe. 6 years ago
  1. 2
      pynfe/entidades/__init__.py
  2. 111
      pynfe/entidades/evento.py
  3. 444
      pynfe/entidades/manifesto.py
  4. 5
      pynfe/processamento/__init__.py
  5. 649
      pynfe/processamento/comunicacao.py
  6. 286
      pynfe/processamento/mdfe.py
  7. 439
      pynfe/processamento/nfe.py
  8. 234
      pynfe/processamento/nfse.py
  9. 28
      pynfe/processamento/resposta.py
  10. 572
      pynfe/processamento/serializacao.py
  11. 14
      pynfe/utils/__init__.py
  12. 16
      pynfe/utils/flags.py
  13. 17
      pynfe/utils/webservices.py

2
pynfe/entidades/__init__.py

@ -3,9 +3,9 @@ from .produto import Produto
from .cliente import Cliente
from .transportadora import Transportadora
from .notafiscal import NotaFiscal
from .manifesto import Manifesto
from .lotes import LoteNotaFiscal
from .fonte_dados import _fonte_dados
from .certificado import CertificadoA1
from .evento import EventoCancelarNota
from .servico import Servico

111
pynfe/entidades/evento.py

@ -4,8 +4,10 @@
@author: Junior Tada, Leonardo Tada
"""
from decimal import Decimal
from .base import Entidade
class Evento(Entidade):
# - Identificador da TAG a ser assinada, a regra de formação do Id é: “ID” + tpEvento + chave da NF-e + nSeqEvento
id = str()
@ -114,4 +116,111 @@ class EventoManifestacaoDest(Evento):
# - Informar a justificativa porque a operação não foi realizada, este campo deve ser informado somente no evento de Operação não Realizada. (min 15 max 255 caracteres)
justificativa = str()
class EventoEncerramento(Evento):
def __init__(self, *args, **kwargs):
super(EventoEncerramento, self).__init__(*args, **kwargs)
# - Código do evento = 110112
self.tp_evento = '110112'
# - "Encerramento"
self.descricao = 'Encerramento'
# - Informar o número do Protocolo de Autorização da MDF-e a ser Encerrada
protocolo = str()
# - Data e hora do evento no formato AAAA-MM-DDThh:mm:ssTZD
dtenc = None
# - uf de onde a manifesto foi encerrado
cuf = str()
# - minicipio onde o manifesto foi encerrado
cmun = str()
class EventoInclusaoCondutor(Evento):
def __init__(self, *args, **kwargs):
super(EventoInclusaoCondutor, self).__init__(*args, **kwargs)
# - Código do evento = 110114
self.tp_evento = '110114'
# - "Encerramento"
self.descricao = 'Inclusão Condutor'
# - Nome do motorista
nome_motorista = str()
# - CPF do motorista
cpf_motorista = str()
class EventoInclusaoDFe(Evento):
def __init__(self, *args, **kwargs):
super(EventoInclusaoDFe, self).__init__(*args, **kwargs)
# - Código do evento = 110115
self.tp_evento = '110115'
# - "Inclusao DF-e"
self.descricao = 'Inclusao DF-e'
# - Informar o número do Protocolo de Autorização da MDF-e a ser Incluida nova NF-e
protocolo = str()
# - Código IBGE do Município de Carregamento
cmun_carrega = str()
# - Nome do Município de Carregamento
xmun_carrega = str()
# - Código IBGE do Município de Descarga
cmun_descarga = str()
# - Nome do Município de Descarga
xmun_descarga = str()
# - Chave de Acesso da NF-e a ser incluída no MDFe
chave_nfe = str()
class EventoInclusaoPagamento(Evento):
def __init__(self, *args, **kwargs):
super(EventoInclusaoPagamento, self).__init__(*args, **kwargs)
# - Código do evento = 110116
self.tp_evento = '110116'
# - "Pagamento Operacao MDF-e"
self.descricao = 'Pagamento Operacao MDF-e'
# - Informar o número do Protocolo de Autorização da MDF-e a ser Incluida nova NF-e
protocolo = str()
# - Quantidade de viagens
qtd_viagens = str()
# - Número da viagem
nro_viagens = str()
# Informações do pagamento
# - Nome do Contratante
nome_contratante = str()
# - CPF/CNPJ do Contratante
cpfcnpj_contratante = str()
# Componentes do Pagamento
# - Tipo do pagamento
tpComp = str()
# - Valor
vComp = Decimal()
# - Valor total do contrato
vContrato = Decimal()
# - Tipo do pagamento (0=a vista e 1=a prazo)
indPag = str()
# Se o pagamento for a prazo
# - Numero da parcela
nParcela = str()
# - Data vencimento
dVenc = None
# - Valor da parcela
vParcela = Decimal()
# Informações bancárias
# - CNPJ da Instituição de Pagamento eletrônico do Frete
CNPJIPEF = str()
# - Código do Banco
codBanco = str()
# - Código da Agência
codAgencia = str()

444
pynfe/entidades/manifesto.py

@ -0,0 +1,444 @@
# -*- coding: utf-8 -*-
import random
from .base import Entidade
from pynfe import get_version
from pynfe.utils.flags import MDFE_STATUS, CODIGO_BRASIL, CODIGOS_ESTADOS
from pynfe.utils import so_numeros
from decimal import Decimal
class Manifesto(Entidade):
status = MDFE_STATUS[0]
# - UF - converter para codigos em CODIGOS_ESTADOS
uf = str()
# tpAmb
# - Tipo Emitente
# 1=Transportadora; 2=Carga própria; 3=CTe Globalizado
tipo_emitente = int()
# - Tipo transportador - 0=nenhum; 1=etc; 2=tac; 3=ctc
tipo_transportador = int()
# Manifesto fixo 58
# - Modelo (formato: NN)
modelo = 58
# - Serie (obrigatorio - formato: NNN)
serie = str()
# - Numero MDFe (obrigatorio)
numero_mdfe = str()
# - Código numérico aleatório que compõe a chave de acesso
codigo_numerico_aleatorio = str()
# - Digito verificador do codigo numerico aleatorio
dv_codigo_numerico_aleatorio = str()
# - Tipo do modal de transporte
# 1=Rodoviario; 2=Aereo; 3=Aquaviario; 4=Ferroviario
modal = 1
# - Data da Emissao (obrigatorio)
data_emissao = None
# - Forma de emissao (obrigatorio - seleciona de lista) - NF_FORMAS_EMISSAO
forma_emissao = str()
# - Processo de emissão da NF-e (obrigatorio - seleciona de lista) - NF_PROCESSOS_EMISSAO
processo_emissao = 0
# - Versao do processo de emissão do MDF-e
versao_processo_emissao = get_version()
# - UF inicio. Exemplo SP, MT, PR
UFIni = str()
# - UF final. Exemplo SP, MT, PR
UFFim = str()
# - Digest value da NF-e (somente leitura)
digest_value = None
# - Protocolo (somente leitura)
protocolo = str()
# - Data (somente leitura)
data = None
# - Municípios carregamento (lista 1 para * / ManyToManyField)
municipio_carrega = None
# - Percurso da viagem (lista 1 para * / ManyToManyField)
percurso = None
# Data inicial da viagem
dhIniViagem = None
# - Emitente (lista 1 para * / ManyToManyField)
emitente = None
# - Modal rodoviario (lista 1 para * / ManyToManyField)
modal_rodoviario = None
# - Documentos vinculados NFe ou CTe (lista 1 para * / ManyToManyField)
documentos = None
# - Seguradora (lista 1 para * / ManyToManyField)
seguradora = None
# - Produto predominante
produto = None
# - Resumo dos Totais do MDF-e
totais = None
# - Lacres
lacres = None
# - Informacoes Adicionais
# - Informacoes adicionais de interesse do fisco
informacoes_adicionais_interesse_fisco = str()
# - Informacoes complementares de interesse do contribuinte
informacoes_complementares_interesse_contribuinte = str()
def __init__(self, *args, **kwargs):
self.municipio_carrega = []
self.percurso = []
self.modal_rodoviario = []
self.documentos = []
self.seguradora = []
self.produto = []
self.lacres = []
self.responsavel_tecnico = []
super(Manifesto, self).__init__(*args, **kwargs)
def __str__(self):
return ' '.join([str(self.modelo), self.serie, self.numero_mdfe])
def adicionar_municipio_carrega(self, **kwargs):
obj = ManifestoMunicipioCarrega(**kwargs)
self.municipio_carrega.append(obj)
return obj
def adicionar_percurso(self, **kwargs):
obj = ManifestoPercurso(**kwargs)
self.percurso.append(obj)
return obj
def adicionar_modal_rodoviario(self, **kwargs):
obj = ManifestoRodoviario(**kwargs)
self.modal_rodoviario.append(obj)
return obj
def adicionar_documentos(self, **kwargs):
obj = ManifestoDocumentos(**kwargs)
self.documentos.append(obj)
return obj
def adicionar_seguradora(self, **kwargs):
obj = ManifestoSeguradora(**kwargs)
self.seguradora.append(obj)
return obj
def adicionar_produto(self, **kwargs):
obj = ManifestoProduto(**kwargs)
self.produto.append(obj)
return obj
def adicionar_totais(self, **kwargs):
obj = ManifestoTotais(**kwargs)
self.totais.append(obj)
return obj
def adicionar_lacres(self, **kwargs):
obj = ManifestoLacres(**kwargs)
self.lacres.append(obj)
return obj
def adicionar_responsavel_tecnico(self, **kwargs):
""" Adiciona uma instancia de Responsavel Tecnico """
obj = ManifestoResponsavelTecnico(**kwargs)
self.responsavel_tecnico.append(obj)
return obj
def _codigo_numerico_aleatorio(self):
self.codigo_numerico_aleatorio = str(random.randint(0, 99999999)).zfill(8)
return self.codigo_numerico_aleatorio
def _dv_codigo_numerico(self, key):
assert len(key) == 43
weights = [2, 3, 4, 5, 6, 7, 8, 9]
weights_size = len(weights)
key_numbers = [int(k) for k in key]
key_numbers.reverse()
key_sum = 0
for i, key_number in enumerate(key_numbers):
# cycle though weights
i = i % weights_size
key_sum += key_number * weights[i]
remainder = key_sum % 11
if remainder == 0 or remainder == 1:
self.dv_codigo_numerico_aleatorio = '0'
return '0'
self.dv_codigo_numerico_aleatorio = str(11 - remainder)
return str(self.dv_codigo_numerico_aleatorio)
@property
# @memoize
def identificador_unico(self):
# Monta 'Id' da tag raiz <infMDFe>
# Ex.: MDFe35080599999090910270580010000000011518005123
key = "%(uf)s%(ano)s%(mes)s%(cnpj)s%(mod)s%(serie)s%(nMDF)s%(tpEmis)s%(cMDF)s"%{
'uf': CODIGOS_ESTADOS[self.uf],
'ano': self.data_emissao.strftime('%y'),
'mes': self.data_emissao.strftime('%m'),
'cnpj': so_numeros(self.emitente.cpfcnpj).zfill(14),
'mod': self.modelo,
'serie': str(self.serie).zfill(3),
'nMDF': str(self.numero_mdfe).zfill(9),
'tpEmis': str(self.forma_emissao),
'cMDF': self._codigo_numerico_aleatorio(),
}
return "MDFe%(uf)s%(ano)s%(mes)s%(cnpj)s%(mod)s%(serie)s%(nMDF)s%(tpEmis)s%(cMDF)s%(cDV)s"%{
'uf': CODIGOS_ESTADOS[self.uf],
'ano': self.data_emissao.strftime('%y'),
'mes': self.data_emissao.strftime('%m'),
'cnpj': so_numeros(self.emitente.cpfcnpj).zfill(14),
'mod': self.modelo,
'serie': str(self.serie).zfill(3),
'nMDF': str(self.numero_mdfe).zfill(9),
'tpEmis': str(self.forma_emissao),
'cMDF': str(self.codigo_numerico_aleatorio),
'cDV': self._dv_codigo_numerico(key),
}
class ManifestoMunicipioCarrega(Entidade):
# - Codigo municipio
cMunCarrega = str()
# - Nome do municipio
xMunCarrega = str()
class ManifestoPercurso(Entidade):
# - Nome da UF (2 digitos)
UFPer = str()
class ManifestoRodoviario(Entidade):
rntrc = str()
ciot = None
pedagio = None
contratante = None
pagamento = None
veiculo_tracao = None
veiculo_reboque = None
class ManifestoCIOT(Entidade):
numero_ciot = str()
cpfcnpj = str()
class ManifestoPedagio(Entidade):
cnpj_fornecedor = str()
cpfcnpj_pagador = str()
numero_compra = str()
valor_pedagio = Decimal()
class ManifestoContratante(Entidade):
nome = str()
cpfcnpj = str()
class ManifestoVeiculoTracao(Entidade):
cInt = str()
placa = str()
RENAVAM = str()
tara = str()
capKG = str()
capM3 = str()
proprietario = None
condutor = None
tpRod = str()
tpCar = str()
UF = str()
class ManifestoVeiculoReboque(Entidade):
cInt = str()
placa = str()
RENAVAM = str()
tara = str()
capKG = str()
capM3 = str()
proprietario = None
tpCar = str()
UF = str()
class ManifestoCondutor(Entidade):
nome_motorista = str()
cpf_motorista = str()
class ManifestoDocumentos(Entidade):
# Código do municipio de descarga
cMunDescarga = str()
# Nome do municipio de descarga
xMunDescarga = str()
# Documentos vinculados
documentos_nfe = None
documentos_cte = None
class ManifestoDocumentosNFe(Entidade):
chave_acesso_nfe = str()
class ManifestoDocumentosCTe(Entidade):
chave_acesso_cte = str()
class ManifestoSeguradora(Entidade):
# infResp - Responsavel seguro
# 1=Emitente; 2=Tomador
responsavel_seguro = str()
# - CNPJ do responsavel
cnpj_responsavel = str()
# infSeg - Seguradora
# - Nome da seguradora
nome_seguradora = str()
# - CNPJ seguradora
cnpj_seguradora = str()
# Apolice do Seguro
numero_apolice = str()
# Lista de Averbacoes
averbacoes = None
class ManifestoAverbacao(Entidade):
# Numero da Averbacao
numero = str()
class ManifestoProduto(Entidade):
# Tipo de carga
# 01=GranelSolido
# 02=GranelLiquido
# 03=Frigorificada
# 04=Conteinerizada
# 05=CargaGeral
# 06=Neogranel
# 07=PerigosaGranelSolido
# 08=PerigosaGranelLiquido
# 09=PerigosaCargaFrigorificada
# 10=PerigosaConteinerizada
# 11=PerigosaCargaGeral
tipo_carga = str()
nome_produto = str()
cean = str()
ncm = str()
class ManifestoEmitente(Entidade):
# Dados do Emitente
# - CPF ou CNPJ (obrigatorio)
cpfcnpj = str()
# - Inscricao Estadual (obrigatorio)
inscricao_estadual = str()
# - Nome/Razao Social (obrigatorio)
razao_social = str()
# - Nome Fantasia
nome_fantasia = str()
# Endereco
# - Logradouro (obrigatorio)
endereco_logradouro = str()
# - Numero (obrigatorio)
endereco_numero = str()
# - Complemento
endereco_complemento = str()
# - Bairro (obrigatorio)
endereco_bairro = str()
# - Codigo Municipio (opt)
endereco_cod_municipio = str()
# - Municipio (obrigatorio)
endereco_municipio = str()
# - CEP
endereco_cep = str()
# - UF (obrigatorio)
endereco_uf = str()
# - Telefone
endereco_telefone = str()
# - Email
endereco_email = str()
def __str__(self):
return self.cpfcnpj
class ManifestoTotais(Entidade):
# Quantidade total de CT-e relacionados no Manifesto
qCTe = int()
# Quantidade total de NF-e relacionadas no Manifesto
qNFe = int()
# Valor total da carga / mercadorias transportadas
vCarga = Decimal()
# - Código da unidade de medida do Peso Bruto da Carga / Mercadorias transportadas
# Unidades: 01 – KG; 02 - TON
cUnid = str()
# - Peso Bruto Total da Carga / Mercadorias transportadas
qCarga = Decimal()
class ManifestoLacres(Entidade):
nLacre = str()
class ManifestoResponsavelTecnico(Entidade):
# NT 2018/003
cnpj = str()
contato = str()
email = str()
fone = str()
csrt = str()

5
pynfe/processamento/__init__.py

@ -2,5 +2,8 @@ from .serializacao import SerializacaoXML
from .serializacao import SerializacaoNfse
from .validacao import Validacao
from .assinatura import AssinaturaA1
from .comunicacao import ComunicacaoSefaz
from pynfe.entidades.certificado import CertificadoA1
from .nfe import ComunicacaoNFe
from .mdfe import ComunicacaoMDFe
from .nfse import ComunicacaoNfse
from .danfe import DanfeNfce

649
pynfe/processamento/comunicacao.py

@ -1,8 +1,7 @@
# -*- coding: utf-8 -*-
import re
import ssl
import datetime
import requests
from pynfe.utils import etree, so_numeros
from pynfe.utils.flags import (
NAMESPACE_NFE,
@ -15,14 +14,13 @@ from pynfe.utils.flags import (
NAMESPACE_METODO
)
from pynfe.utils.webservices import NFE, NFCE, NFSE
from pynfe.entidades.certificado import CertificadoA1
from .assinatura import AssinaturaA1
class Comunicacao(object):
"""
Classe abstrata responsavel por definir os metodos e logica das classes
de comunicação com os webservices da NF-e.
de comunicação com os webservices.
"""
_ambiente = 1 # 1 = Produção, 2 = Homologação
@ -30,639 +28,22 @@ class Comunicacao(object):
certificado = None
certificado_senha = None
url = None
_versao = False
_assinatura = AssinaturaA1
_namespace = False
_header = False
_envio_mensagem = False
_namespace_metodo = False
_accept = False
_soap_action = False
_ws_url = False
_namespace_soap = NAMESPACE_SOAP
_namespace_xsi = NAMESPACE_XSI
_namespace_xsd = NAMESPACE_XSD
_soap_version = 'soap'
def __init__(self, uf, certificado, certificado_senha, homologacao=False):
self.uf = uf
self.certificado = certificado
self.certificado_senha = certificado_senha
self._ambiente = 2 if homologacao else 1
class ComunicacaoSefaz(Comunicacao):
"""Classe de comunicação que segue o padrão definido para as SEFAZ dos Estados."""
_versao = VERSAO_PADRAO
_assinatura = AssinaturaA1
def autorizacao(self, modelo, nota_fiscal, id_lote=1, ind_sinc=1):
"""
Método para realizar autorização da nota de acordo com o modelo
:param modelo: Modelo
:param nota_fiscal: XML assinado
:param id_lote: Id do lote - numero autoincremental gerado pelo sistema
:param ind_sinc: Indicador de sincrono e assincrono, 0 para assincrono, 1 para sincrono
:return: Uma tupla que em caso de sucesso, retorna xml com nfe e protocolo de autorização. Caso contrário,
envia todo o soap de resposta da Sefaz para decisão do usuário.
"""
# url do serviço
url = self._get_url(modelo=modelo, consulta='AUTORIZACAO')
# Monta XML do corpo da requisição
raiz = etree.Element('enviNFe', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO)
etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema
etree.SubElement(raiz, 'indSinc').text = str(ind_sinc) # 0 para assincrono, 1 para sincrono
raiz.append(nota_fiscal)
# Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeAutorizacao4', raiz)
# Faz request no Servidor da Sefaz
retorno = self._post(url, xml)
# Em caso de sucesso, retorna xml com nfe e protocolo de autorização.
# Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário.
if retorno.status_code == 200:
# namespace
ns = {'ns': NAMESPACE_NFE}
# Procuta status no xml
try:
prot = etree.fromstring(retorno.text)
except ValueError:
# em SP retorno.text apresenta erro
prot = etree.fromstring(retorno.content)
if ind_sinc == 1:
try:
# Protocolo com envio OK
try:
inf_prot = prot[0][0] # root protNFe
except IndexError:
# Estados como GO vem com a tag header
inf_prot = prot[1][0]
lote_status = inf_prot.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text
# Lote processado
if lote_status == '104':
prot_nfe = inf_prot.xpath("ns:retEnviNFe/ns:protNFe", namespaces=ns)[0]
status = prot_nfe.xpath('ns:infProt/ns:cStat', namespaces=ns)[0].text
# autorizado usa da NF-e
# retorna xml final (protNFe+NFe)
if status == '100':
raiz = etree.Element('nfeProc', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO)
raiz.append(nota_fiscal)
raiz.append(prot_nfe)
return 0, raiz
except IndexError:
# Protocolo com algum erro no Envio
return 1, retorno, nota_fiscal
else:
# Retorna id do protocolo para posterior consulta em caso de sucesso.
rec = prot[0][0]
status = rec.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text
# Lote Recebido com Sucesso!
if status == '103':
nrec = rec.xpath("ns:retEnviNFe/ns:infRec/ns:nRec", namespaces=ns)[0].text
return 0, nrec, nota_fiscal
return 1, retorno, nota_fiscal
def consulta_recibo(self, modelo, numero):
"""
Este método oferece a consulta do resultado do processamento de um lote de NF-e.
O aplicativo do Contribuinte deve ser construído de forma a aguardar um tempo mínimo de
15 segundos entre o envio do Lote de NF-e para processamento e a consulta do resultado
deste processamento, evitando a obtenção desnecessária do status de erro 105 - "Lote em
Processamento".
:param modelo: Modelo da nota
:param numero: Número da nota
:return:
"""
# url do serviço
url = self._get_url(modelo=modelo, consulta='RECIBO')
# Monta XML do corpo da requisição
raiz = etree.Element('consReciNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'nRec').text = numero
# Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeRetAutorizacao4', raiz)
return self._post(url, xml)
def consulta_nota(self, modelo, chave):
"""
Este método oferece a consulta da situação da NF-e/NFC-e na Base de Dados do Portal
da Secretaria de Fazenda Estadual.
:param modelo: Modelo da nota
:param chave: Chave da nota
:return:
"""
# url do serviço
url = self._get_url(modelo=modelo, consulta='CHAVE')
# Monta XML do corpo da requisição
raiz = etree.Element('consSitNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'xServ').text = 'CONSULTAR'
etree.SubElement(raiz, 'chNFe').text = chave
# Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeConsultaProtocolo4', raiz)
return self._post(url, xml)
def consulta_distribuicao(self, cnpj=None, cpf=None, chave=None, nsu=0):
"""
O XML do pedido de distribuição suporta três tipos de consultas que são definidas de acordo com a tag
informada no XML. As tags são distNSU, consNSU e consChNFe.
a) distNSU Distribuição de Conjunto de DF-e a Partir do NSU Informado
b) consNSU Consulta DF-e Vinculado ao NSU Informado
c) consChNFe Consulta de NF-e por Chave de Acesso Informada
:param cnpj: CNPJ do interessado
:param cpf: CPF do interessado
:param chave: Chave da NF-e a ser consultada
:param nsu: Ultimo nsu ou nsu específico para ser consultado.
:return:
"""
# url
url = self._get_url_an(consulta='DISTRIBUICAO')
# Monta XML para envio da requisição
raiz = etree.Element('distDFeInt', versao='1.01', xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
if self.uf:
etree.SubElement(raiz, 'cUFAutor').text = CODIGOS_ESTADOS[self.uf.upper()]
if cnpj:
etree.SubElement(raiz, 'CNPJ').text = cnpj
else:
etree.SubElement(raiz, 'CPF').text = cpf
if not chave:
distNSU = etree.SubElement(raiz, 'distNSU')
etree.SubElement(distNSU, 'ultNSU').text = str(nsu).zfill(15)
if chave:
consChNFe = etree.SubElement(raiz, 'consChNFe')
etree.SubElement(consChNFe, 'chNFe').text = chave
#Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeDistribuicaoDFe', raiz)
return self._post(url, xml)
def consulta_cadastro(self, modelo, cnpj):
"""
Consulta de cadastro
:param modelo: Modelo da nota
:param cnpj: CNPJ da empresa
:return:
"""
# UF que utilizam a SVRS - Sefaz Virtual do RS: Para serviço de Consulta Cadastro: AC, RN, PB, SC
lista_svrs = ['AC', 'RN', 'PB', 'SC', 'PA']
# RS implementa um método diferente na consulta de cadastro
# usa o mesmo url para produção e homologação
# não tem url para NFCE
if self.uf.upper() == 'RS':
url = NFE['RS']['CADASTRO']
elif self.uf.upper() in lista_svrs:
url = NFE['SVRS']['CADASTRO']
elif self.uf.upper() == 'SVC-RS':
url = NFE['SVC-RS']['CADASTRO']
else:
url = self._get_url(modelo=modelo, consulta='CADASTRO')
raiz = etree.Element('ConsCad', versao='2.00', xmlns=NAMESPACE_NFE)
info = etree.SubElement(raiz, 'infCons')
etree.SubElement(info, 'xServ').text = 'CONS-CAD'
etree.SubElement(info, 'UF').text = self.uf.upper()
etree.SubElement(info, 'CNPJ').text = cnpj
# etree.SubElement(info, 'CPF').text = cpf
# Monta XML para envio da requisição
xml = self._construir_xml_soap('CadConsultaCadastro4', raiz)
# Chama método que efetua a requisição POST no servidor SOAP
return self._post(url, xml)
def evento(self, modelo, evento, id_lote=1):
"""
Envia um evento de nota fiscal (cancelamento e carta de correção)
:param modelo: Modelo da nota
:param evento: Eventro
:param id_lote: Id do lote
:return:
"""
# url do serviço
try:
# manifestacao url é do AN
if evento[0][5].text.startswith('2'):
url = self._get_url_an(consulta='EVENTOS')
else:
url = self._get_url(modelo=modelo, consulta='EVENTOS')
except Exception:
url = self._get_url(modelo=modelo, consulta='EVENTOS')
# Monta XML do corpo da requisição
raiz = etree.Element('envEvento', versao='1.00', xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema
raiz.append(evento)
xml = self._construir_xml_soap('NFeRecepcaoEvento4', raiz)
return self._post(url, xml)
def status_servico(self, modelo):
"""
Verifica status do servidor da receita.
:param modelo: modelo é a string com tipo de serviço que deseja consultar, Ex: nfe ou nfce
:return:
"""
url = self._get_url(modelo, 'STATUS')
# Monta XML do corpo da requisição
raiz = etree.Element('consStatServ', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()]
etree.SubElement(raiz, 'xServ').text = 'STATUS'
xml = self._construir_xml_soap('NFeStatusServico4', raiz)
return self._post(url, xml)
def inutilizacao(self, modelo, cnpj, numero_inicial, numero_final, justificativa='', ano=None, serie='1'):
"""
Serviço destinado ao atendimento de solicitações de inutilização de numeração.
:param modelo: Modelo da nota
:param cnpj: CNPJda empresa
:param numero_inicial: Número inicial
:param numero_final: Número final
:param justificativa: Justificativa
:param ano: Ano
:param serie: Série
:return:
"""
# url do servico
url = self._get_url(modelo=modelo, consulta='INUTILIZACAO')
# Valores default
ano = str(ano or datetime.date.today().year)[-2:]
uf = CODIGOS_ESTADOS[self.uf.upper()]
cnpj = so_numeros(cnpj)
# Identificador da TAG a ser assinada formada com Código da UF + Ano (2 posições) +
# CNPJ + modelo + série + nro inicial e nro final precedida do literal “ID”
id_unico = 'ID%(uf)s%(ano)s%(cnpj)s%(modelo)s%(serie)s%(num_ini)s%(num_fin)s' % {
'uf': uf,
'ano': ano,
'cnpj': cnpj,
'modelo': '55' if modelo == 'nfe' else '65', # 55=NF-e; 65=NFC-e;
'serie': serie.zfill(3),
'num_ini': str(numero_inicial).zfill(9),
'num_fin': str(numero_final).zfill(9),
}
# Monta XML do corpo da requisição # FIXME
raiz = etree.Element('inutNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
inf_inut = etree.SubElement(raiz, 'infInut', Id=id_unico)
etree.SubElement(inf_inut, 'tpAmb').text = str(self._ambiente)
etree.SubElement(inf_inut, 'xServ').text = 'INUTILIZAR'
etree.SubElement(inf_inut, 'cUF').text = uf
etree.SubElement(inf_inut, 'ano').text = ano
etree.SubElement(inf_inut, 'CNPJ').text = cnpj
etree.SubElement(inf_inut, 'mod').text = '55' if modelo == 'nfe' else '65' # 55=NF-e; 65=NFC-e
etree.SubElement(inf_inut, 'serie').text = serie
etree.SubElement(inf_inut, 'nNFIni').text = str(numero_inicial)
etree.SubElement(inf_inut, 'nNFFin').text = str(numero_final)
etree.SubElement(inf_inut, 'xJust').text = justificativa
# assinatura
a1 = AssinaturaA1(self.certificado, self.certificado_senha)
xml = a1.assinar(raiz)
# Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeInutilizacao4', xml)
# Faz request no Servidor da Sefaz e retorna resposta
return self._post(url, xml)
def _get_url_an(self, consulta):
# producao
if self._ambiente == 1:
if consulta == 'DISTRIBUICAO':
ambiente = 'https://www1.'
else:
ambiente = 'https://www.'
# homologacao
else:
ambiente = 'https://hom.'
self.url = ambiente + NFE['AN'][consulta]
return self.url
def _get_url(self, modelo, consulta):
""" Retorna a url para comunicação com o webservice """
# estado que implementam webservices proprios
lista = ['PR', 'MS', 'SP', 'AM', 'CE', 'BA', 'GO', 'MG', 'MT', 'PE', 'RS']
if self.uf.upper() in lista:
if self._ambiente == 1:
ambiente = 'HTTPS'
else:
ambiente = 'HOMOLOGACAO'
if modelo == 'nfe':
# nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
self.url = NFE[self.uf.upper()][ambiente] + NFE[self.uf.upper()][consulta]
elif modelo == 'nfce':
# PE e BA são as únicas UF'sque possuem NFE proprio e SVRS para NFCe
if self.uf.upper() == 'PE' or self.uf.upper() == 'BA':
self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
else:
# nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
self.url = NFCE[self.uf.upper()][ambiente] + NFCE[self.uf.upper()][consulta]
else:
raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
# Estados que utilizam outros ambientes
else:
lista_svrs = ['AC', 'AL', 'AP', 'DF', 'ES', 'PB', 'PI', 'RJ', 'RN', 'RO', 'RR', 'SC', 'SE', 'TO', 'PA']
if self.uf.upper() in lista_svrs:
if self._ambiente == 1:
ambiente = 'HTTPS'
else:
ambiente = 'HOMOLOGACAO'
if modelo == 'nfe':
# nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
self.url = NFE['SVRS'][ambiente] + NFE['SVRS'][consulta]
elif modelo == 'nfce':
# nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
else:
raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
# unico UF que utiliza SVAN ainda para NF-e
# SVRS para NFC-e
elif self.uf.upper() == 'MA':
if self._ambiente == 1:
ambiente = 'HTTPS'
else:
ambiente = 'HOMOLOGACAO'
if modelo == 'nfe':
# nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
self.url = NFE['SVAN'][ambiente] + NFE['SVAN'][consulta]
elif modelo == 'nfce':
# nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
else:
raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
else:
raise Exception(f"Url não encontrada para {modelo} e {consulta} {self.uf.upper()}")
return self.url
def _construir_xml_soap(self, metodo, dados, cabecalho=False):
"""Mota o XML para o envio via SOAP"""
raiz = etree.Element('{%s}Envelope' % NAMESPACE_SOAP, nsmap={
'xsi': NAMESPACE_XSI, 'xsd': NAMESPACE_XSD,'soap': NAMESPACE_SOAP})
body = etree.SubElement(raiz, '{%s}Body' % NAMESPACE_SOAP)
## distribuição tem um corpo de xml diferente
if metodo == 'NFeDistribuicaoDFe':
x = etree.SubElement(body, 'nfeDistDFeInteresse', xmlns=NAMESPACE_METODO+metodo)
a = etree.SubElement(x, 'nfeDadosMsg')
else:
a = etree.SubElement(body, 'nfeDadosMsg', xmlns=NAMESPACE_METODO+metodo)
a.append(dados)
return raiz
def _post_header(self):
"""Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP"""
# PE é a única UF que exige SOAPAction no header
response = {
'content-type': 'application/soap+xml; charset=utf-8;',
'Accept': 'application/soap+xml; charset=utf-8;',
}
if self.uf.upper() == 'PE':
response["SOAPAction"] = ""
return response
def _post(self, url, xml):
certificado_a1 = CertificadoA1(self.certificado)
chave, cert = certificado_a1.separar_arquivo(self.certificado_senha, caminho=True)
chave_cert = (cert, chave)
# Abre a conexão HTTPS
try:
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>'
# limpa xml com caracteres bugados para infNFeSupl em NFC-e
xml = re.sub(
'<qrCode>(.*?)</qrCode>',
lambda x: x.group(0).replace('&lt;', '<').replace('&gt;', '>').replace('&amp;', ''),
etree.tostring(xml, encoding='unicode').replace('\n', '')
)
xml = xml_declaration + xml
# Faz o request com o servidor
result = requests.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False)
result.encoding = 'utf-8'
return result
except requests.exceptions.RequestException as e:
raise e
finally:
certificado_a1.excluir()
class ComunicacaoNfse(Comunicacao):
""" Classe de comunicação que segue o padrão definido para as SEFAZ dos Municípios. """
_versao = ''
_namespace = ''
def __init__(self, certificado, certificado_senha, autorizador, homologacao=False):
self.certificado = certificado
self.certificado_senha = certificado_senha
self._ambiente = 2 if homologacao else 1
self.autorizador = autorizador.upper()
if self.autorizador == 'GINFES':
self._namespace = 'http://www.ginfes.com.br/cabecalho_v03.xsd'
self._versao = '3'
elif self.autorizador == 'BETHA':
self._namespace = NAMESPACE_BETHA
self._versao = '2.02'
else:
raise Exception('Autorizador não encontrado!')
def autorizacao(self, nota):
# url do serviço
url = self._get_url()
if self.autorizador == 'BETHA':
# xml
xml = etree.tostring(nota, encoding='unicode', pretty_print=False)
# comunica via wsdl
return self._post(url, xml, 'gerar')
else:
raise Exception('Este método só esta implementado no autorizador betha.')
def enviar_lote(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'GINFES':
# xml
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml
# comunica via wsdl
return self._post_https(url, xml, 'enviar_lote')
else:
raise Exception('Este método só esta implementado no autorizador ginfes.')
def consultar(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'GINFES':
# xml
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml
# comunica via wsdl
return self._post_https(url, xml, 'consulta')
else:
raise Exception('Este método só esta implementado no autorizador ginfes.')
def consultar_rps(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'BETHA':
# comunica via wsdl
return self._post(url, xml, 'consultaRps')
elif self.autorizador == 'GINFES':
return self._post_https(url, xml, 'consultaRps')
# TODO outros autorizadres
else:
raise Exception('Autorizador não encontrado!')
def consultar_faixa(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'BETHA':
# comunica via wsdl
return self._post(url, xml, 'consultaFaixa')
else:
raise Exception('Este método só esta implementado no autorizador betha.')
def consultar_lote(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'GINFES':
# xml
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml
# comunica via wsdl
return self._post_https(url, xml, 'consulta_lote')
else:
raise Exception('Este método só esta implementado no autorizador ginfes.')
def consultar_situacao_lote(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'GINFES':
# comunica via wsdl
return self._post_https(url, xml, 'consulta_situacao_lote')
else:
raise Exception('Este método só esta implementado no autorizador ginfes.')
def cancelar(self, xml):
# url do serviço
url = self._get_url()
# Betha
if self.autorizador == 'BETHA':
# comunica via wsdl
return self._post(url, xml, 'cancelar')
# Ginfes
elif self.autorizador == 'GINFES':
# comunica via wsdl com certificado
return self._post_https(url, xml, 'cancelar')
# TODO outros autorizadres
else:
raise Exception('Autorizador não encontrado!')
def _cabecalho(self, retorna_string=True):
""" Monta o XML do cabeçalho da requisição wsdl
Namespaces padrão homologação (Ginfes) """
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>'
# cabecalho = '<ns2:cabecalho versao="3" xmlns:ns2="http://www.ginfes.com.br/cabecalho_v03.xsd"
# xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><versaoDados>3</versaoDados></ns2:cabecalho>'
# cabecalho
raiz = etree.Element('{%s}cabecalho'%self._namespace, nsmap={'ns2':self._namespace, 'xsi':NAMESPACE_XSI}, versao=self._versao)
etree.SubElement(raiz, 'versaoDados').text = self._versao
if retorna_string:
cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n','')
cabecalho = xml_declaration + cabecalho
return cabecalho
else:
return raiz
def _cabecalho2(self, retorna_string=True):
""" Monta o XML do cabeçalho da requisição wsdl
Namespaces que funcionaram em produção (Ginfes)"""
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>'
# cabecalho
raiz = etree.Element('cabecalho', xmlns=self._namespace, versao=self._versao)
etree.SubElement(raiz, 'versaoDados').text = self._versao
if retorna_string:
cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n', '')
cabecalho = xml_declaration + cabecalho
return cabecalho
else:
return raiz
def _cabecalho_ginfes(self):
""" Retorna o XML do cabeçalho gerado pelo xsd"""
from pynfe.processamento.autorizador_nfse import SerializacaoGinfes
return SerializacaoGinfes().cabecalho()
def _get_url(self):
""" Retorna a url para comunicação com o webservice """
if self._ambiente == 1:
ambiente = 'HTTPS'
else:
ambiente = 'HOMOLOGACAO'
if self.autorizador in NFSE:
self.url = NFSE[self.autorizador][ambiente]
else:
raise Exception('Autorizador nao encontrado!')
return self.url
def _post(self, url, xml, metodo):
""" Comunicação wsdl (http) sem certificado digital """
# cabecalho
cabecalho = self._cabecalho()
# comunicacao wsdl
try:
from suds.client import Client
cliente = Client(url)
# gerar nfse
if metodo == 'gerar':
return cliente.service.GerarNfse(cabecalho, xml)
elif metodo == 'consultaRps':
return cliente.service.ConsultarNfsePorRps(cabecalho, xml)
elif metodo == 'consultaFaixa':
return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
elif metodo == 'cancelar':
return cliente.service.CancelarNfse(cabecalho, xml)
# TODO outros metodos
else:
raise Exception('Método não implementado no autorizador.')
except Exception as e:
raise e
def _post_https(self, url, xml, metodo):
""" Comunicação wsdl (https) utilizando certificado do usuário """
# cabecalho
cabecalho = self._cabecalho()
# comunicacao wsdl
try:
from suds.client import Client
from pynfe.utils.https_nfse import HttpAuthenticated
certificadoA1 = CertificadoA1(self.certificado)
chave, cert = certificadoA1.separar_arquivo(self.certificado_senha, caminho=True)
cliente = Client(url, transport = HttpAuthenticated(key=chave, cert=cert, endereco=url))
# gerar nfse
if metodo == 'gerar':
return cliente.service.GerarNfse(cabecalho, xml)
elif metodo == 'enviar_lote':
return cliente.service.RecepcionarLoteRpsV3(cabecalho, xml)
elif metodo == 'consulta':
return cliente.service.ConsultarNfseV3(cabecalho, xml)
elif metodo == 'consulta_lote':
return cliente.service.ConsultarLoteRpsV3(cabecalho, xml)
elif metodo == 'consulta_situacao_lote':
return cliente.service.ConsultarSituacaoLoteRpsV3(cabecalho, xml)
elif metodo == 'consultaRps':
return cliente.service.ConsultarNfsePorRpsV3(cabecalho, xml)
elif metodo == 'consultaFaixa':
return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
elif metodo == 'cancelar':
# versão 2
return cliente.service.CancelarNfse(xml)
# versão 3
# return cliente.service.CancelarNfseV3(cabecalho, xml)
# TODO outros metodos
else:
raise Exception('Método não implementado no autorizador.')
except Exception as e:
raise e

286
pynfe/processamento/mdfe.py

@ -0,0 +1,286 @@
# -*- coding: utf-8 -*-
import time
import re
import requests
from io import StringIO
import base64
from pynfe.utils.flags import (
NAMESPACE_MDFE,
MODELO_MDFE,
VERSAO_MDFE,
NAMESPACE_MDFE_METODO,
NAMESPACE_SOAP,
NAMESPACE_XSI,
NAMESPACE_XSD,
CODIGOS_ESTADOS
)
from pynfe.utils.webservices import MDFE
from pynfe.entidades.certificado import CertificadoA1
from pynfe.utils import etree, extrai_id_srtxml
from .comunicacao import Comunicacao
from .resposta import analisar_retorno
MDFE_SITUACAO_JA_ENVIADO = ('100', '101', '132')
class ComunicacaoMDFe(Comunicacao):
_modelo = MODELO_MDFE
_namespace = NAMESPACE_MDFE
_versao = VERSAO_MDFE
_header = 'mdfeCabecMsg'
_envio_mensagem = 'mdfeDadosMsg'
_retorno_mensagem = 'mdfeRecepcaoResult'
_namespace_metodo = NAMESPACE_MDFE_METODO
_accept = True
_soap_action = False
_namespace_soap = NAMESPACE_SOAP
_namespace_xsi = NAMESPACE_XSI
_namespace_xsd = NAMESPACE_XSD
_soap_version = 'soap12'
_edoc_situacao_ja_enviado = MDFE_SITUACAO_JA_ENVIADO
_edoc_situacao_arquivo_recebido_com_sucesso = '103'
_edoc_situacao_lote_processado = '104'
_edoc_situacao_em_processamento = '105'
_edoc_situacao_servico_em_operacao = '107'
consulta_servico_ao_enviar = True
maximo_tentativas_consulta_recibo = 5
def autorizacao(self, manifesto, id_lote=1, ind_sinc=1):
"""
Método para realizar autorização do manifesto
:param manifesto: XML assinado
:param id_lote: Id do lote - numero autoincremental gerado pelo sistema
:param ind_sinc: Indicador de sincrono e assincrono, 0 para assincrono, 1 para sincrono
:return: Uma tupla que em caso de sucesso, retorna xml com manifesto e protocolo de autorização.
Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário.
"""
# url do serviço
if ind_sinc == 0:
url = self._get_url(consulta='RECEPCAO')
elif ind_sinc == 1:
url = self._get_url(consulta='RECEPCAO_SINC')
else:
raise f'ind_sinc deve ser 0=assincrono ou 1=sincrono'
# Monta XML do corpo da requisição
raiz = etree.Element('enviMDFe', xmlns=NAMESPACE_MDFE, versao=VERSAO_MDFE)
etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema
raiz.append(manifesto)
# Monta XML para envio da requisição
if ind_sinc == 0:
xml = self._construir_xml_soap('MDFeRecepcao', raiz)
elif ind_sinc == 1:
xml = self._construir_xml_soap('MDFeRecepcaoSinc', raiz)
# Faz request no Servidor da Sefaz
retorno = self._post(url, xml)
# Em caso de sucesso, retorna xml com o mdfe e protocolo de autorização.
# Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário.
if retorno.status_code == 200:
# namespace
ns = {'ns': NAMESPACE_MDFE}
# Procuta status no xml
try:
prot = etree.fromstring(retorno.text)
except ValueError:
# em SP retorno.text apresenta erro
prot = etree.fromstring(retorno.content)
if ind_sinc == 1:
try:
# Protocolo com envio OK
inf_prot = prot[1][0]
lote_status = inf_prot.xpath("ns:retEnviMDFe/ns:cStat", namespaces=ns)[0].text
# Lote processado
if lote_status == self._edoc_situacao_lote_processado:
prot_mdfe = inf_prot.xpath("ns:retEnviMDFe/ns:protMDFe", namespaces=ns)[0]
status = prot_mdfe.xpath('ns:infProt/ns:cStat', namespaces=ns)[0].text
# autorizado uso do MDF-e
# retorna xml final (protMDFe + MDFe)
if status in self._edoc_situacao_ja_enviado: # if status == '100':
raiz = etree.Element('mdfeProc', xmlns=NAMESPACE_MDFE, versao=VERSAO_MDFE)
raiz.append(manifesto)
raiz.append(prot_mdfe)
return 0, raiz
except IndexError:
# Protocolo com algum erro no Envio
return 1, retorno, manifesto
else:
# Retorna id do protocolo para posterior consulta em caso de sucesso.
rec = prot[1][0]
status = rec.xpath("ns:retEnviMDFe/ns:cStat", namespaces=ns)[0].text
# Lote Recebido com Sucesso!
if status == self._edoc_situacao_arquivo_recebido_com_sucesso:
nrec = rec.xpath("ns:retEnviMDFe/ns:infRec/ns:nRec", namespaces=ns)[0].text
return 0, nrec, manifesto
return 1, retorno, manifesto
def status_servico(self):
url = self._get_url('STATUS')
# Monta XML do corpo da requisição
raiz = etree.Element('consStatServMDFe', versao=self._versao, xmlns=NAMESPACE_MDFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'xServ').text = 'STATUS'
xml = self._construir_xml_soap('MDFeStatusServico', raiz)
return self._post(url, xml)
def consulta(self, chave):
url = self._get_url('CONSULTA')
# Monta XML do corpo da requisição
raiz = etree.Element('consSitMDFe', versao=self._versao, xmlns=NAMESPACE_MDFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'xServ').text = 'CONSULTAR'
etree.SubElement(raiz, 'chMDFe').text = chave
# Monta XML para envio da requisição
xml = self._construir_xml_soap('MDFeConsulta', raiz)
return self._post(url, xml)
def consulta_nao_encerrados(self, cpfcnpj):
url = self._get_url('NAO_ENCERRADOS')
# Monta XML do corpo da requisição
raiz = etree.Element('consMDFeNaoEnc', xmlns=NAMESPACE_MDFE, versao=self._versao)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'xServ').text = 'CONSULTAR NÃO ENCERRADOS'
if len(cpfcnpj) == 11:
etree.SubElement(raiz, 'CPF').text = cpfcnpj.zfill(11)
else:
etree.SubElement(raiz, 'CNPJ').text = cpfcnpj.zfill(14)
# Monta XML para envio da requisição
xml = self._construir_xml_soap('MDFeConsNaoEnc', raiz)
return self._post(url, xml)
def consulta_recibo(self, numero):
url = self._get_url('RET_RECEPCAO')
# Monta XML do corpo da requisição
raiz = etree.Element('consReciMDFe', versao=self._versao, xmlns=NAMESPACE_MDFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'nRec').text = numero.zfill(15)
# Monta XML para envio da requisição
xml = self._construir_xml_soap('MDFeRetRecepcao', raiz)
return self._post(url, xml)
def evento(self, evento):
"""
Envia eventos do MDFe como:
Encerramento
Cancelamento
Inclusao Condutor
Inclusao DF-e
Pagamento Operacao MDF-e
:param evento: Nome do Evento
:return:
"""
# url do serviço
url = self._get_url('EVENTOS')
# Monta XML do corpo da requisição
xml = self._construir_xml_soap('MDFeRecepcaoEvento', evento)
return self._post(url, xml)
def _construir_xml_soap(self, metodo, dados):
"""Mota o XML para o envio via SOAP"""
raiz = etree.Element(
'{%s}Envelope' % NAMESPACE_SOAP,
nsmap={
'xsi': self._namespace_xsi,
'xsd': self._namespace_xsd,
self._soap_version: self._namespace_soap
}
)
if self._header:
cabecalho = self._cabecalho_soap(metodo)
c = etree.SubElement(raiz, '{%s}Header' % self._namespace_soap)
c.append(cabecalho)
body = etree.SubElement(raiz, '{%s}Body' % self._namespace_soap)
a = etree.SubElement(
body,
self._envio_mensagem,
xmlns=self._namespace_metodo+metodo
)
# if metodo == 'MDFeRecepcaoSinc':
# body_base64 = base64.b16encode(a).decode()
a.append(dados)
return raiz
def _post_header(self, soap_webservice_method=False):
"""Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP"""
header = {
b'content-type': b'text/xml; charset=utf-8;',
}
# PE é a únca UF que exige SOAPAction no header
if soap_webservice_method:
header[b'SOAPAction'] = \
(self._namespace_metodo + soap_webservice_method).encode('utf-8')
if self._accept:
header[b'Accept'] = b'application/soap+xml; charset=utf-8;'
return header
def _post(self, url, xml):
certificado_a1 = CertificadoA1(self.certificado)
chave, cert = certificado_a1.separar_arquivo(self.certificado_senha, caminho=True)
chave_cert = (cert, chave)
# Abre a conexão HTTPS
try:
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>'
# limpa xml com caracteres bugados para infMDFeSupl em NFC-e
xml = re.sub(
'<qrCodMDFe>(.*?)</qrCodMDFe>',
lambda x: x.group(0).replace('&lt;', '<').replace('&gt;', '>').replace('amp;', ''),
etree.tostring(xml, encoding='unicode').replace('\n', '')
)
xml = xml_declaration + xml
xml = xml.encode('utf8') # necessário para o evento "CONSULTAR NÃO ENCERRADOS"
# Faz o request com o servidor
result = requests.post(
url,
xml,
headers=self._post_header(),
cert=chave_cert,
verify=False
)
result.encoding = 'utf-8'
return result
except requests.exceptions.RequestException as e:
raise e
finally:
certificado_a1.excluir()
def _cabecalho_soap(self, metodo):
"""Monta o XML do cabeçalho da requisição SOAP"""
raiz = etree.Element(
self._header,
xmlns=self._namespace_metodo + metodo
)
etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()]
etree.SubElement(raiz, 'versaoDados').text = '3.00'
return raiz
def _get_url(self, consulta):
# producao
if self._ambiente == 1:
ambiente = MDFE['SVRS']['HTTPS']
# homologacao
else:
ambiente = MDFE['SVRS']['HOMOLOGACAO']
self.url = ambiente + MDFE['SVRS'][consulta]
return self.url

439
pynfe/processamento/nfe.py

@ -0,0 +1,439 @@
# -*- coding: utf-8 -*-
import datetime
import re
import requests
from pynfe.utils import etree, so_numeros
from pynfe.utils.flags import (
NAMESPACE_NFE,
VERSAO_PADRAO,
CODIGOS_ESTADOS,
NAMESPACE_METODO,
NAMESPACE_SOAP,
NAMESPACE_XSI,
NAMESPACE_XSD,
)
from pynfe.entidades.certificado import CertificadoA1
from pynfe.utils.webservices import NFE, NFCE
from .assinatura import AssinaturaA1
from .comunicacao import Comunicacao
class ComunicacaoNFe(Comunicacao):
"""Classe de comunicação que segue o padrão definido para as SEFAZ dos Estados."""
_versao = VERSAO_PADRAO
_assinatura = AssinaturaA1
_namespace = NAMESPACE_NFE
_header = False
_envio_mensagem = 'nfeDadosMsg'
_namespace_metodo = NAMESPACE_METODO
_accept = False
_namespace_soap = NAMESPACE_SOAP
_namespace_xsi = NAMESPACE_XSI
_namespace_xsd = NAMESPACE_XSD
_soap_version = 'soap'
def autorizacao(self, modelo, nota_fiscal, id_lote=1, ind_sinc=1):
"""
Método para realizar autorização da nota de acordo com o modelo
:param modelo: Modelo
:param nota_fiscal: XML assinado
:param id_lote: Id do lote - numero autoincremental gerado pelo sistema
:param ind_sinc: Indicador de sincrono e assincrono, 0 para assincrono, 1 para sincrono
:return: Uma tupla que em caso de sucesso, retorna xml com nfe e protocolo de autorização. Caso contrário,
envia todo o soap de resposta da Sefaz para decisão do usuário.
"""
# url do serviço
url = self._get_url(modelo=modelo, consulta='AUTORIZACAO')
# Monta XML do corpo da requisição
raiz = etree.Element('enviNFe', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO)
etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema
etree.SubElement(raiz, 'indSinc').text = str(ind_sinc) # 0 para assincrono, 1 para sincrono
raiz.append(nota_fiscal)
# Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeAutorizacao4', raiz)
# Faz request no Servidor da Sefaz
retorno = self._post(url, xml)
# Em caso de sucesso, retorna xml com nfe e protocolo de autorização.
# Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário.
if retorno.status_code == 200:
# namespace
ns = {'ns': NAMESPACE_NFE}
# Procuta status no xml
try:
prot = etree.fromstring(retorno.text)
except ValueError:
# em SP retorno.text apresenta erro
prot = etree.fromstring(retorno.content)
if ind_sinc == 1:
try:
# Protocolo com envio OK
try:
inf_prot = prot[0][0] # root protNFe
except IndexError:
# Estados como GO vem com a tag header
inf_prot = prot[1][0]
lote_status = inf_prot.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text
# Lote processado
if lote_status == '104':
prot_nfe = inf_prot.xpath("ns:retEnviNFe/ns:protNFe", namespaces=ns)[0]
status = prot_nfe.xpath('ns:infProt/ns:cStat', namespaces=ns)[0].text
# autorizado usa da NF-e
# retorna xml final (protNFe+NFe)
if status == '100':
raiz = etree.Element('nfeProc', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO)
raiz.append(nota_fiscal)
raiz.append(prot_nfe)
return 0, raiz
except IndexError:
# Protocolo com algum erro no Envio
return 1, retorno, nota_fiscal
else:
# Retorna id do protocolo para posterior consulta em caso de sucesso.
rec = prot[0][0]
status = rec.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text
# Lote Recebido com Sucesso!
if status == '103':
nrec = rec.xpath("ns:retEnviNFe/ns:infRec/ns:nRec", namespaces=ns)[0].text
return 0, nrec, nota_fiscal
return 1, retorno, nota_fiscal
def consulta_recibo(self, modelo, numero):
"""
Este método oferece a consulta do resultado do processamento de um lote de NF-e.
O aplicativo do Contribuinte deve ser construído de forma a aguardar um tempo mínimo de
15 segundos entre o envio do Lote de NF-e para processamento e a consulta do resultado
deste processamento, evitando a obtenção desnecessária do status de erro 105 - "Lote em
Processamento".
:param modelo: Modelo da nota
:param numero: Número da nota
:return:
"""
# url do serviço
url = self._get_url(modelo=modelo, consulta='RECIBO')
# Monta XML do corpo da requisição
raiz = etree.Element('consReciNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'nRec').text = numero
# Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeRetAutorizacao4', raiz)
return self._post(url, xml)
def consulta_nota(self, modelo, chave):
"""
Este método oferece a consulta da situação da NF-e/NFC-e na Base de Dados do Portal
da Secretaria de Fazenda Estadual.
:param modelo: Modelo da nota
:param chave: Chave da nota
:return:
"""
# url do serviço
url = self._get_url(modelo=modelo, consulta='CHAVE')
# Monta XML do corpo da requisição
raiz = etree.Element('consSitNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'xServ').text = 'CONSULTAR'
etree.SubElement(raiz, 'chNFe').text = chave
# Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeConsultaProtocolo4', raiz)
return self._post(url, xml)
def consulta_distribuicao(self, cnpj=None, cpf=None, chave=None, nsu=0):
"""
O XML do pedido de distribuição suporta três tipos de consultas que são definidas de acordo com a tag
informada no XML. As tags são distNSU, consNSU e consChNFe.
a) distNSU Distribuição de Conjunto de DF-e a Partir do NSU Informado
b) consNSU Consulta DF-e Vinculado ao NSU Informado
c) consChNFe Consulta de NF-e por Chave de Acesso Informada
:param cnpj: CNPJ do interessado
:param cpf: CPF do interessado
:param chave: Chave da NF-e a ser consultada
:param nsu: Ultimo nsu ou nsu específico para ser consultado.
:return:
"""
# url
url = self._get_url_an(consulta='DISTRIBUICAO')
# Monta XML para envio da requisição
raiz = etree.Element('distDFeInt', versao='1.01', xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
if self.uf:
etree.SubElement(raiz, 'cUFAutor').text = CODIGOS_ESTADOS[self.uf.upper()]
if cnpj:
etree.SubElement(raiz, 'CNPJ').text = cnpj
else:
etree.SubElement(raiz, 'CPF').text = cpf
if not chave:
distNSU = etree.SubElement(raiz, 'distNSU')
etree.SubElement(distNSU, 'ultNSU').text = str(nsu).zfill(15)
if chave:
consChNFe = etree.SubElement(raiz, 'consChNFe')
etree.SubElement(consChNFe, 'chNFe').text = chave
#Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeDistribuicaoDFe', raiz)
return self._post(url, xml)
def consulta_cadastro(self, modelo, cnpj):
"""
Consulta de cadastro
:param modelo: Modelo da nota
:param cnpj: CNPJ da empresa
:return:
"""
# UF que utilizam a SVRS - Sefaz Virtual do RS: Para serviço de Consulta Cadastro: AC, RN, PB, SC
lista_svrs = ['AC', 'RN', 'PB', 'SC', 'PA']
# RS implementa um método diferente na consulta de cadastro
# usa o mesmo url para produção e homologação
# não tem url para NFCE
if self.uf.upper() == 'RS':
url = NFE['RS']['CADASTRO']
elif self.uf.upper() in lista_svrs:
url = NFE['SVRS']['CADASTRO']
elif self.uf.upper() == 'SVC-RS':
url = NFE['SVC-RS']['CADASTRO']
else:
url = self._get_url(modelo=modelo, consulta='CADASTRO')
raiz = etree.Element('ConsCad', versao='2.00', xmlns=NAMESPACE_NFE)
info = etree.SubElement(raiz, 'infCons')
etree.SubElement(info, 'xServ').text = 'CONS-CAD'
etree.SubElement(info, 'UF').text = self.uf.upper()
etree.SubElement(info, 'CNPJ').text = cnpj
# etree.SubElement(info, 'CPF').text = cpf
# Monta XML para envio da requisição
xml = self._construir_xml_soap('CadConsultaCadastro4', raiz)
# Chama método que efetua a requisição POST no servidor SOAP
return self._post(url, xml)
def evento(self, modelo, evento, id_lote=1):
"""
Envia um evento de nota fiscal (cancelamento e carta de correção)
:param modelo: Modelo da nota
:param evento: Eventro
:param id_lote: Id do lote
:return:
"""
# url do serviço
try:
# manifestacao url é do AN
if evento[0][5].text.startswith('2'):
url = self._get_url_an(consulta='EVENTOS')
else:
url = self._get_url(modelo=modelo, consulta='EVENTOS')
except Exception:
url = self._get_url(modelo=modelo, consulta='EVENTOS')
# Monta XML do corpo da requisição
raiz = etree.Element('envEvento', versao='1.00', xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema
raiz.append(evento)
xml = self._construir_xml_soap('NFeRecepcaoEvento4', raiz)
return self._post(url, xml)
def status_servico(self, modelo):
"""
Verifica status do servidor da receita.
:param modelo: modelo é a string com tipo de serviço que deseja consultar, Ex: nfe ou nfce
:return:
"""
url = self._get_url(modelo, 'STATUS')
# Monta XML do corpo da requisição
raiz = etree.Element('consStatServ', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()]
etree.SubElement(raiz, 'xServ').text = 'STATUS'
xml = self._construir_xml_soap('NFeStatusServico4', raiz)
return self._post(url, xml)
def inutilizacao(self, modelo, cnpj, numero_inicial, numero_final, justificativa='', ano=None, serie='1'):
"""
Serviço destinado ao atendimento de solicitações de inutilização de numeração.
:param modelo: Modelo da nota
:param cnpj: CNPJda empresa
:param numero_inicial: Número inicial
:param numero_final: Número final
:param justificativa: Justificativa
:param ano: Ano
:param serie: Série
:return:
"""
# url do servico
url = self._get_url(modelo=modelo, consulta='INUTILIZACAO')
# Valores default
ano = str(ano or datetime.date.today().year)[-2:]
uf = CODIGOS_ESTADOS[self.uf.upper()]
cnpj = so_numeros(cnpj)
# Identificador da TAG a ser assinada formada com Código da UF + Ano (2 posições) +
# CNPJ + modelo + série + nro inicial e nro final precedida do literal “ID”
id_unico = 'ID%(uf)s%(ano)s%(cnpj)s%(modelo)s%(serie)s%(num_ini)s%(num_fin)s' % {
'uf': uf,
'ano': ano,
'cnpj': cnpj,
'modelo': '55' if modelo == 'nfe' else '65', # 55=NF-e; 65=NFC-e;
'serie': serie.zfill(3),
'num_ini': str(numero_inicial).zfill(9),
'num_fin': str(numero_final).zfill(9),
}
# Monta XML do corpo da requisição # FIXME
raiz = etree.Element('inutNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
inf_inut = etree.SubElement(raiz, 'infInut', Id=id_unico)
etree.SubElement(inf_inut, 'tpAmb').text = str(self._ambiente)
etree.SubElement(inf_inut, 'xServ').text = 'INUTILIZAR'
etree.SubElement(inf_inut, 'cUF').text = uf
etree.SubElement(inf_inut, 'ano').text = ano
etree.SubElement(inf_inut, 'CNPJ').text = cnpj
etree.SubElement(inf_inut, 'mod').text = '55' if modelo == 'nfe' else '65' # 55=NF-e; 65=NFC-e
etree.SubElement(inf_inut, 'serie').text = serie
etree.SubElement(inf_inut, 'nNFIni').text = str(numero_inicial)
etree.SubElement(inf_inut, 'nNFFin').text = str(numero_final)
etree.SubElement(inf_inut, 'xJust').text = justificativa
# assinatura
a1 = AssinaturaA1(self.certificado, self.certificado_senha)
xml = a1.assinar(raiz)
# Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeInutilizacao4', xml)
# Faz request no Servidor da Sefaz e retorna resposta
return self._post(url, xml)
def _get_url_an(self, consulta):
# producao
if self._ambiente == 1:
if consulta == 'DISTRIBUICAO':
ambiente = 'https://www1.'
else:
ambiente = 'https://www.'
# homologacao
else:
ambiente = 'https://hom.'
self.url = ambiente + NFE['AN'][consulta]
return self.url
def _get_url(self, modelo, consulta):
""" Retorna a url para comunicação com o webservice """
# estado que implementam webservices proprios
lista = ['PR', 'MS', 'SP', 'AM', 'CE', 'BA', 'GO', 'MG', 'MT', 'PE', 'RS']
if self.uf.upper() in lista:
if self._ambiente == 1:
ambiente = 'HTTPS'
else:
ambiente = 'HOMOLOGACAO'
if modelo == 'nfe':
# nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
self.url = NFE[self.uf.upper()][ambiente] + NFE[self.uf.upper()][consulta]
elif modelo == 'nfce':
# PE e BA são as únicas UF'sque possuem NFE proprio e SVRS para NFCe
if self.uf.upper() == 'PE' or self.uf.upper() == 'BA':
self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
else:
# nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
self.url = NFCE[self.uf.upper()][ambiente] + NFCE[self.uf.upper()][consulta]
else:
raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
# Estados que utilizam outros ambientes
else:
lista_svrs = ['AC', 'AL', 'AP', 'DF', 'ES', 'PB', 'PI', 'RJ', 'RN', 'RO', 'RR', 'SC', 'SE', 'TO', 'PA']
if self.uf.upper() in lista_svrs:
if self._ambiente == 1:
ambiente = 'HTTPS'
else:
ambiente = 'HOMOLOGACAO'
if modelo == 'nfe':
# nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
self.url = NFE['SVRS'][ambiente] + NFE['SVRS'][consulta]
elif modelo == 'nfce':
# nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
else:
raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
# unico UF que utiliza SVAN ainda para NF-e
# SVRS para NFC-e
elif self.uf.upper() == 'MA':
if self._ambiente == 1:
ambiente = 'HTTPS'
else:
ambiente = 'HOMOLOGACAO'
if modelo == 'nfe':
# nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
self.url = NFE['SVAN'][ambiente] + NFE['SVAN'][consulta]
elif modelo == 'nfce':
# nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
else:
raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
else:
raise Exception(f"Url não encontrada para {modelo} e {consulta} {self.uf.upper()}")
return self.url
def _construir_xml_soap(self, metodo, dados, cabecalho=False):
"""Mota o XML para o envio via SOAP"""
raiz = etree.Element(
'{%s}Envelope' % NAMESPACE_SOAP,
nsmap={
'xsi': NAMESPACE_XSI,
'xsd': NAMESPACE_XSD,
'soap': NAMESPACE_SOAP
}
)
body = etree.SubElement(raiz, '{%s}Body' % NAMESPACE_SOAP)
# distribuição tem um corpo de xml diferente
if metodo == 'NFeDistribuicaoDFe':
x = etree.SubElement(body, 'nfeDistDFeInteresse', xmlns=NAMESPACE_METODO+metodo)
a = etree.SubElement(x, 'nfeDadosMsg')
else:
a = etree.SubElement(body, 'nfeDadosMsg', xmlns=NAMESPACE_METODO+metodo)
a.append(dados)
return raiz
def _post_header(self):
"""Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP"""
# PE é a única UF que exige SOAPAction no header
response = {
'content-type': 'application/soap+xml; charset=utf-8;',
'Accept': 'application/soap+xml; charset=utf-8;',
}
if self.uf.upper() == 'PE':
response["SOAPAction"] = ""
return response
def _post(self, url, xml):
certificado_a1 = CertificadoA1(self.certificado)
chave, cert = certificado_a1.separar_arquivo(self.certificado_senha, caminho=True)
chave_cert = (cert, chave)
# Abre a conexão HTTPS
try:
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>'
# limpa xml com caracteres bugados para infNFeSupl em NFC-e
xml = re.sub(
'<qrCode>(.*?)</qrCode>',
lambda x: x.group(0).replace('&lt;', '<').replace('&gt;', '>').replace('&amp;', ''),
etree.tostring(xml, encoding='unicode').replace('\n', '')
)
xml = xml_declaration + xml
# Faz o request com o servidor
result = requests.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False)
result.encoding = 'utf-8'
return result
except requests.exceptions.RequestException as e:
raise e
finally:
certificado_a1.excluir()

234
pynfe/processamento/nfse.py

@ -0,0 +1,234 @@
# -*- coding: utf-8 -*-
from pynfe.utils import etree
from pynfe.utils.flags import (
NAMESPACE_XSI,
NAMESPACE_BETHA,
)
from pynfe.utils.webservices import NFSE
from pynfe.entidades.certificado import CertificadoA1
from .comunicacao import Comunicacao
class ComunicacaoNfse(Comunicacao):
""" Classe de comunicação que segue o padrão definido para as SEFAZ dos Municípios. """
_versao = ''
_namespace = ''
def __init__(self, certificado, certificado_senha, autorizador, homologacao=False):
self.certificado = certificado
self.certificado_senha = certificado_senha
self._ambiente = 2 if homologacao else 1
self.autorizador = autorizador.upper()
if self.autorizador == 'GINFES':
self._namespace = 'http://www.ginfes.com.br/cabecalho_v03.xsd'
self._versao = '3'
elif self.autorizador == 'BETHA':
self._namespace = NAMESPACE_BETHA
self._versao = '2.02'
else:
raise Exception('Autorizador não encontrado!')
def autorizacao(self, nota):
# url do serviço
url = self._get_url()
if self.autorizador == 'BETHA':
# xml
xml = etree.tostring(nota, encoding='unicode', pretty_print=False)
# comunica via wsdl
return self._post(url, xml, 'gerar')
else:
raise Exception('Este método só esta implementado no autorizador betha.')
def enviar_lote(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'GINFES':
# xml
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml
# comunica via wsdl
return self._post_https(url, xml, 'enviar_lote')
else:
raise Exception('Este método só esta implementado no autorizador ginfes.')
def consultar(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'GINFES':
# xml
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml
# comunica via wsdl
return self._post_https(url, xml, 'consulta')
else:
raise Exception('Este método só esta implementado no autorizador ginfes.')
def consultar_rps(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'BETHA':
# comunica via wsdl
return self._post(url, xml, 'consultaRps')
elif self.autorizador == 'GINFES':
return self._post_https(url, xml, 'consultaRps')
# TODO outros autorizadres
else:
raise Exception('Autorizador não encontrado!')
def consultar_faixa(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'BETHA':
# comunica via wsdl
return self._post(url, xml, 'consultaFaixa')
else:
raise Exception('Este método só esta implementado no autorizador betha.')
def consultar_lote(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'GINFES':
# xml
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml
# comunica via wsdl
return self._post_https(url, xml, 'consulta_lote')
else:
raise Exception('Este método só esta implementado no autorizador ginfes.')
def consultar_situacao_lote(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'GINFES':
# comunica via wsdl
return self._post_https(url, xml, 'consulta_situacao_lote')
else:
raise Exception('Este método só esta implementado no autorizador ginfes.')
def cancelar(self, xml):
# url do serviço
url = self._get_url()
# Betha
if self.autorizador == 'BETHA':
# comunica via wsdl
return self._post(url, xml, 'cancelar')
# Ginfes
elif self.autorizador == 'GINFES':
# comunica via wsdl com certificado
return self._post_https(url, xml, 'cancelar')
# TODO outros autorizadres
else:
raise Exception('Autorizador não encontrado!')
def _cabecalho(self, retorna_string=True):
""" Monta o XML do cabeçalho da requisição wsdl
Namespaces padrão homologação (Ginfes) """
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>'
# cabecalho = '<ns2:cabecalho versao="3" xmlns:ns2="http://www.ginfes.com.br/cabecalho_v03.xsd"
# xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><versaoDados>3</versaoDados></ns2:cabecalho>'
# cabecalho
raiz = etree.Element('{%s}cabecalho'%self._namespace, nsmap={'ns2':self._namespace, 'xsi':NAMESPACE_XSI}, versao=self._versao)
etree.SubElement(raiz, 'versaoDados').text = self._versao
if retorna_string:
cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n','')
cabecalho = xml_declaration + cabecalho
return cabecalho
else:
return raiz
def _cabecalho2(self, retorna_string=True):
""" Monta o XML do cabeçalho da requisição wsdl
Namespaces que funcionaram em produção (Ginfes)"""
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>'
# cabecalho
raiz = etree.Element('cabecalho', xmlns=self._namespace, versao=self._versao)
etree.SubElement(raiz, 'versaoDados').text = self._versao
if retorna_string:
cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n', '')
cabecalho = xml_declaration + cabecalho
return cabecalho
else:
return raiz
def _cabecalho_ginfes(self):
""" Retorna o XML do cabeçalho gerado pelo xsd"""
from pynfe.processamento.autorizador_nfse import SerializacaoGinfes
return SerializacaoGinfes().cabecalho()
def _get_url(self):
""" Retorna a url para comunicação com o webservice """
if self._ambiente == 1:
ambiente = 'HTTPS'
else:
ambiente = 'HOMOLOGACAO'
if self.autorizador in NFSE:
self.url = NFSE[self.autorizador][ambiente]
else:
raise Exception('Autorizador nao encontrado!')
return self.url
def _post(self, url, xml, metodo):
""" Comunicação wsdl (http) sem certificado digital """
# cabecalho
cabecalho = self._cabecalho()
# comunicacao wsdl
try:
from suds.client import Client
cliente = Client(url)
# gerar nfse
if metodo == 'gerar':
return cliente.service.GerarNfse(cabecalho, xml)
elif metodo == 'consultaRps':
return cliente.service.ConsultarNfsePorRps(cabecalho, xml)
elif metodo == 'consultaFaixa':
return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
elif metodo == 'cancelar':
return cliente.service.CancelarNfse(cabecalho, xml)
# TODO outros metodos
else:
raise Exception('Método não implementado no autorizador.')
except Exception as e:
raise e
def _post_https(self, url, xml, metodo):
""" Comunicação wsdl (https) utilizando certificado do usuário """
# cabecalho
cabecalho = self._cabecalho()
# comunicacao wsdl
try:
from suds.client import Client
from pynfe.utils.https_nfse import HttpAuthenticated
certificadoA1 = CertificadoA1(self.certificado)
chave, cert = certificadoA1.separar_arquivo(self.certificado_senha, caminho=True)
cliente = Client(url, transport = HttpAuthenticated(key=chave, cert=cert, endereco=url))
# gerar nfse
if metodo == 'gerar':
return cliente.service.GerarNfse(cabecalho, xml)
elif metodo == 'enviar_lote':
return cliente.service.RecepcionarLoteRpsV3(cabecalho, xml)
elif metodo == 'consulta':
return cliente.service.ConsultarNfseV3(cabecalho, xml)
elif metodo == 'consulta_lote':
return cliente.service.ConsultarLoteRpsV3(cabecalho, xml)
elif metodo == 'consulta_situacao_lote':
return cliente.service.ConsultarSituacaoLoteRpsV3(cabecalho, xml)
elif metodo == 'consultaRps':
return cliente.service.ConsultarNfsePorRpsV3(cabecalho, xml)
elif metodo == 'consultaFaixa':
return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
elif metodo == 'cancelar':
# versão 2
return cliente.service.CancelarNfse(xml)
# versão 3
# return cliente.service.CancelarNfseV3(cabecalho, xml)
# TODO outros metodos
else:
raise Exception('Método não implementado no autorizador.')
except Exception as e:
raise e

28
pynfe/processamento/resposta.py

@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
import re
from pynfe.utils import etree
class RetornoSoap(object):
def __init__(self, webservice, retorno, resposta):
self.webservice = webservice
self.resposta = resposta
self.retorno = retorno
def analisar_retorno(webservice, retorno, classe_resposta):
# retorno.raise_for_status()
# print(retorno.text)
match = re.search('<soap:Body>(.*?)</soap:Body>', retorno.text)
if match:
resultado = etree.tostring(etree.fromstring(match.group(1))[0])
# classe_resposta.Validate_simpletypes_ = False
# resposta = classe_resposta.parseString(resultado)
resposta = resultado
# resposta = retorno.text
return RetornoSoap(webservice, retorno, resposta)

572
pynfe/processamento/serializacao.py

@ -1,10 +1,26 @@
# -*- coding: utf-8 -*-
from pynfe.entidades import NotaFiscal
from pynfe.utils import etree, so_numeros, obter_municipio_por_codigo, \
obter_pais_por_codigo, obter_municipio_e_codigo, formatar_decimal, \
remover_acentos, obter_uf_por_codigo, obter_codigo_por_municipio
from pynfe.utils.flags import CODIGOS_ESTADOS, VERSAO_PADRAO, NAMESPACE_NFE, NAMESPACE_SIG, VERSAO_QRCODE
from pynfe.utils.webservices import NFCE
from pynfe.entidades import NotaFiscal, Manifesto
from pynfe.utils import (
etree,
so_numeros,
obter_municipio_por_codigo,
obter_pais_por_codigo,
obter_municipio_e_codigo,
formatar_decimal,
remover_acentos,
obter_uf_por_codigo,
obter_codigo_por_municipio
)
from pynfe.utils.flags import (
CODIGOS_ESTADOS,
VERSAO_PADRAO,
VERSAO_MDFE,
NAMESPACE_NFE,
NAMESPACE_MDFE,
NAMESPACE_SIG,
VERSAO_QRCODE
)
from pynfe.utils.webservices import NFCE, MDFE
import base64
import hashlib
from datetime import datetime
@ -764,7 +780,6 @@ class SerializacaoXML(Serializacao):
raiz.append(self._serializar_responsavel_tecnico(
nota_fiscal.responsavel_tecnico[0], retorna_string=False))
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
@ -802,6 +817,98 @@ class SerializacaoXML(Serializacao):
else:
return raiz
def serializar_evento_mdfe(self, evento, tag_raiz='eventoMDFe', retorna_string=False):
tz = datetime.now().astimezone().strftime('%z')
tz = "{}:{}".format(tz[:-2], tz[-2:])
raiz = etree.Element(tag_raiz, versao=VERSAO_MDFE, xmlns=NAMESPACE_MDFE)
e = etree.SubElement(raiz, 'infEvento', Id=evento.identificador)
etree.SubElement(e, 'cOrgao').text = CODIGOS_ESTADOS[evento.uf.upper()]
etree.SubElement(e, 'tpAmb').text = str(self._ambiente)
if len(so_numeros(evento.cnpj)) == 11:
etree.SubElement(e, 'CPF').text = evento.cnpj
else:
etree.SubElement(e, 'CNPJ').text = evento.cnpj
etree.SubElement(e, 'chMDFe').text = evento.chave
etree.SubElement(e, 'dhEvento').text = evento.data_emissao.strftime('%Y-%m-%dT%H:%M:%S') + tz
etree.SubElement(e, 'tpEvento').text = evento.tp_evento
etree.SubElement(e, 'nSeqEvento').text = str(evento.n_seq_evento)
det = etree.SubElement(e, 'detEvento', versaoEvento=VERSAO_MDFE)
if evento.descricao == 'Cancelamento':
cancelamento = etree.SubElement(det, 'evCancMDFe')
etree.SubElement(cancelamento, 'descEvento').text = evento.descricao
etree.SubElement(cancelamento, 'nProt').text = evento.protocolo
etree.SubElement(cancelamento, 'xJust').text = evento.justificativa
if evento.descricao == 'Encerramento':
encerramento = etree.SubElement(det, 'evEncMDFe')
etree.SubElement(encerramento, 'descEvento').text = evento.descricao
etree.SubElement(encerramento, 'nProt').text = evento.protocolo
etree.SubElement(encerramento, 'dtEnc').text = evento.dtenc.strftime('%Y-%m-%d')
etree.SubElement(encerramento, 'cUF').text = str(evento.cuf)
etree.SubElement(encerramento, 'cMun').text = str(evento.cmun)
elif evento.descricao == 'Inclusão Condutor':
inclusao = etree.SubElement(det, 'evIncCondutorMDFe')
etree.SubElement(inclusao, 'descEvento').text = evento.descricao
condutor = etree.SubElement(inclusao, 'condutor')
etree.SubElement(condutor, 'xNome').text = evento.nome_motorista
etree.SubElement(condutor, 'CPF').text = evento.cpf_motorista
elif evento.descricao == 'Inclusao DF-e':
inclusao = etree.SubElement(det, 'evIncDFeMDFe')
etree.SubElement(inclusao, 'descEvento').text = evento.descricao
etree.SubElement(inclusao, 'nProt').text = evento.protocolo
etree.SubElement(inclusao, 'cMunCarrega').text = str(evento.cmun_carrega)
etree.SubElement(inclusao, 'xMunCarrega').text = evento.xmun_carrega
infDoc = etree.SubElement(inclusao, 'infDoc')
etree.SubElement(infDoc, 'cMunDescarga').text = str(evento.cmun_descarga)
etree.SubElement(infDoc, 'xMunDescarga').text = evento.xmun_descarga
etree.SubElement(infDoc, 'chNFe').text = evento.chave_nfe
elif evento.descricao == 'Pagamento Operacao MDF-e':
pagamento = etree.SubElement(det, 'evPagtoOperMDFe')
etree.SubElement(pagamento, 'descEvento').text = evento.descricao
etree.SubElement(pagamento, 'nProt').text = evento.protocolo
# Viagens
infViagens = etree.SubElement(pagamento, 'infViagens')
etree.SubElement(infViagens, 'qtdViagens').text = evento.qtd_viagens.zfill(5)
etree.SubElement(infViagens, 'nroViagem').text = evento.nro_viagens.zfill(5)
# Informações do pagamento
infPag = etree.SubElement(pagamento, 'infPag')
etree.SubElement(infPag, 'xNome').text = evento.nome_contratante
if len(evento.cpfcnpj_contratante) == 11:
etree.SubElement(infPag, 'CPF').text = evento.cpfcnpj_contratante
else:
etree.SubElement(infPag, 'CNPJ').text = evento.cpfcnpj_contratante
# Componentes de Pagamento do Frete
Comp = etree.SubElement(infPag, 'Comp')
etree.SubElement(Comp, 'tpComp').text = evento.tpComp.zfill(2)
etree.SubElement(Comp, 'vComp').text = '{:.2f}'.format(evento.vComp)
# Continuação das Informações do pagamento
etree.SubElement(infPag, 'vContrato').text = '{:.2f}'.format(evento.vContrato)
etree.SubElement(infPag, 'indPag').text = evento.indPag
# Se indPag == 1 (0=A vista e 1=A prazo)
if evento.indPag != '':
if int(evento.indPag) == 1:
infPrazo = etree.SubElement(infPag, 'infPrazo')
etree.SubElement(infPrazo, 'nParcela').text = evento.nParcela.zfill(3)
etree.SubElement(infPrazo, 'dVenc').text = evento.dVenc.strftime('%Y-%m-%d')
etree.SubElement(infPrazo, 'vParcela').text = '{:.2f}'.format(evento.vParcela)
# Informações bancárias
infBanc = etree.SubElement(infPag, 'infBanc')
if evento.CNPJIPEF != '':
etree.SubElement(infBanc, 'CNPJIPEF').text = evento.CNPJIPEF.zfill(14)
else:
etree.SubElement(infBanc, 'codBanco').text = evento.codBanco
etree.SubElement(infBanc, 'codAgencia').text = evento.codAgencia
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz
class SerializacaoQrcode(object):
""" Classe que gera e serializa o qrcode de NFC-e no xml """
@ -903,6 +1010,38 @@ class SerializacaoQrcode(object):
return nfe
class SerializacaoQrcodeMDFe(object):
""" Classe que gera e serializa o qrcode do MDF-e no xml """
def gerar_qrcode(self, xml, return_qr=False):
# Procura atributos no xml
ns = {'ns': NAMESPACE_MDFE}
# Tag Raiz MDFe Ex: <MDFe>
mdfe = xml
chave = mdfe[0].attrib['Id'].replace('MDFe', '')
tpamb = mdfe.xpath('ns:infMDFe/ns:ide/ns:tpAmb/text()', namespaces=ns)[0]
url_padrao = MDFE['SVRS']['QRCODE']
qrcode = f'{url_padrao}?chMDFe={chave}&tpAmb={tpamb}'
# adiciona tag infMDFeSupl com qrcode
infMDFeSupl = etree.Element('infMDFeSupl')
etree.SubElement(infMDFeSupl, 'qrCodMDFe').text = f'<![CDATA[{qrcode.strip()}]]>'
mdfe.insert(1, infMDFeSupl)
# correção da tag qrCode, retira caracteres pois e CDATA
tmdfe = etree.tostring(mdfe, encoding='unicode')
etree.tostring(mdfe.find('.//qrCodMDFe'), encoding='unicode') \
.replace('\n', '').replace('&lt;', '<').replace('&gt;', '>').replace('amp;', '')
mdfe = etree.fromstring(tmdfe)
if return_qr:
return mdfe, qrcode.strip()
else:
return mdfe
class SerializacaoNfse(object):
def __init__(self, autorizador):
"Recebe uma string com o nome do autorizador."
@ -962,3 +1101,422 @@ class SerializacaoNfse(object):
return SerializacaoBetha().cancelar(nfse)
else:
raise Exception('Autorizador não suportado para cancelamento!')
class SerializacaoMDFe(Serializacao):
""" Classe de serialização do arquivo xml """
_versao = VERSAO_MDFE
def exportar(self, destino=None, retorna_string=False, limpar=True, **kwargs):
"""Gera o(s) arquivo(s) do Manifesto de Documento Fiscais Eletrônicos no padrao oficial
da SEFAZ e Receita Federal, para ser(em) enviado(s) para o webservice ou para ser(em)
armazenado(s) em cache local.
@param destino -
@param retorna_string - Retorna uma string para debug.
@param limpar - Limpa a fonte de dados para não gerar xml com dados duplicados.
"""
try:
# No raiz do XML de saida
raiz = etree.Element('MDFe', xmlns=NAMESPACE_MDFE)
# Carrega lista de Manifestos
manifestos = self._fonte_dados.obter_lista(_classe=Manifesto, **kwargs)
for mdfe in manifestos:
raiz.append(self._serializar_manifesto(mdfe, retorna_string=False))
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=False)
else:
return raiz
except Exception as e:
raise e
finally:
if limpar:
self._fonte_dados.limpar_dados()
def importar(self, origem):
"""Cria as instancias do PyNFe a partir de arquivos XML no formato padrao da
SEFAZ e Receita Federal."""
raise Exception('Metodo nao implementado')
def _serializar_emitente(self, emitente, tag_raiz='emit', retorna_string=True):
raiz = etree.Element(tag_raiz)
# Dados do emitente
if len(so_numeros(emitente.cpfcnpj)) == 11:
etree.SubElement(raiz, 'CPF').text = so_numeros(emitente.cpfcnpj)
else:
etree.SubElement(raiz, 'CNPJ').text = so_numeros(emitente.cpfcnpj)
etree.SubElement(raiz, 'IE').text = emitente.inscricao_estadual
etree.SubElement(raiz, 'xNome').text = emitente.razao_social
etree.SubElement(raiz, 'xFant').text = emitente.nome_fantasia
# Endereço
endereco = etree.SubElement(raiz, 'enderEmit')
etree.SubElement(endereco, 'xLgr').text = emitente.endereco_logradouro
etree.SubElement(endereco, 'nro').text = emitente.endereco_numero
if emitente.endereco_complemento:
etree.SubElement(endereco, 'xCpl').text = emitente.endereco_complemento
etree.SubElement(endereco, 'xBairro').text = emitente.endereco_bairro
etree.SubElement(endereco, 'cMun').text = obter_codigo_por_municipio(
emitente.endereco_municipio, emitente.endereco_uf)
etree.SubElement(endereco, 'xMun').text = emitente.endereco_municipio
etree.SubElement(endereco, 'CEP').text = so_numeros(emitente.endereco_cep)
etree.SubElement(endereco, 'UF').text = emitente.endereco_uf
if emitente.endereco_telefone:
etree.SubElement(endereco, 'fone').text = emitente.endereco_telefone
etree.SubElement(endereco, 'email').text = emitente.endereco_email
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz
def _serializar_municipio_carrega(self, municipio_carrega, tag_raiz='infMunCarrega', retorna_string=True):
raiz = etree.Element(tag_raiz)
etree.SubElement(raiz, 'cMunCarrega').text = str(municipio_carrega.cMunCarrega)
etree.SubElement(raiz, 'xMunCarrega').text = str(municipio_carrega.xMunCarrega)
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz
def _serializar_percurso(self, percurso, tag_raiz='infPercurso', retorna_string=True):
raiz = etree.Element(tag_raiz)
etree.SubElement(raiz, 'UFPer').text = percurso.UFPer
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz
def _serializar_modal_rodoviario(self, modal_rodoviario, tag_raiz='infModal', retorna_string=True):
"""
<infModal versaoModal="3.00">
rodo
infANTT
infCIOT
valePed
infContratante
infPag
veicTracao
prop
condutor
veicReboque
prop
"""
raiz = etree.Element(tag_raiz, versaoModal=self._versao)
rodo = etree.SubElement(raiz, 'rodo')
infANTT = etree.SubElement(rodo, 'infANTT')
etree.SubElement(infANTT, 'RNTRC').text = modal_rodoviario.rntrc.zfill(8)
# CIOT
if modal_rodoviario.ciot != None:
for num, item in enumerate(modal_rodoviario.ciot):
infCIOT = etree.SubElement(infANTT, 'infCIOT')
etree.SubElement(infCIOT, 'CIOT').text = item.numero_ciot
if len(item.cpfcnpj) == 11:
etree.SubElement(infCIOT, 'CPF').text = item.cpfcnpj
elif len(item.cpfcnpj) == 14:
etree.SubElement(infCIOT, 'CNPJ').text = item.cpfcnpj
# Vale Pedágio
if modal_rodoviario.pedagio != None:
valePed = etree.SubElement(infANTT, 'valePed')
for num, item in enumerate(modal_rodoviario.pedagio):
disp = etree.SubElement(valePed, 'disp')
etree.SubElement(disp, 'CNPJForn').text = item.cnpj_fornecedor
if len(item.cpfcnpj_pagador) == 11:
etree.SubElement(disp, 'CPFPg').text = item.cpfcnpj_pagador
elif len(item.cpfcnpj_pagador) == 14:
etree.SubElement(disp, 'CNPJPg').text = item.cpfcnpj_pagador
etree.SubElement(disp, 'nCompra').text = item.numero_compra
etree.SubElement(disp, 'vValePed').text = '{:.2f}'.format(item.valor_pedagio or 0) # Valor do ICMS
# Contratantes
if modal_rodoviario.contratante != None:
for num, item in enumerate(modal_rodoviario.contratante):
infContratante = etree.SubElement(infANTT, 'infContratante')
etree.SubElement(infContratante, 'xNome').text = item.nome
if len(item.cpfcnpj) == 11:
etree.SubElement(infContratante, 'CPF').text = item.cpfcnpj
elif len(item.cpfcnpj) == 14:
etree.SubElement(infContratante, 'CNPJ').text = item.cpfcnpj
# Veículo Tração
if (len(modal_rodoviario.veiculo_tracao) != 1):
raise f'Permitido somente um único veículo Tração'
for num, item in enumerate(modal_rodoviario.veiculo_tracao):
veicTracao = etree.SubElement(rodo, 'veicTracao')
etree.SubElement(veicTracao, 'cInt').text = item.cInt
etree.SubElement(veicTracao, 'placa').text = item.placa
if item.RENAVAM:
etree.SubElement(veicTracao, 'RENAVAM').text = item.RENAVAM
etree.SubElement(veicTracao, 'tara').text = '{:.0f}'.format(item.tara or 0)
etree.SubElement(veicTracao, 'capKG').text = '{:.0f}'.format(item.capKG or 0)
etree.SubElement(veicTracao, 'capM3').text = '{:.0f}'.format(item.capM3 or 0)
# Propritario do veículo Tração
if item.proprietario:
prop = etree.SubElement(veicTracao, 'prop')
if len(item.proprietario.cpfcnpj) == 11:
etree.SubElement(prop, 'CPF').text = item.proprietario.cpfcnpj
elif len(item.proprietario.cpfcnpj) == 14:
etree.SubElement(prop, 'CNPJ').text = item.proprietario.cpfcnpj
etree.SubElement(prop, 'RNTRC').text = item.proprietario.rntrc.zfill(8)
etree.SubElement(prop, 'xNome').text = item.proprietario.nome
if item.proprietario.inscricao_estudual != None:
etree.SubElement(prop, 'IE').text = item.proprietario.inscricao_estudual
etree.SubElement(prop, 'UF').text = item.proprietario.uf
# tpProp: 0=TACAgregado; 1=TACIndependente; 2=Outros
etree.SubElement(prop, 'tpProp').text = item.proprietario.tipo
# condutor 1-n
if item.condutor != None:
for num, item_condutor in enumerate(item.condutor):
condutor = etree.SubElement(veicTracao, 'condutor')
etree.SubElement(condutor, 'xNome').text = item_condutor.nome_motorista
etree.SubElement(condutor, 'CPF').text = item_condutor.cpf_motorista
# fim-condutor
etree.SubElement(veicTracao, 'tpRod').text = item.tpRod
etree.SubElement(veicTracao, 'tpCar').text = item.tpCar
etree.SubElement(veicTracao, 'UF').text = item.UF
# fim-veicTracao
# Veículos reboque 1-n
if modal_rodoviario.veiculo_reboque != None:
for num, item_reboque in enumerate(modal_rodoviario.veiculo_reboque):
veicReboque = etree.SubElement(rodo, 'veicReboque')
etree.SubElement(veicReboque, 'cInt').text = item_reboque.cInt
etree.SubElement(veicReboque, 'placa').text = item_reboque.placa
if item_reboque.RENAVAM:
etree.SubElement(veicReboque, 'RENAVAM').text = item_reboque.RENAVAM
etree.SubElement(veicReboque, 'tara').text = '{:.0f}'.format(item_reboque.tara or 0)
etree.SubElement(veicReboque, 'capKG').text = '{:.0f}'.format(item_reboque.capKG or 0)
etree.SubElement(veicReboque, 'capM3').text = '{:.0f}'.format(item_reboque.capM3 or 0)
# Propritario do veículo Reboque
if item_reboque.proprietario:
prop = etree.SubElement(veicReboque, 'prop')
if len(item_reboque.proprietario.cpfcnpj) == 11:
etree.SubElement(prop, 'CPF').text = item_reboque.proprietario.cpfcnpj
elif len(item_reboque.proprietario.cpfcnpj) == 14:
etree.SubElement(prop, 'CNPJ').text = item_reboque.proprietario.cpfcnpj
etree.SubElement(prop, 'RNTRC').text = item_reboque.proprietario.rntrc.zfill(8)
etree.SubElement(prop, 'xNome').text = item_reboque.proprietario.nome
if item_reboque.proprietario.inscricao_estudual != None:
etree.SubElement(prop, 'IE').text = item_reboque.proprietario.inscricao_estudual
etree.SubElement(prop, 'UF').text = item_reboque.proprietario.uf
# tpProp: 0=TACAgregado; 1=TACIndependente; 2=Outros
etree.SubElement(prop, 'tpProp').text = item_reboque.proprietario.tipo
etree.SubElement(veicReboque, 'tpCar').text = item_reboque.tpCar
etree.SubElement(veicReboque, 'UF').text = item_reboque.UF
# fim-veicReboque
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz
def _serializar_documentos(self, documentos, tag_raiz='infDoc', retorna_string=True):
raiz = etree.Element(tag_raiz)
if len(documentos) <= 0:
raise f'MDFe deve ter uma NFe ou uma CTe vinculadas'
for num, item in enumerate(documentos):
infMunDescarga = etree.SubElement(raiz, 'infMunDescarga')
etree.SubElement(infMunDescarga, 'cMunDescarga').text = item.cMunDescarga
etree.SubElement(infMunDescarga, 'xMunDescarga').text = item.xMunDescarga
if len(item.documentos_nfe) > 0:
for num, item_doc in enumerate(item.documentos_nfe):
infNFe = etree.SubElement(infMunDescarga, 'infNFe')
etree.SubElement(infNFe, 'chNFe').text = item_doc.chave_acesso_nfe
elif len(documentos.documentos_cte) > 0:
for num, item_doc in enumerate(item.documentos_cte):
infCTe = etree.SubElement(infMunDescarga, 'infCTe')
etree.SubElement(infCTe, 'chCTe').text = item_doc.chave_acesso_cte
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz
def _serializar_seguradora(self, seguradora, tag_raiz='seg', retorna_string=True):
raiz = etree.Element(tag_raiz)
infResp = etree.SubElement(raiz, 'infResp')
etree.SubElement(infResp, 'respSeg').text = seguradora.responsavel_seguro
etree.SubElement(infResp, 'CNPJ').text = seguradora.cnpj_responsavel
infSeg = etree.SubElement(raiz, 'infSeg')
etree.SubElement(infSeg, 'xSeg').text = seguradora.nome_seguradora
etree.SubElement(infSeg, 'CNPJ').text = seguradora.cnpj_seguradora
etree.SubElement(raiz, 'nApol').text = seguradora.numero_apolice
if len(seguradora.averbacoes) > 0:
for num, item in enumerate(seguradora.averbacoes):
etree.SubElement(raiz, 'nAver').text = item.numero
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz
def _serializar_produto(self, produto, tag_raiz='prodPred', retorna_string=True):
raiz = etree.Element(tag_raiz)
etree.SubElement(raiz, 'tpCarga').text = produto.tipo_carga
etree.SubElement(raiz, 'xProd').text = produto.nome_produto
etree.SubElement(raiz, 'cEAN').text = produto.cean
etree.SubElement(raiz, 'NCM').text = produto.ncm
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz
def _serializar_totais(self, totais, tag_raiz='tot', retorna_string=True):
raiz = etree.Element(tag_raiz)
if totais.qCTe > 0:
etree.SubElement(raiz, 'qCTe').text = str(totais.qCTe)
elif totais.qNFe > 0:
etree.SubElement(raiz, 'qNFe').text = str(totais.qNFe)
etree.SubElement(raiz, 'vCarga').text = str(totais.vCarga)
if totais.cUnid == 'KG':
etree.SubElement(raiz, 'cUnid').text = '01'
elif totais.cUnid == 'TON':
etree.SubElement(raiz, 'cUnid').text = '02'
else:
raise 'cUnid deve ser KG ou TON'
etree.SubElement(raiz, 'qCarga').text = str('{:.4f}').format(totais.qCarga or 0)
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz
def _serializar_lacres(self, lacres, tag_raiz='lacres', retorna_string=True):
raiz = etree.Element(tag_raiz)
etree.SubElement(raiz, 'nLacre').text = str(lacres.nLacre)
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz
def _serializar_responsavel_tecnico(self, responsavel_tecnico, tag_raiz='infRespTec', retorna_string=True):
raiz = etree.Element(tag_raiz)
etree.SubElement(raiz, 'CNPJ').text = responsavel_tecnico.cnpj
etree.SubElement(raiz, 'xContato').text = responsavel_tecnico.contato
etree.SubElement(raiz, 'email').text = responsavel_tecnico.email
etree.SubElement(raiz, 'fone').text = responsavel_tecnico.fone
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz
def _serializar_manifesto(self, manifesto, tag_raiz='infMDFe', retorna_string=True):
raiz = etree.Element(tag_raiz, versao=self._versao)
# 'Id' da tag raiz
# Ex.: MDFe35080599999090910270550010000000011518005123
raiz.attrib['Id'] = manifesto.identificador_unico
tz = datetime.now().astimezone().strftime('%z')
tz = "{}:{}".format(tz[:-2], tz[-2:])
# Dados do Manifesto
ide = etree.SubElement(raiz, 'ide')
etree.SubElement(ide, 'cUF').text = CODIGOS_ESTADOS[manifesto.uf]
etree.SubElement(ide, 'tpAmb').text = str(self._ambiente)
etree.SubElement(ide, 'tpEmit').text = str(manifesto.tipo_emitente)
# 0=nenhum; 1=etc; 2=tac; 3=ctc
if manifesto.tipo_transportador != 0:
etree.SubElement(ide, 'tpTransp').text = str(manifesto.tipo_transportador)
etree.SubElement(ide, 'mod').text = str(manifesto.modelo)
etree.SubElement(ide, 'serie').text = manifesto.serie
etree.SubElement(ide, 'nMDF').text = str(manifesto.numero_mdfe)
etree.SubElement(ide, 'cMDF').text = manifesto.codigo_numerico_aleatorio
etree.SubElement(ide, 'cDV').text = manifesto.dv_codigo_numerico_aleatorio
etree.SubElement(ide, 'modal').text = str(manifesto.modal)
etree.SubElement(ide, 'dhEmi').text = manifesto.data_emissao.strftime('%Y-%m-%dT%H:%M:%S') + tz
etree.SubElement(ide, 'tpEmis').text = str(manifesto.forma_emissao)
etree.SubElement(ide, 'procEmi').text = str(manifesto.processo_emissao)
etree.SubElement(ide, 'verProc').text = f'{self._nome_aplicacao} {manifesto.versao_processo_emissao}'
etree.SubElement(ide, 'UFIni').text = manifesto.UFIni
etree.SubElement(ide, 'UFFim').text = manifesto.UFFim
# Municipios de Carregamento
for num, item in enumerate(manifesto.infMunCarrega):
ide.append(self._serializar_municipio_carrega(item, retorna_string=False))
# UFs Percurso
for num, item in enumerate(manifesto.infPercurso):
ide.append(self._serializar_percurso(item, retorna_string=False))
if manifesto.dhIniViagem != None:
etree.SubElement(ide, 'dhIniViagem').text = manifesto.dhIniViagem.strftime('%Y-%m-%dT%H:%M:%S') + tz
# - fim ide
# Emitente
raiz.append(self._serializar_emitente(manifesto.emitente, retorna_string=False))
# infModal rodo
raiz.append(self._serializar_modal_rodoviario(manifesto.modal_rodoviario, retorna_string=False))
# infDoc infCTe ou infNFe
raiz.append(self._serializar_documentos(manifesto.documentos, retorna_string=False))
# seg
if len(manifesto.seguradora) > 0:
for num, item in enumerate(manifesto.seguradora):
raiz.append(self._serializar_seguradora(item, retorna_string=False))
# prodPred
if len(manifesto.produto) > 0:
raiz.append(self._serializar_produto(manifesto.produto[0], retorna_string=False))
# totais
raiz.append(self._serializar_totais(manifesto.totais, retorna_string=False))
# lacres
if len(manifesto.lacres) > 0:
for num, item in enumerate(manifesto.lacres):
raiz.append(self._serializar_lacres(item, retorna_string=False))
# Informações adicionais
if manifesto.informacoes_adicionais_interesse_fisco or manifesto.informacoes_complementares_interesse_contribuinte:
info_ad = etree.SubElement(raiz, 'infAdic')
if manifesto.informacoes_adicionais_interesse_fisco:
etree.SubElement(info_ad, 'infAdFisco').text = manifesto.informacoes_adicionais_interesse_fisco
if manifesto.informacoes_complementares_interesse_contribuinte:
etree.SubElement(info_ad, 'infCpl').text = manifesto.informacoes_complementares_interesse_contribuinte
# Responsavel Tecnico NT2018/003
if manifesto.responsavel_tecnico:
raiz.append(self._serializar_responsavel_tecnico(
manifesto.responsavel_tecnico[0], retorna_string=False))
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz

14
pynfe/utils/__init__.py

@ -3,13 +3,17 @@
import os
import codecs
from unicodedata import normalize
import re
try:
from lxml import etree
except ImportError:
raise Exception('Falhou ao importar lxml/ElementTree')
from io import StringIO
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
try:
from . import flags
@ -146,3 +150,11 @@ def obter_uf_por_codigo(codigo_uf):
def remover_acentos(txt):
return normalize('NFKD', txt).encode('ASCII','ignore').decode('ASCII')
def extrai_id_srtxml(edoc):
result = ''
match = re.search('Id=[^0-9]+(\d+)"', edoc)
if match:
result = match.group(1)
return result

16
pynfe/utils/flags.py

@ -10,6 +10,11 @@ NAMESPACE_METODO = 'http://www.portalfiscal.inf.br/nfe/wsdl/'
NAMESPACE_SOAP_NFSE = 'http://schemas.xmlsoap.org/soap/envelope/'
NAMESPACE_BETHA = 'http://www.betha.com.br/e-nota-contribuinte-ws'
NAMESPACE_MDFE = 'http://www.portalfiscal.inf.br/mdfe'
NAMESPACE_MDFE_METODO = 'http://www.portalfiscal.inf.br/mdfe/wsdl/'
MODELO_MDFE = '58'
VERSAO_MDFE = '3.00'
VERSAO_PADRAO = '4.00'
VERSAO_QRCODE = '2'
@ -69,6 +74,17 @@ NF_STATUS = (
'Cancelada',
)
MDFE_STATUS = (
'Em Digitacao',
'Validada',
'Assinada',
'Em processamento',
'Autorizada',
'Rejeitada',
'Cancelada',
'Encerrada',
)
NF_TIPOS_DOCUMENTO = (
(0, 'Entrada'),
(1, 'Saida'),

17
pynfe/utils/webservices.py

@ -504,3 +504,20 @@ NFSE = {
'HOMOLOGACAO':'https://homologacao.ginfes.com.br/ServiceGinfesImpl?wsdl'
}
}
# MDF-e
MDFE = {
# unico autorizador de MDF-e
'SVRS': {
'RECEPCAO': 'MDFeRecepcao/MDFeRecepcao.asmx',
'RECEPCAO_SINC': 'MDFeRecepcaoSinc/MDFeRecepcaoSinc.asmx',
'RET_RECEPCAO': 'mdferetrecepcao/MDFeRetRecepcao.asmx',
'EVENTOS': 'mdferecepcaoevento/MDFeRecepcaoEvento.asmx',
'CONSULTA': 'mdfeconsulta/MDFeConsulta.asmx',
'STATUS': 'mdfestatusservico/MDFeStatusServico.asmx',
'NAO_ENCERRADOS': 'mdfeconsnaoenc/MDFeConsNaoEnc.asmx',
'HTTPS': 'https://mdfe.svrs.rs.gov.br/ws/',
'HOMOLOGACAO': 'https://mdfe-homologacao.svrs.rs.gov.br/ws/',
'QRCODE': 'https://dfe-portal.svrs.rs.gov.br/mdfe/qrCode'
}
}
Loading…
Cancel
Save