Páginas filhas
  • DEAI1-3107 Implementação do /transactions usando AMQP no Datasul

Objetivo

A partir da implementação inicial de suporte ao AMQP no Datasul, verificar a viabilidade de usar o AMQP como canal de comunicação para mensagens padronizadas.

Considerações

Para este spike, além de um documento relatando o resultado do estudo, tinha-se como premissa o desenvolvimento de uma prova de conceito (PoC) que validasse o estudo realizado. Esta PoC foi implementada no Novo Framework do Datasul, versão 12.1.23 (ainda em desenvolvimento na época deste estudo).

O estudo tomou por base a implementação realizada com base no spike desenvolvido na tarefa DEAI1-2400, cujo resultado está descrito aqui.

O spike iniciou com uma revisão das diretrizes definidas no documento produzido na tarefa DEAI1-2476. Foi tomado em consideração o item Protocolo de Mensagem Padronizada via AMQP (Sync/Async).

A partir dessas diretrizes, os seguintes objetivos iniciais foram definidos:

  • Conceber um modelo de configuração aderente as diretrizes, com relação aos exchanges, filas e bindings que o agente deve declarar.
  • Implementar o modo RPC utilizando o máximo da infraestrutura existente.
  • Implementar o suporte à troca de empresas, levando em consideração a empresa informada na mensagem recebida.
  • Desenhar o fluxo completo de envio e recebimento de mensagem padronizada usando AMQP, de forma a reutilizar o máximo possível da infraestrutura atual, utilizada pelos canais SOAP e REST.

Modelo de configuração

O modelo de configuração concebido para atender as diretrizes do suporte ao protocolo de mensagem padronizada via AMQP tem as seguintes características:

  • Apenas um exchange definido, para envio de erros, se necessário. Não foram definidos outros exchanges, porque as diretrizes exigem o uso do exchange padrão.
  • Apenas um worker, responsável por receber a mensagem padronizada enviada pelo broker AMQP.
  • No worker foi declarado como consumer o programa totvseai/mb/receiveMessageAMQP.p, o qual deve ter o método receiveMessage.
  • Apenas uma fila foi declarada no worker, com o nome seguindo o padrão aplicativo_interno@DATASUL.
  • A fila do worker foi declarada como não exclusiva (exclusive: false), não durável (durable: false) e auto-removível (autoDelete: true).
  • Nenhum binding foi declarado na fila.

Foram necessários ajustes na camada Progress (carga da configuração) e na camada Java (declaração dos exchanges, filas e bindings) para permitir o uso da configuração conforme os critérios acima.

Em relação a declaração dos destinos, foi omitido o atributo exchange, já que deve ser usado o exchange padrão também para envio.

Um exemplo de arquivo de configuração seguindo o que foi discriminado aqui encontra-se anexo a esta página (ver seção correspondente).

Implementação do modo RPC

Para a implementação do modo RPC no envio de mensagens a partir da camada Progress, foi necessário estudar o protocolo STOMP para determinar os recursos disponíveis que permitissem o cumprimento da tarefa.

O protocolo STOMP, na versão 1.2, provê o cabeçalho reply-to, previsto para uso na frame SEND. No cabeçalho reply-to deve-se informar o nome de uma fila para a qual a mensagem de resposta será enviada. Quando a fila informada no cabeçalho inicia com /temp-queue/, o servidor STOMP deve gerar uma fila aleatória automática e inseri-la na mensagem que será entregue aos consumidores da fila original. Os consumidores, por sua vez, devem enviar as mensagens de resposta para esta fila aleatória, que será consumida pelo cliente STOMP que enviou a mensagem original, permitindo o recebimento síncrono da resposta.

As alterações necessárias para atender este objetivo foram:

  • Incluir o cabeçalho reply-to, com o valor /temp-queue/response-msg, no programa de teste de envio de mensagens totvseai/mb/testStompClient.p.
  • Criação do programa totvseai/mb/receiveMessageAMQP.p, o qual chama o programa eai/receiveMessageJSON.p para realizar o processamento da mensagem padronizada. A resposta gerada é postada na fila designada pelo cabeçalho reply-to.
  • Ajuste no totvseai/mb/stompClient.p para capturar a resposta enviada pelo servidor e disponibilizar para o programa chamador.
  • Ainda no totvseai/mb/stompClient.p, o prefixo da fila, quando não há exchange informado, deve ser /amq/queue, para que o servidor AMQP considere filas criadas fora do contexto da conexão STOMP.

Apesar de não ter sido implementado na PoC, será necessário um método no programa totvseai/mb/dispatcher.p para indicar se o envio da mensagem deve aguardar resposta ou não, ou seja, se o modo RPC será ligado ou desligado.

Troca de empresas

Para realização da troca de empresa no Datasul de acordo com o conteúdo presente na mensagem recebida, foi utilizada a mesma técnica do endpoint SOAP que recebe as mensagens padronizadas em formato XML. Nesta técnica, a mensagem recebida no listener (camada Java) é enviada para o programa fwk/eai/deparacompany.p, na camada Progress, que faz a extração da empresa e a verifica no cadastro de de-para do EAI. O valor encontrado é retornado para a camada Java, que então chama o worker na camada Progress, passando o código da empresa obtido da mensagem.

Para viabilizar esta tarefa, foi necessário alterar o programa fwk/eai/deparacompany.p para extrair o código da empresa de mensagens em formato JSON.

Envio e recebimento usando AMQP

Um fluxo completo de envio e recebimento de mensagem padronizada usando AMQP pode ser implementado conforme segue:

  1. Configuração:
    1. Cadastramento de aplicativos externos com tipo de canal AMQP:
      1. Ao inserir o aplicativo, deve ser informado seu nome, o nome do servidor, o número da porta AMQP e o número da porta STOMP.
      2. Como não será possível fazer whois para determinar as transações do aplicativo, deve-se assumir as transações do aplicativo interno, considerando que:
        • Transações com modo suportado como Envio, serão cadastradas como Recebimento.
        • Transações com modo suportado como Recebimento, serão cadastradas como Envio.
        • Transações com modo suportado Ambos, serão cadastradas sem alteração.
      3. O arquivo de configuração do agente deve ser gerado/alterado com base nas informações providas no cadastro do aplicativo externo AMQP:
        • O nome do servidor será gravado no atributo endpoint.
        • A porta AMQP será gravada no atributo amqp.
        • A porta STOMP será gravada no atributo stomp.
        • O nome do aplicativo externo será incluído no atributo destinations. Os atributos destinationKey e routingKey serão iguais e seguirão o padrão aplicativo_externo@produto.
        • O nome do aplicativo interno será o nome da fila do worker.
    2. Definição de rotas
      1. Ao selecionar o aplicativo externo do tipo AMQP, todas as transações estarão disponíveis para estabelecer rotas.
      2. Serão gravadas as transações marcadas pelo usuário, mantendo o mesmo procedimento dos aplicativos SOAP e REST.
  2. Envio síncrono de mensagem:
    1. O programa de negócio chama o adapter da transação que deseja enviar.
    2. O adapter gera a mensagem e entrega para o Engine do EAI (dispatchMessage).
    3. O Engine, após as validações necessárias, determina qual rota para enviar a mensagem.
    4. Sendo um aplicativo AMQP, a mensagem será entregue para o canal AMQPChannel.
    5. O canal AMQPChannel converterá a mensagem para JSON e entregará para o Dispatcher, informando-o antes que o modo RPC deve ser ligado.
    6. Com base na destinationKey recebida, o Dispatcher realizará o envio da mensagem com o apoio do STOMPClient.
    7. O STOMPClient envia a mensagem para o broker AMQP e aguarda o retorno (ResponseMessage).
    8. Ao receber o retorno, o STOMPClient o armazena internamente e finaliza o envio.
    9. O Dispatcher recupera do STOMPClient o conteúdo retornado e repassa para o AMQPChannel.
    10. O AMQPChannel converte o conteúdo retornado para ResponseMessage e devolve para o Engine, que por sua vez, devolve para o Adapter.
    11. O Adapter processa o retorno e finaliza o fluxo síncrono.
  3. Envio assíncrono de mensagem
    1. O programa de negócio chama o Adapter da transação.
    2. O Adapter gera a mensagem e entrega para o Engine (dispatchMessage).
    3. Sendo uma mensagem assíncrona, esta é armazenada na fila e o Engine retorna para o Adapter, finalizando esta etapa.
    4. Em um momento posterior, o processador de fila obtem a mensagem para envio e entrega ao Engine.
    5. O Engine determina que a mensagem deve ser enviada para um aplicativo AMQP e entrega para o AMQPChannel.
    6. O AMQPChannel converte a mensagem para JSON e entrega para o Dispatcher com o modo RPC ligado.
    7. O Dispatcher, com o apoio do STOMPClient, realiza o envio da mensagem utilizando a destinationKey correspondente ao aplicativo externo da rota.
    8. O STOMPClient aguarda o retorno (ReceiptMessage).
    9. Ao receber o retorno, o STOMPClient o armazena internamente e finaliza a entrega.
    10. O Dispatcher recupera o retorno do STOMPClient e o repassa ao AMQPChannel.
    11. O AMQPChannel converte o retorno em uma ReceiptMessage e a devolve para o Engine.
    12. O Engine armazena a ReceiptMessage no repositório de mensagens.
    13. Quando a mensagem for processada pelo aplicativo destino, a ResponseMessage será entregue pelo fluxo de recebimento, que será descrito a seguir.
  4. Recebimento de mensagens (negócio e resposta)
    1. O agente AMQP, iniciado com o Datasul, recebe uma mensagem postada na fila do aplicativo interno.
    2. O agente chama o consumer associado à fila, para receber mensagens padronizadas via AMQP.
    3. O consumer delega a mensagem recebida para o programa de recebimento de mensagens JSON.
    4. A mensagem é processada da mesma forma que mensagens recebidas pelo canal REST.
    5. A resposta é devolvida para o consumer, que a posta na fila designada no atributo reply-to da mensagem recebida, se estiver presente. Do contrário, a resposta será colocada na fila correspondente ao aplicativo de origem da mensagem original.

Conclusões e Próximos Passos

Com os devidos ajustes, é perfeitamente possível utilizar a infraestrutura atual de AMQP para o tráfego de mensagens padronizadas. Entretanto, na versão 12.1.23 do Frame Novo do Datasul, não há tela de configuração do EAI disponível, sendo necessário considerar a retomada do configurador unificado para o Datasul.

Em relação a identificação da operação a ser realizada pela mensagem padronizada recebida, ficou patente a necessidade do retorno da tag Event ao Header, quando a mensagem estiver no formato JSON.

Em relação ao recibo, este também será necessário no tráfego da mensagem padronizada usando AMQP.

Sendo assim, será necessário a criação de tarefas no backlog da equipe conforme sugerido abaixo:

  • Retornar a tag event no header da mensagem padronizada, para permitir identificar se é upsert ou delete.
  • Implementar a ReceiptMessage para o formato JSON, com uso opcional no canal REST, mas obrigatório no canal AMQP.
  • Movimentação dos fontes do totvseai-mb do repositório do Frame Novo (FWK-git) para o repositório do EAI (EAI2-TFS).
  • (Spike) Estudo para componentizar o totvseai-mb, permitindo sua execução como um serviço separado do Datasul (Frame Novo). Entretanto, a execução embarcada também deve ser possível, ainda que não seja prioridade.
    • Este estudo requer uma conversa com o GCAD para verificar a experiência em relação a expedir componentes separados do produto.
  • Implementar a classe do canal AMQP, (TOTVSAppAMQPChannel), que será embarcada no componente totvseai-mb.pl.
  • Implementar a infraestrutura para gravação do aplicativo AMQP no banco de dados do EAI2.
  • Realizar a geração/atualização do arquivo de configuração do agente AMQP conforme especificado neste documento.
  • (Spike) Avaliar as alterações necessárias para utilizar o configurador unificado no Datasul, para configurar AMQP e REST.

Anexos

Arquivo de configuração

totvseai-mb-config.json

Fontes alterados no spike

spike.zip

  • Sem rótulos