|
|
@ -148,10 +148,12 @@ class ComunicacaoSefaz(Comunicacao): |
|
|
pass |
|
|
pass |
|
|
|
|
|
|
|
|
def consulta_cadastro(self, modelo, cnpj): |
|
|
def consulta_cadastro(self, modelo, cnpj): |
|
|
|
|
|
# UF que utilizam a SVRS - Sefaz Virtual do RS: Para serviço de Consulta Cadastro: AC, RN, PB, SC |
|
|
|
|
|
lista_svrs = ['AC','RN','PB','SC'] |
|
|
# RS implementa um método diferente na consulta de cadastro |
|
|
# RS implementa um método diferente na consulta de cadastro |
|
|
if self.uf.upper() == 'RS': |
|
|
if self.uf.upper() == 'RS': |
|
|
url = NFE['RS']['CADASTRO'] |
|
|
url = NFE['RS']['CADASTRO'] |
|
|
elif self.uf.upper() == 'SVRS': |
|
|
|
|
|
|
|
|
elif self.uf.upper() in lista_svrs: |
|
|
url = NFE['SVRS']['CADASTRO'] |
|
|
url = NFE['SVRS']['CADASTRO'] |
|
|
elif self.uf.upper() == 'SVC-RS': |
|
|
elif self.uf.upper() == 'SVC-RS': |
|
|
url = NFE['SVC-RS']['CADASTRO'] |
|
|
url = NFE['SVC-RS']['CADASTRO'] |
|
|
@ -295,13 +297,17 @@ class ComunicacaoSefaz(Comunicacao): |
|
|
# nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3 |
|
|
# nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3 |
|
|
self.url = NFE[self.uf.upper()][ambiente] + NFE[self.uf.upper()][consulta] |
|
|
self.url = NFE[self.uf.upper()][ambiente] + NFE[self.uf.upper()][consulta] |
|
|
elif modelo == 'nfce': |
|
|
elif modelo == 'nfce': |
|
|
# nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3 |
|
|
|
|
|
self.url = NFCE[self.uf.upper()][ambiente] + NFCE[self.uf.upper()][consulta] |
|
|
|
|
|
|
|
|
# PE é o unico UF que possiu NFE proprio e SVRS para NFCe |
|
|
|
|
|
if self.uf.upper() == 'PE': |
|
|
|
|
|
self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta] |
|
|
|
|
|
else: |
|
|
|
|
|
# nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3 |
|
|
|
|
|
self.url = NFCE[self.uf.upper()][ambiente] + NFCE[self.uf.upper()][consulta] |
|
|
else: |
|
|
else: |
|
|
raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"') |
|
|
raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"') |
|
|
# Estados que utilizam outros ambientes |
|
|
# Estados que utilizam outros ambientes |
|
|
else: |
|
|
else: |
|
|
lista_svrs = ['SE'] |
|
|
|
|
|
|
|
|
lista_svrs = ['AC','RN','PB','SC','SE'] |
|
|
if self.uf.upper() in lista_svrs: |
|
|
if self.uf.upper() in lista_svrs: |
|
|
if self._ambiente == 1: |
|
|
if self._ambiente == 1: |
|
|
ambiente = 'HTTPS' |
|
|
ambiente = 'HTTPS' |
|
|
@ -410,13 +416,23 @@ class ComunicacaoSefaz(Comunicacao): |
|
|
chave_cert = (cert, chave) |
|
|
chave_cert = (cert, chave) |
|
|
# Abre a conexão HTTPS |
|
|
# Abre a conexão HTTPS |
|
|
try: |
|
|
try: |
|
|
xml_declaration='<?xml version="1.0" encoding="utf-8"?>' |
|
|
|
|
|
|
|
|
xml_declaration='<?xml version="1.0" encoding="UTF-8"?>' |
|
|
# limpa xml com caracteres bugados para infNFeSupl em NFC-e |
|
|
# limpa xml com caracteres bugados para infNFeSupl em NFC-e |
|
|
xml = re.sub('<qrCode>(.*?)</qrCode>', |
|
|
xml = re.sub('<qrCode>(.*?)</qrCode>', |
|
|
lambda x:x.group(0).replace('<','<').replace('>','>').replace('amp;',''), |
|
|
lambda x:x.group(0).replace('<','<').replace('>','>').replace('amp;',''), |
|
|
etree.tostring(xml, encoding='unicode').replace('\n','')) |
|
|
etree.tostring(xml, encoding='unicode').replace('\n','')) |
|
|
xml = xml_declaration + xml |
|
|
xml = xml_declaration + xml |
|
|
|
|
|
|
|
|
|
|
|
# adapter para substituir ssl por tls |
|
|
|
|
|
# s = requests.Session() |
|
|
|
|
|
# s.mount(url, AdapterTLSV1()) |
|
|
|
|
|
# print(xml) |
|
|
|
|
|
# # print(self._post_header()) |
|
|
|
|
|
# # Faz o request com o servidor |
|
|
|
|
|
# result = s.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False, timeout=120) |
|
|
|
|
|
# result.encoding = 'utf-8' |
|
|
|
|
|
# return result |
|
|
|
|
|
print(url) |
|
|
# Faz o request com o servidor |
|
|
# Faz o request com o servidor |
|
|
result = requests.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False, timeout=120) |
|
|
result = requests.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False, timeout=120) |
|
|
result.encoding = 'utf-8' |
|
|
result.encoding = 'utf-8' |
|
|
@ -649,3 +665,15 @@ class ComunicacaoNfse(Comunicacao): |
|
|
raise Exception('Método não implementado no autorizador.') |
|
|
raise Exception('Método não implementado no autorizador.') |
|
|
except Exception as e: |
|
|
except Exception as e: |
|
|
raise e |
|
|
raise e |
|
|
|
|
|
|
|
|
|
|
|
""" Adapter para conexão tls """ |
|
|
|
|
|
from requests.adapters import HTTPAdapter |
|
|
|
|
|
from requests.packages.urllib3.poolmanager import PoolManager |
|
|
|
|
|
import ssl |
|
|
|
|
|
|
|
|
|
|
|
class AdapterTLSV1(HTTPAdapter): |
|
|
|
|
|
def init_poolmanager(self, connections, maxsize, block=False): |
|
|
|
|
|
self.poolmanager = PoolManager(num_pools=connections, |
|
|
|
|
|
maxsize=maxsize, |
|
|
|
|
|
block=block, |
|
|
|
|
|
ssl_version=ssl.PROTOCOL_TLSv1_2) |