|
|
|
@ -37,24 +37,60 @@ class Comunicacao(object): |
|
|
|
class ComunicacaoSefaz(Comunicacao): |
|
|
|
"""Classe de comunicação que segue o padrão definido para as SEFAZ dos Estados.""" |
|
|
|
|
|
|
|
_versao = VERSAO_PADRAO |
|
|
|
_versao = False |
|
|
|
_assinatura = AssinaturaA1 |
|
|
|
_namespace = NAMESPACE_NFE |
|
|
|
_namespace = False |
|
|
|
_header = False |
|
|
|
_envio_mensagem = 'nfeDadosMsg' |
|
|
|
_namespace_metodo = NAMESPACE_METODO |
|
|
|
_envio_mensagem = False |
|
|
|
_namespace_metodo = False |
|
|
|
_accept = False |
|
|
|
_soap_action = False |
|
|
|
_namespace_soap = NAMESPACE_SOAP |
|
|
|
_namespace_xsi = NAMESPACE_XSI |
|
|
|
_namespace_xsd = NAMESPACE_XSD |
|
|
|
_soap_version = 'soap' |
|
|
|
|
|
|
|
def _cabecalho_soap(self, metodo): |
|
|
|
"""Monta o XML do cabeçalho da requisição SOAP""" |
|
|
|
|
|
|
|
raiz = etree.Element( |
|
|
|
self._header, |
|
|
|
xmlns=self._namespace_metodo + metodo |
|
|
|
) |
|
|
|
etree.SubElement(raiz, 'versaoDados').text = '3.00' |
|
|
|
# MDFE_WS_METODO[metodo]['versao'] |
|
|
|
|
|
|
|
etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()] |
|
|
|
return raiz |
|
|
|
|
|
|
|
def _get_url_metodo(self, ws_metodo): |
|
|
|
url = ( |
|
|
|
'https://' + |
|
|
|
self._ws_url[self._ambiente]['servidor'] + |
|
|
|
'/' + |
|
|
|
self._ws_url[self._ambiente][ws_metodo] |
|
|
|
) |
|
|
|
metodo = self._ws_metodo[ws_metodo]['metodo'] |
|
|
|
return url, metodo |
|
|
|
|
|
|
|
def _construir_xml_soap(self, metodo, dados): |
|
|
|
"""Mota o XML para o envio via SOAP""" |
|
|
|
raiz = etree.Element('{%s}Envelope' % NAMESPACE_SOAP, nsmap={ |
|
|
|
'xsi': NAMESPACE_XSI, 'xsd': NAMESPACE_XSD,'soap': NAMESPACE_SOAP}) |
|
|
|
|
|
|
|
raiz = etree.Element( |
|
|
|
'{%s}Envelope' % self._namespace_soap, |
|
|
|
nsmap={ |
|
|
|
'xsi': self._namespace_xsi, |
|
|
|
'xsd': self._namespace_xsd, |
|
|
|
self._soap_version: self._namespace_soap, |
|
|
|
}) |
|
|
|
|
|
|
|
if self._header: |
|
|
|
cabecalho = self._cabecalho_soap(metodo) |
|
|
|
c = etree.SubElement(raiz, '{%s}Header' % NAMESPACE_SOAP) |
|
|
|
c = etree.SubElement(raiz, '{%s}Header' % self._namespace_soap) |
|
|
|
c.append(cabecalho) |
|
|
|
|
|
|
|
body = etree.SubElement(raiz, '{%s}Body' % NAMESPACE_SOAP) |
|
|
|
body = etree.SubElement(raiz, '{%s}Body' % self._namespace_soap) |
|
|
|
|
|
|
|
a = etree.SubElement( |
|
|
|
body, |
|
|
|
self._envio_mensagem, |
|
|
|
@ -75,22 +111,23 @@ class ComunicacaoSefaz(Comunicacao): |
|
|
|
output.close() |
|
|
|
return etree.fromstring(contents) |
|
|
|
|
|
|
|
def _post_header(self): |
|
|
|
def _post_header(self, soap_webservice_method=False): |
|
|
|
"""Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP""" |
|
|
|
# PE é a únca UF que exige SOAPAction no header |
|
|
|
if self.uf.upper() == 'PE': |
|
|
|
return { |
|
|
|
'content-type': 'application/soap+xml; charset=utf-8;', |
|
|
|
'Accept': 'application/soap+xml; charset=utf-8;', |
|
|
|
'SOAPAction': '' |
|
|
|
} |
|
|
|
else: |
|
|
|
return { |
|
|
|
'content-type': 'application/soap+xml; charset=utf-8;', |
|
|
|
'Accept': 'application/soap+xml; charset=utf-8;' |
|
|
|
header = { |
|
|
|
b'content-type': b'application/soap+xml; charset=utf-8;' |
|
|
|
} |
|
|
|
|
|
|
|
def _post(self, url, xml): |
|
|
|
# PE é a únca UF que exige SOAPAction no header |
|
|
|
if soap_webservice_method: |
|
|
|
header['SOAPAction'] = \ |
|
|
|
self._namespace_metodo + soap_webservice_method |
|
|
|
|
|
|
|
if self._accept: |
|
|
|
header['Accept'] = b'application/soap+xml; charset=utf-8;' |
|
|
|
|
|
|
|
return header |
|
|
|
|
|
|
|
def _post(self, url, xml, soap_webservice_method=False): |
|
|
|
certificado_a1 = CertificadoA1(self.certificado) |
|
|
|
chave, cert = certificado_a1.separar_arquivo(self.certificado_senha, caminho=True) |
|
|
|
chave_cert = (cert, chave) |
|
|
|
@ -106,7 +143,13 @@ class ComunicacaoSefaz(Comunicacao): |
|
|
|
) |
|
|
|
xml = xml_declaration + xml |
|
|
|
# Faz o request com o servidor |
|
|
|
result = requests.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False) |
|
|
|
result = requests.post( |
|
|
|
url.encode('utf-8'), |
|
|
|
xml.encode('utf-8'), |
|
|
|
headers=self._post_header(soap_webservice_method), |
|
|
|
cert=chave_cert, |
|
|
|
verify=False |
|
|
|
) |
|
|
|
result.encoding = 'utf-8' |
|
|
|
return result |
|
|
|
except requests.exceptions.RequestException as e: |
|
|
|
|