Páginas filhas
  • DEAI1-2400 - Arquitetura de serviços no Datasul com AMQP

Objetivo

Descrever o resultado do estudo realizado sobre a implementação de referência da área de segmento Datasul, que habilita a integração de dados utilizando broker AMQP (especificamente, RabbitMQ).

Considerações iniciais

A implementação de referência para integração via AMQP surgiu de uma iniciativa da área de negócios do segmento de Manufatura do Datasul, como uma das etapas para viabilizar a transformação do ERP Datasul, em aplicativo monolítico, em uma solução componentizada, visando uma arquitetura baseada em microsserviços.

O projeto foi conduzido por Thiago Weber e Mauricio Obenaus, que utilizaram o cliente AMQP disponível no framework Java SpringFramework e foi inicialmente denominado de eai3, como referência a uma evolução da implementação de integração atual do Datasul, denominada de eai2.

Os fontes utilizados como base para este estudo estão na seção Anexos deste documento.

Para mais informações sobre o protocolo AMQP e os conceitos relacionados, consulte a documentação no site do RabbitMQ.

Proposta de implementação

Seguindo a proposta realizada pela equipe de segmento, a implementação final da arquitetura de integração com AMQP utilizará duas camadas: uma em Java e a outra em Progress.

No lugar do termo eai3, a implementação utilizará o nome totvseai-mb, mantendo a referência ao produto de integração (totvseai) e sinalizando sua finalidade específica, de interagir com Message Brokers (mb) como o RabbitMQ, que suporta o protocolo AMQP.

Camada JAVA

Na parte Java, temos as classes abaixo, que realizam o trabalho de configuração da comunicação com o broker AMQP e o agente responsável pelo consumo de mensagens.

A camada Java é dependente da biblioteca Springboot-AMQP para realizar a comunicação com o broker.


AppLauncher: responsável por iniciar a aplicação Spring no container Web (Tomcat/Wildfly) assim que ele é iniciado.

ComponentProvider: define os componentes que serão utilizados pela aplicação, como os componentes de configuração do broker AMQP.

IntegrationSetup: instancia os exchanges e queues com base nas configurações fornecidas e inicia os consumidores de mensagens (consumers).

MessageListener: responsável por receber a mensagem e encaminhá-la ao programa Progress responsável pelo processamento.

TotvsMBMessage: contém a mensagem AMQP e algumas informações adicionais.

ServiceManager: implementa uma forma de controle sobre a aplicação, permitindo parar e iniciar os listeners, sem que seja necessário interromper o container Web.

Na seção Anexos desta página há uma tabela relacionando a classe da implementação de referência do segmento com a classe a ser implementada na proposta final.

Camada PROGRESS

Na parte Progress, os programas abaixo realizam o trabalho de prover os dados de configuração utilizados pela parte Java, e o processamento das mensagens recebidas. Além disso, fornecem também um canal para envio de mensagens ao broker AMQP.

configProvider.p: fornece as configurações relativas ao agente, queues, exchanges, bindings, etc.

helper.p: contém funções utilitárias para manipulação da mensagem.

helper.i: include de definições para uso com o helper.p.

process.i: include com funções de pré-processamento da mensagem.

<worker>: desenvolvido pela área de negócio, é o programa que será acionado pela camada Java, ao receber uma mensagem do broker AMQP. Realizará o processamento da mensagem propriamente dito.

dispatcher.p: programa que será chamado pelas áreas de negócio para envio de mensagens a um broker AMQP.

stompClient.p: programa que implementa o cliente do protocolo STOMP, através do qual a mensagem será entregue ao broker AMQP.

Sobre a entrega de mensagens

O uso do protocolo STOMP, no lugar do protocolo AMQP, para envio de mensagens ao broker se deve por limitações técnicas do Progress. Uma alternativa seria delegar também o envio para a camada Java, porém esta possibilidade será explorada futuramente, para não aumentar o escopo do spike.

Configuração

A configuração da aplicação é baseada no seguinte modelo:

BrokerSettings: contém dados para conexão com o broker AMQP. Além do endpoint, são informados usuário, senha e as portas AMQP, STOMP e WebAPI do RabbitMQ.

Agent: contem dados que definem a aplicação de consumo das mensagens contidas no broker AMQP. No agente é informado o exchange que será usado para enviar mensagens de erro.

Workers: define os consumidores (consumers) das mensagens. Um agente pode ter 1 ou mais workers. No worker é informado o programa Progress que será chamado quando uma mensagem for recebida.

Queue: define a fila (queue) que será consumida. Uma queue sempre estará associada a um worker. E um worker pode ter 1 ou mais queues para consumir. Uma queue também pode ter 0, 1 ou mais argumentos de configuração (QueueArguments).

Bindings: define uma associação entre queue e exchange (denominada binding), na qual consta a chave de roteamento (routing key). Uma queue pode ter 1 ou mais bindings. Um binding pode ter 0, 1 ou mais argumentos de configuração (BindingArguments).

Exchanges: define os exchanges usados pelo agente e referenciados nos bindings e nos destinations.

Destinations: define as "rotas" para envio de mensagens. Cada item da lista tem uma chave (destinationKey) que deve ser referenciada pela regra de negócio quando a mensagem for enviada. A partir da chave, será possível determinar o exchange e a routingKey que serão informados quando a mensagem for enviada ao broker AMQP.

Exemplo - Arquivo de configuração
{
  "settings": {
    "id": 1,
    "endpoint": "localhost",
    "username": "totvseai",
    "password": "totvs@123",
    "webapi": 15672,
    "amqp": 5672,
    "stomp": 61613
  },
  "exchanges": [{
      "id": 1,
      "name": "BusinessExchange",
      "type": "TOPIC",
      "durable": false,
      "autoDelete": true,
      "alternateExchange": ""
    }, {
      "id": 2,
      "name": "AlternateExchange",
      "type": "TOPIC",
      "durable": false,
      "autoDelete": true,
      "alternateExchange": ""
    }, {
      "id": 3,
      "name": "ErrorExchange",
      "type": "TOPIC",
      "durable": false,
      "autoDelete": true,
      "alternateExchange": ""
    }, {
      "id": 4,
      "name": "DeadExchange",
      "type": "TOPIC",
      "durable": false,
      "autoDelete": true,
      "alternateExchange": ""
    }
  ],
  "agent": {
    "id": 1,
    "code": "TestAgent",
    "description": "Teste",
    "situation": "INACTIVE",
    "errorExchange": "ErrorExchange",
    "errorRoutingKey": "error",
    "workers": [{
        "id": 1,
        "workerCode": "Progress",
        "description": "Worker em Progress",
        "consumer": "processMessage|eai3\/eai3worker.p",
        "queue": [{
            "id": 1,
            "name": "BusinessQueue",
            "durable": false,
            "autoDelete": true,
            "exclusive": true,
            "bindings": [{
                "id": 1,
                "routingKey": "business.create",
                "exchange": "BusinessExchange"
              }, {
                "id": 2,
                "routingKey": "business.delete",
                "exchange": "BusinessExchange"
              }, {
                "id": 3,
                "routingKey": "business.update.#",
                "exchange": "BusinessExchange"
              }
            ]
          }
        ]
      }, {
        "id": 2,
        "workerCode": "ProgressAlternate",
        "description": "Alternate Worker em Progress",
        "consumer": "alternateMessage|eai3\/eai3worker.p",
        "queue": [{
            "id": 1,
            "name": "AlternateQueue",
            "durable": false,
            "autoDelete": true,
            "exclusive": true,
            "arguments": [{
                "key": "x-dead-letter-exchange",
                "value": "DeadExchange"
              }
            ],
            "bindings": [{
                "id": 1,
                "routingKey": "#",
                "exchange": "AlternateExchange"
              }
            ]
          }
        ]
      }, {
        "id": 3,
        "workerCode": "ErrorProcess",
        "description": "Error Worker em Progress",
        "consumer": "errorMessage|eai3\/eai3worker.p",
        "queue": [{
            "id": 1,
            "name": "ErrorQueue",
            "durable": false,
            "autoDelete": true,
            "exclusive": true,
            "bindings": [{
                "id": 1,
                "routingKey": "#",
                "exchange": "ErrorExchange"
              }
            ]
          }
        ]
      }, {
        "id": 4,
        "workerCode": "DeadProcess",
        "description": "Dead Worker em Progress",
        "consumer": "deadMessage|eai3\/eai3worker.p",
        "queue": [{
            "id": 1,
            "name": "DeadQueue",
            "durable": false,
            "autoDelete": true,
            "exclusive": true,
            "bindings": [{
                "id": 1,
                "routingKey": "#",
                "exchange": "DeadExchange"
              }
            ]
          }
        ]
      }
    ]
  },
  "destinations": [{
    "id" : 1,
    "destinationKey": "cliente-financeiro",
    "exchange": "BusinessExchange",
    "routingKey": "financial.customer.create"
  },{
    "id": 2,
    "destinationKey": "pedido-compra-rm",
    "exchange": "BusinessExchange",
    "routingKey": "rm.supply.order.create"
  }]
}

AMQP no contexto /transactions

A integração entre ERPs no contexto server-to-server, utilizando o endpoint /transactions, também utilizará comunicação via AMQP. Entretanto, as configurações do primeiro contexto não serão compartilhadas com as configurações apresentadas neste documento, já que o contexto de integração baseada em serviços é opcional e pode ser utilizado em paralelo ao contexto utilizando mensagem padronizada e o endpoint /transactions.


Pré-requisitos

Os pré-requisitos para a utilização da aplicação são:

  • Java Runtime Environment 8 ou superior
  • Progress 11 ou superior
  • Servidor de Aplicação Java JBoss Wildfly 10 ou superior
  • Container Web Java Apache Tomcat 9 ou superior
  • Servidor de Aplicação Progress (AppServer) 11 ou superior
  • ERP Datasul utilizando novo framework, disponível a partir da versão 12.1.21.

A parte Java da aplicação deve estar instalada na pasta adequada, conforme o container Java escolhido.

A parte Progress, correspondente aos programas e arquivo de configuração, deve estar instalada no AppServer.

Estando o ambiente corretamente configurado, deve-se iniciar os componentes nesta ordem:

  1. Bancos de dados
  2. Servidor de aplicação Progress
  3. Servidor de aplicação/container Java

Funcionamento

Ao iniciar o container Java, a aplicação é carregada, os componentes base são inicializados e as configurações são requisitadas a camada Progress, que fará a leitura do arquivo de configuração e devolverá as informações para a camada Java.

Com base nas informações de configuração, a aplicação inicializa os workers e demais componentes relacionados (exchanges, queues e listeners), estabelecendo uma conexão com o broker AMQP. Nesta conexão, o cliente AMQP cria os canais (channels) que serão utilizados para o consumo das mensagens nas filas (queues).

Quando uma mensagem é postada na fila que está sendo monitorada pelo worker, este envia a mensagem para o programa Progress associado como consumer. Este programa processa a mensagem, transformando-a num objeto de negócio com o auxilio do helper gerado pelo EAI2, ou utilizando outra forma, para que os dados da mensagem sejam encaminhados para a regra de negócio correspondente.

O consumer pode responder de imediato com um ACK, ou então, aguardar o processamento da regra de negócio e retornar uma mensagem com o resultado do processamento. Pode ainda, retornar mensagens de erro ocorridas no processamento. Tanto as mensagens de resposta quanto de erro podem ser devolvidas a camada JAVA, que então as encaminhará ao broker AMQP.

Quando a regra de negócio desejar enviar uma mensagem para o broker AMQP a partir da camada Progress, deve-se utilizar do dispatcher, informando o objeto de negócio que deseja enviar, bem como a chave de roteamento (destinationKey). O dispatcher fará a extração dos dados da mensagem de negócio, montará o corpo da mensagem e a encaminhará para o cliente STOMP, junto com o exchange e a routingKey, que serão obtidos das configurações, a partir da destinationKey recebida. O cliente STOMP, por sua vez, será o responsável por conectar ao broker AMQP, enviar a mensagem e fazer todos os tratamentos relacionados a comunicação, inclusive tratamento de exceções.

Na seção Anexos há um diagrama que mostra todos os componentes citados e como se relacionam.

Considerações e Recomendações

As tarefas de desenvolvimento que derivarem deste spike devem considerar o seguinte:

  • Tratamento de erros: a implementação de referência não faz um tratamento adequado de falhas, mas tais tratamentos devem ser considerados no escopo de desenvolvimento. Alguns exemplos a tratar:
    • Broker AMQP fora do ar ao iniciar o container Java.
    • Appserver Progress fora do ar ao iniciar o container Java.
    • Broker AMQP fora do ar ao enviar uma mensagem a partir do Progress.
  • Arquivo de configuração: a implementação de referência não carrega o arquivo de configuração demonstrado neste documento. Incluir esta tarefa no esforço de desenvolvimento.
  • Novos fontes: alguns fontes citados neste documento não constam na implementação de referência e devem ser criados.
    • Dispatcher.p.

Com base nas considerações acima, as tarefas sugeridas para efetivar o desenvolvimento são:

  1. Criação do projeto de desenvolvimento base utilizando os fontes da implementação de referência modificados neste spike (EAI3Agente-mod.zip).
  2. Refatoração dos fontes considerando os novos nomes utilizados neste documento (ver lista abaixo).
  3. Implementação da leitura do arquivo de configuração.
  4. Criação do novo fonte - dispatcher.p.
  5. Testes de integração entre duas instâncias rodando a implementação.

Sobre os testes, tanto o teste integrado quanto o de integração podem ser realizados usando Docker para disponibilizar uma instância do RabbitMQ. Para mais informações, consultar https://store.docker.com/images/rabbitmq.

Anexos

Fontes da implementação de referência


Relação de classes e programas da implementação de referência

ComponenteTecnologiaNovo Nome
EAI3AgentJavaAppLauncher
EAI3ComponentsJavaComponentProvider
EAI3ProgressSetupJava

IntegrationSetup

EAI3ProgressMessageJavaTotvsMBListener
SendMessageJavaTotvsMBMessage
EAI3ControlMessageJavaServiceManager
eai3service.pProgressconfigProvider.p
eai3worker.pProgress<depende da área de negócio>
eai3helper.pProgresshelper.p
eai3helper.iProgresshelper.i
eai3process.iProgressprocess.i
eai3stomp.pProgressstompClient.p

Visão detalhada dos componentes da aplicação

No diagrama estão descritas as relações de dependência entre os componentes.



Indice

  • Sem rótulos