Browse Source

correção de conflito

pull/7/head
Leonardo Tada 10 years ago
parent
commit
fd9122c390
  1. 55
      pynfe/processamento/assinatura.py
  2. 202
      pynfe/processamento/autorizador_nfse.py
  3. 106
      pynfe/processamento/comunicacao.py
  4. 9
      pynfe/processamento/serializacao.py

55
pynfe/processamento/assinatura.py

@ -64,7 +64,7 @@ class AssinaturaA1(Assinatura):
except Exception as e:
raise e
def assinarNfse(self, xml, xpath='/GerarNfseEnvio/ns1:Rps/ns1:InfDeclaracaoPrestacaoServico', retorna_string=False):
def assinarNfse(self, xml, xpath='/GerarNfseEnvio/ns1:Rps', retorna_string=False):
try:
xml = etree.fromstring(xml)
# No raiz do XML de saida
@ -76,7 +76,7 @@ class AssinaturaA1(Assinatura):
# Tenta achar a tag infNFe
# TODO a proxima linha nao eh encontrada pq precisa colocar o namespace, GerarNfseEnvio.
ref = etree.SubElement(siginfo, 'Reference', URI='#' +
xml.xpath(xpath, namespaces={'ns1': 'http://www.betha.com.br/e-nota-contribuinte-ws'})[0].attrib['Id'])
xml.xpath(xpath + '/ns1:InfDeclaracaoPrestacaoServico', namespaces={'ns1': 'http://www.betha.com.br/e-nota-contribuinte-ws'})[0].attrib['Id'])
trans = etree.SubElement(ref, 'Transforms')
etree.SubElement(trans, 'Transform', Algorithm='http://www.w3.org/2000/09/xmldsig#enveloped-signature')
@ -87,7 +87,7 @@ class AssinaturaA1(Assinatura):
keyinfo = etree.SubElement(raiz, 'KeyInfo')
etree.SubElement(keyinfo, 'X509Data')
rps = xml.xpath('ns1:Rps', namespaces={'ns1': 'http://www.betha.com.br/e-nota-contribuinte-ws'})[0]
rps = xml.xpath(xpath, namespaces={'ns1': 'http://www.betha.com.br/e-nota-contribuinte-ws'})[0]
rps.append(raiz)
# Escreve no arquivo depois de remover caracteres especiais e parse string
@ -107,7 +107,7 @@ class AssinaturaA1(Assinatura):
def assinarLoteNfse(self, lote, retorna_string=False):
try:
# Assina a nota
lote = self.assinarNfse(lote, xpath='', retorna_string=True)
lote = self.assinarNfse(xml=lote, xpath='/EnviarLoteRpsSincronoEnvio/ns1:LoteRps/ns1:ListaRps/ns1:Rps', retorna_string=True)
# Assina o lote
lote = etree.fromstring(lote)
# No raiz do XML de saida
@ -117,10 +117,7 @@ class AssinaturaA1(Assinatura):
etree.SubElement(siginfo, 'CanonicalizationMethod', Algorithm='http://www.w3.org/TR/2001/REC-xml-c14n-20010315')
etree.SubElement(siginfo, 'SignatureMethod', Algorithm='http://www.w3.org/2000/09/xmldsig#rsa-sha1')
# Tenta achar a tag LoteRps
ref = etree.SubElement(siginfo, 'Reference', URI='#' +
lote.xpath('/EnviarLoteRpsSincronoEnvio/ns1:LoteRps',
namespaces={'ns1': 'http://www.betha.com.br/e-nota-contribuinte-ws'})[0].attrib['Id'])
ref = etree.SubElement(siginfo, 'Reference', URI='#' + lote[0].attrib['Id'])
trans = etree.SubElement(ref, 'Transforms')
etree.SubElement(trans, 'Transform', Algorithm='http://www.w3.org/2000/09/xmldsig#enveloped-signature')
etree.SubElement(trans, 'Transform', Algorithm='http://www.w3.org/TR/2001/REC-xml-c14n-20010315')
@ -136,11 +133,51 @@ class AssinaturaA1(Assinatura):
with open('nfse.xml', 'w') as arquivo:
arquivo.write(remover_acentos(etree.tostring(lote, encoding="unicode", pretty_print=False).replace('ns1:', '').replace(':ns1', '')))
subprocess.call(['xmlsec1', '--sign', '--pkcs12', self.certificado, '--pwd', self.senha, '--crypto', 'openssl', '--output', 'lote.xml', '--id-attr:Id', tag, 'nfse.xml'])
xml = etree.parse('lote.xml').getroot()
if retorna_string:
return etree.tostring(xml, encoding="unicode", pretty_print=False)
else:
return xml
except Exception as e:
raise e
def assinarCancelar(self, xml, retorna_string=False):
try:
xml = etree.fromstring(xml)
# No raiz do XML de saida
tag = 'InfPedidoCancelamento' # tag que será assinada
raiz = etree.Element('Signature', xmlns='http://www.w3.org/2000/09/xmldsig#')
siginfo = etree.SubElement(raiz, 'SignedInfo')
etree.SubElement(siginfo, 'CanonicalizationMethod', Algorithm='http://www.w3.org/TR/2001/REC-xml-c14n-20010315')
etree.SubElement(siginfo, 'SignatureMethod', Algorithm='http://www.w3.org/2000/09/xmldsig#rsa-sha1')
# Tenta achar a tag infNFe
# TODO a proxima linha nao eh encontrada pq precisa colocar o namespace, GerarNfseEnvio.
ref = etree.SubElement(siginfo, 'Reference', URI='#' +
xml.xpath('/CancelarNfseEnvio/ns1:Pedido/ns1:InfPedidoCancelamento', namespaces={'ns1': 'http://www.betha.com.br/e-nota-contribuinte-ws'})[0].attrib['Id'])
trans = etree.SubElement(ref, 'Transforms')
etree.SubElement(trans, 'Transform', Algorithm='http://www.w3.org/2000/09/xmldsig#enveloped-signature')
etree.SubElement(trans, 'Transform', Algorithm='http://www.w3.org/TR/2001/REC-xml-c14n-20010315')
etree.SubElement(ref, 'DigestMethod', Algorithm='http://www.w3.org/2000/09/xmldsig#sha1')
etree.SubElement(ref, 'DigestValue')
etree.SubElement(raiz, 'SignatureValue')
keyinfo = etree.SubElement(raiz, 'KeyInfo')
etree.SubElement(keyinfo, 'X509Data')
rps = xml.xpath('/CancelarNfseEnvio/ns1:Pedido', namespaces={'ns1': 'http://www.betha.com.br/e-nota-contribuinte-ws'})[0]
rps.append(raiz)
# Escreve no arquivo depois de remover caracteres especiais e parse string
with open('nfse.xml', 'w') as arquivo:
arquivo.write(remover_acentos(etree.tostring(xml, encoding="unicode", pretty_print=False).replace('ns1:', '').replace(':ns1', '')))
subprocess.call(['xmlsec1', '--sign', '--pkcs12', self.certificado, '--pwd', self.senha, '--crypto', 'openssl', '--output', 'funfa.xml', '--id-attr:Id', tag, 'nfse.xml'])
xml = etree.parse('funfa.xml').getroot()
if retorna_string:
return etree.tostring(lote, encoding="unicode", pretty_print=False)
return etree.tostring(xml, encoding="unicode", pretty_print=False)
else:
return xml
except Exception as e:

202
pynfe/processamento/autorizador_nfse.py

@ -5,16 +5,27 @@ except:
pass # modulo necessario apenas para NFS-e.
class SerializacaoAutorizador():
pass
class InterfaceAutorizador():
#TODO Colocar raise Exception Not Implemented nos metodos
def consultar(self):
pass
def consultar_faixa(self):
pass
class SerializacaoBetha(SerializacaoAutorizador):
def cancelar(self):
pass
def serializar_lote_sincrono(self):
pass
class SerializacaoBetha(InterfaceAutorizador):
def __init__(self):
if 'nfse_schema' not in globals():
raise ImportError('No module named nfse_v202 or PyXB')
def serializar_gerar(self, nfse):
def gerar(self, nfse):
"""Retorna string de um XML gerado a partir do
XML Schema (XSD). Binding gerado pelo modulo PyXB."""
@ -83,7 +94,78 @@ class SerializacaoBetha(SerializacaoAutorizador):
return gnfse.toxml(element_name='GerarNfseEnvio')
def _serializar_lote_sincrono(self, nfse):
def consultar(self, nfse):
"""Retorna string de um XML gerado a partir do
XML Schema (XSD). Binding gerado pelo modulo PyXB."""
# Rps
id_rps = nfse_schema.tcIdentificacaoRps()
id_rps.Numero = nfse.identificador
id_rps.Serie = nfse.serie
id_rps.Tipo = nfse.tipo
# Prestador
id_prestador = nfse_schema.tcIdentificacaoPrestador()
id_prestador.CpfCnpj = nfse.emitente.cnpj
id_prestador.InscricaoMunicipal = nfse.emitente.inscricao_municipal
consulta = nfse_schema.ConsultarNfseRpsEnvio()
consulta.IdentificacaoRps = id_rps
consulta.Prestador = id_prestador
consulta = consulta.toxml(element_name='ConsultarNfseRpsEnvio').replace('ns1:','').replace(':ns1','').replace('<?xml version="1.0" ?>','')
return consulta
def consultar_faixa(self, emitente, inicio, fim, pagina):
"""Retorna string de um XML gerado a partir do
XML Schema (XSD). Binding gerado pelo modulo PyXB."""
# Prestador
id_prestador = nfse_schema.tcIdentificacaoPrestador()
id_prestador.CpfCnpj = emitente.cnpj
id_prestador.InscricaoMunicipal = emitente.inscricao_municipal
consulta = nfse_schema.ConsultarNfseFaixaEnvio()
consulta.Prestador = id_prestador
consulta.Pagina = pagina
# É necessário BIND antes de atribuir numero final e numero inicial
consulta.Faixa = BIND()
consulta.Faixa.NumeroNfseInicial = inicio
consulta.Faixa.NumeroNfseFinal = fim
consulta = consulta.toxml(element_name='ConsultarNfseFaixaEnvio').replace('ns1:','').replace(':ns1','').replace('<?xml version="1.0" ?>','')
return consulta
def cancelar(self, nfse):
"""Retorna string de um XML gerado a partir do
XML Schema (XSD). Binding gerado pelo modulo PyXB."""
# id nfse
id_nfse = nfse_schema.tcIdentificacaoNfse()
id_nfse.Numero = nfse.identificador
id_nfse.CpfCnpj = nfse.emitente.cnpj
id_nfse.InscricaoMunicipal = nfse.emitente.inscricao_municipal
id_nfse.CodigoMunicipio = nfse.emitente.endereco_cod_municipio
# Info Pedido de cancelamento
info_pedido = nfse_schema.tcInfPedidoCancelamento()
info_pedido.Id = '1'
info_pedido.IdentificacaoNfse = id_nfse
#pedido.CodigoCancelamento =
# Pedido
pedido = nfse_schema.tcPedidoCancelamento()
pedido.InfPedidoCancelamento = info_pedido
# Cancelamento
cancelar = nfse_schema.CancelarNfseEnvio()
cancelar.Pedido = pedido
return cancelar.toxml(element_name='CancelarNfseEnvio')
def serializar_lote_sincrono(self, nfse):
"""Retorna string de um XML gerado a partir do
XML Schema (XSD). Binding gerado pelo modulo PyXB."""
@ -161,113 +243,3 @@ class SerializacaoBetha(SerializacaoAutorizador):
gnfse.LoteRps = lote
return gnfse.toxml(element_name='EnviarLoteRpsSincronoEnvio')
def _serializar_emitente(self, emitente, tag_raiz='Prestador', retorna_string=False):
raiz = etree.Element(tag_raiz)
documento = etree.SubElement(raiz, 'CpfCnpj')
etree.SubElement(documento, 'Cnpj').text = emitente.cnpj
etree.SubElement(raiz, 'InscricaoMunicipal').text = emitente.inscricao_municipal
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz
def _serializar_cliente(self, cliente, tag_raiz='Tomador', retorna_string=False):
raiz = etree.Element(tag_raiz)
identificacao = etree.SubElement(raiz, 'IdentificacaoTomador')
documento = etree.SubElement(identificacao, 'CpfCnpj')
etree.SubElement(documento, cliente.tipo_documento).text = cliente.numero_documento # Apenas Cnpj ??
etree.SubElement(identificacao, 'InscricaoMunicipal').text = cliente.inscricao_municipal # obrigatório??
etree.SubElement(raiz, 'RazaoSocial').text = cliente.razao_social
endereco = etree.SubElement(raiz, 'Endereco')
etree.SubElement(endereco, 'Endereco').text = cliente.endereco_logradouro
etree.SubElement(endereco, 'Numero').text = cliente.endereco_numero
if cliente.endereco_complemento:
etree.SubElement(endereco, 'Complemento').text = cliente.endereco_complemento
etree.SubElement(endereco, 'Bairro').text = cliente.endereco_bairro
etree.SubElement(endereco, 'CodigoMunicipio').text = obter_codigo_por_municipio(
cliente.endereco_municipio, cliente.endereco_uf)
etree.SubElement(endereco, 'Uf').text = cliente.endereco_uf
etree.SubElement(endereco, 'CodigoPais').text = cliente.endereco_pais
etree.SubElement(endereco, 'Cep').text = so_numeros(cliente.endereco_cep)
contato = etree.SubElement(raiz, 'Contato')
etree.SubElement(contato, 'Telefone').text = cliente.endereco_telefone
etree.SubElement(contato, 'Email').text = cliente.email
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz
def _serializar_servico(self, servico, tag_raiz='Servico', retorna_string=False):
raiz = etree.Element(tag_raiz)
valores = etree.SubElement(raiz, 'Valores')
etree.SubElement(valores, 'ValorServicos').text = str('{:.2f}').format(servico.valor_servico)
etree.SubElement(raiz, 'IssRetido').text = str(servico.iss_retido)
#etree.SubElement(raiz, 'ResponsavelRetencao').text = ''
etree.SubElement(raiz, 'ItemListaServico').text = servico.item_lista
#etree.SubElement(raiz, 'CodigoCnae').text = ''
#etree.SubElement(raiz, 'CodigoTributacaoMunicipio').text = ''
etree.SubElement(raiz, 'Discriminacao').text = servico.discriminacao
etree.SubElement(raiz, 'CodigoMunicipio').text = servico.codigo_municipio
#etree.SubElement(raiz, 'CodigoPais').text = ''
"""
1 Exigível;
2 Não incidência;
3 Isenção;
4 Exportação;
5 Imunidade;
6 Exigibilidade Suspensa por Decisão Judicial;
7 Exigibilidade Suspensa por ProcessoAdministrativo
"""
etree.SubElement(raiz, 'ExigibilidadeISS').text = str(servico.exigibilidade)
etree.SubElement(raiz, 'MunicipioIncidencia').text = servico.codigo_municipio
#etree.SubElement(raiz, 'NumeroProcesso').text = ''
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz
def _serializar_gerar(self, nfse, tag_raiz='GerarNfseEnvio', retorna_string=False):
if nfse.autorizador.upper() == 'BETHA':
raiz = etree.Element(tag_raiz, xmlns=NAMESPACE_BETHA)
# TODO - implementar outros sistemas autorizadores
else:
raiz = etree.Element(tag_raiz)
rps = etree.SubElement(raiz, 'Rps')
info = etree.SubElement(rps, 'InfDeclaracaoPrestacaoServico', Id=nfse.identificador)
etree.SubElement(info, 'Competencia').text = nfse.data_emissao.strftime('%Y-%m-%d')
# Servico
info.append(self._serializar_servico(nfse.servico))
# Emitente/Prestador
info.append(self._serializar_emitente(nfse.emitente))
# Cliente/Tomador
info.append(self._serializar_cliente(nfse.cliente))
etree.SubElement(info, 'OptanteSimplesNacional').text = str(nfse.simples) # 1-Sim; 2-Não
etree.SubElement(info, 'IncentivoFiscal').text = str(nfse.incentivo) # 1-Sim; 2-Não
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz
def _serializar_consulta(self, nfse, tag_raiz='ConsultarNfseRpsEnvio', retorna_string=False):
if nfse.autorizador.upper() == 'BETHA':
namespace = NAMESPACE_BETHA
#versao = '2.02'
raiz = etree.Element(tag_raiz, xmlns=namespace)
identificacao = etree.SubElement(raiz, 'IdentificacaoRps')
etree.SubElement(identificacao, 'Numero').text = str(nfse.identificador)
etree.SubElement(identificacao, 'Serie').text = nfse.serie
etree.SubElement(identificacao, 'Tipo').text = nfse.tipo
raiz.append(self._serializar_emitente(nfse.emitente))
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz

106
pynfe/processamento/comunicacao.py

@ -369,53 +369,56 @@ class ComunicacaoNfse(Comunicacao):
self._versao = '2.02'
# url do serviço
url = self._get_url(autorizador)
# dados
raiz = etree.Element('nfseDadosMsg')
raiz.append(nota)
# xml
xml = etree.tostring(nota, encoding='unicode', pretty_print=False)
# comunica via wsdl
return self._post2(url, xml, 'gerar')
return self._post2(url, raiz)
def consulta_rps(self, autorizador, xml):
if autorizador.upper() == 'BETHA':
self._namespace = NAMESPACE_BETHA
self._versao = '2.02'
# url do serviço
url = self._get_url(autorizador)
# comunica via wsdl
return self._post2(url, xml, 'consultaRps')
def consulta_nota(self, autorizador, nota):
def consulta_faixa(self, autorizador, xml):
if autorizador.upper() == 'BETHA':
self._namespace = NAMESPACE_BETHA
self._versao = '2.02'
# url do serviço
url = self._get_url(autorizador) + NFSE[autorizador.upper()]['CONSULTA_RPS']
# consulta
raiz = etree.Element('ConsultarNfsePorRps')
# cabecalho
raiz.append(self._cabecalho_soap())
dados = etree.SubElement(raiz, 'nfseDadosMsg')
dados.append(nota)
# xml soap
xml = self._construir_xml(raiz)
url = self._get_url(autorizador)
# comunica via wsdl
return self._post2(url, xml, 'consultaFaixa')
retorno = self._post(url, xml)
return retorno
def cancelar(self, autorizador, nota):
if autorizador.upper() == 'BETHA':
self._namespace = NAMESPACE_BETHA
self._versao = '2.02'
# url do serviço
url = self._get_url(autorizador)
# xml
xml = etree.tostring(nota, encoding='unicode', pretty_print=False)
# comunica via wsdl
return self._post2(url, xml, 'cancelar')
def cancelar(self, autorizador):
pass
def _cabecalho(self, retorna_string=True):
u"""Monta o XML do cabeçalho da requisição wsdl"""
def _cabecalho(self, retorna_string=False):
u"""Monta o XML do cabeçalho da requisição SOAP"""
xml_declaration='<?xml version="1.0" encoding="UTF-8"?>'
raiz = etree.Element('nfseCabecMsg')
cabecalho = etree.SubElement(raiz, 'cabecalho', xmlns=self._namespace, versao=self._versao)
etree.SubElement(cabecalho, 'versaoDados').text = self._versao
# cabecalho
raiz = etree.Element('cabecalho', xmlns=self._namespace, versao=self._versao)
etree.SubElement(raiz, 'versaoDados').text = '2.02'
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=False)
cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n','')
cabecalho = xml_declaration + cabecalho
return cabecalho
else:
return raiz
def _construir_xml(self, dados):
"""Mota o XML para o envio via SOAP"""
raiz = etree.Element('{%s}Envelope'%NAMESPACE_SOAP, nsmap={'e': self._namespace})
etree.SubElement(raiz, '{%s}Header'%NAMESPACE_SOAP)
body = etree.SubElement(raiz, '{%s}Body'%NAMESPACE_SOAP)
body.append(dados)
return raiz
def _get_url(self, autorizador):
""" Retorna a url para comunicação com o webservice """
if self._ambiente == 1:
@ -428,13 +431,6 @@ class ComunicacaoNfse(Comunicacao):
raise Exception('Autorizador nao encontrado!')
return self.url
def _post_header(self):
u"""Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP"""
return {
u'content-type': u'application/soap+xml; charset=utf-8;',
u'Accept': u'application/soap+xml; charset=utf-8;',
}
def _post(self, url, xml):
certificadoA1 = CertificadoA1(self.certificado)
chave, cert = certificadoA1.separar_arquivo(self.certificado_senha, caminho=True)
@ -461,16 +457,24 @@ class ComunicacaoNfse(Comunicacao):
finally:
certificadoA1.excluir()
def _post2(self, url, xml):
# declaraçao xml
xml_declaration='<?xml version="1.0" encoding="utf-8"?>'
def _post2(self, url, xml, metodo):
# cabecalho
cabecalho = self._cabecalho(retorna_string=True)
cabecalho = self._cabecalho()
# comunicacao wsdl
from suds.client import Client
cliente = Client(url)
import ipdb
ipdb.set_trace()
try:
from suds.client import Client
cliente = Client(url)
# gerar nfse
if metodo == 'gerar':
return cliente.service.GerarNfse(cabecalho, xml)
elif metodo == 'consultaRps':
return cliente.service.ConsultarNfsePorRps(cabecalho, xml)
elif metodo == 'consultaFaixa':
return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
elif metodo == 'cancelar':
return cliente.service.CancelarNfse(cabecalho, xml)
# TODO outros metodos
else:
pass
except Exception as e:
raise e

9
pynfe/processamento/serializacao.py

@ -5,7 +5,7 @@ from pynfe.entidades import NotaFiscal
from pynfe.utils import etree, so_numeros, obter_municipio_por_codigo, \
obter_pais_por_codigo, obter_municipio_e_codigo, formatar_decimal, \
remover_acentos, obter_uf_por_codigo, obter_codigo_por_municipio
from pynfe.utils.flags import CODIGOS_ESTADOS, VERSAO_PADRAO, NAMESPACE_NFE, NAMESPACE_BETHA
from pynfe.utils.flags import CODIGOS_ESTADOS, VERSAO_PADRAO, NAMESPACE_NFE
class Serializacao(object):
@ -594,6 +594,13 @@ class SerializacaoNfse(object):
"Recebe uma string com o nome do autorizador."
self.autorizador = autorizador
def gerar(self, nfse):
if self.autorizador.lower() == 'betha':
from pynfe.processamento.autorizador_nfse import SerializacaoBetha
return SerializacaoBetha().gerar(nfse)
else:
raise Exception('Este método só esta implementado no autorizador Betha.')
class SerializacaoPipes(Serializacao):
"""Serialização utilizada pela SEFAZ-SP para a importação de notas."""

Loading…
Cancel
Save