Descrever o resultado do estudo realizado sobre a implementação de referência da área de segmento Datasul, que habilita a integração de dados utilizando broker AMQP (especificamente, RabbitMQ).
A implementação de referência para integração via AMQP surgiu de uma iniciativa da área de negócios do segmento de Manufatura do Datasul, como uma das etapas para viabilizar a transformação do ERP Datasul, em aplicativo monolítico, em uma solução componentizada, visando uma arquitetura baseada em microsserviços.
O projeto foi conduzido por Thiago Weber e Mauricio Obenaus, que utilizaram o cliente AMQP disponível no framework Java SpringFramework e foi inicialmente denominado de eai3, como referência a uma evolução da implementação de integração atual do Datasul, denominada de eai2.
Os fontes utilizados como base para este estudo estão na seção Anexos deste documento.
Para mais informações sobre o protocolo AMQP e os conceitos relacionados, consulte a documentação no site do RabbitMQ.
Seguindo a proposta realizada pela equipe de segmento, a implementação final da arquitetura de integração com AMQP utilizará duas camadas: uma em Java e a outra em Progress.
No lugar do termo eai3, a implementação utilizará o nome totvseai-mb, mantendo a referência ao produto de integração (totvseai) e sinalizando sua finalidade específica, de interagir com Message Brokers (mb) como o RabbitMQ, que suporta o protocolo AMQP.
Na parte Java, temos as classes abaixo, que realizam o trabalho de configuração da comunicação com o broker AMQP e o agente responsável pelo consumo de mensagens.
AppLauncher: responsável por iniciar a aplicação Spring no container Web (Tomcat/Wildfly) assim que ele é iniciado.
ComponentProvider: define os componentes que serão utilizados pela aplicação, como os componentes de configuração do broker AMQP.
IntegrationSetup: instancia os exchanges e queues com base nas configurações fornecidas e inicia os consumidores de mensagens (consumers).
MessageListener: responsável por receber a mensagem e encaminhá-la ao programa Progress responsável pelo processamento.
TotvsMBMessage: contém a mensagem AMQP e algumas informações adicionais.
ServiceManager: implementa uma forma de controle sobre a aplicação, permitindo parar e iniciar os listeners, sem que seja necessário interromper o container Web.
Na seção Anexos desta página há uma tabela relacionando a classe da implementação de referência do segmento com a classe a ser implementada na proposta final.
Na parte Progress, os programas abaixo realizam o trabalho de prover os dados de configuração utilizados pela parte Java, e o processamento das mensagens recebidas. Além disso, fornecem também um canal para envio de mensagens ao broker AMQP.
configProvider.p: fornece as configurações relativas ao agente, queues, exchanges, bindings, etc.
helper.p: contém funções utilitárias para manipulação da mensagem.
helper.i: include de definições para uso com o helper.p.
process.i: include com funções de pré-processamento da mensagem.
<worker>: desenvolvido pela área de negócio, é o programa que será acionado pela camada Java, ao receber uma mensagem do broker AMQP. Realizará o processamento da mensagem propriamente dito.
dispatcher.p: programa que será chamado pelas áreas de negócio para envio de mensagens a um broker AMQP.
stompClient.p: programa que implementa o cliente do protocolo STOMP, através do qual a mensagem será entregue ao broker AMQP.
Sobre a entrega de mensagens
A configuração da aplicação é baseada no seguinte modelo:
BrokerSettings: contém dados para conexão com o broker AMQP. Além do endpoint, são informados usuário, senha e as portas AMQP, STOMP e WebAPI do RabbitMQ.
Agent: contem dados que definem a aplicação de consumo das mensagens contidas no broker AMQP. No agente é informado o exchange que será usado para enviar mensagens de erro.
Workers: define os consumidores (consumers) das mensagens. Um agente pode ter 1 ou mais workers. No worker é informado o programa Progress que será chamado quando uma mensagem for recebida.
Queue: define a fila (queue) que será consumida. Uma queue sempre estará associada a um worker. E um worker pode ter 1 ou mais queues para consumir. Uma queue também pode ter 0, 1 ou mais argumentos de configuração (QueueArguments).
Bindings: define uma associação entre queue e exchange (denominada binding), na qual consta a chave de roteamento (routing key). Uma queue pode ter 1 ou mais bindings. Um binding pode ter 0, 1 ou mais argumentos de configuração (BindingArguments).
Exchanges: define os exchanges usados pelo agente e referenciados nos bindings e nos destinations.
Destinations: define as "rotas" para envio de mensagens. Cada item da lista tem uma chave (destinationKey) que deve ser referenciada pela regra de negócio quando a mensagem for enviada. A partir da chave, será possível determinar o exchange e a routingKey que serão informados quando a mensagem for enviada ao broker AMQP.
{ "settings": { "id": 1, "endpoint": "localhost", "username": "totvseai", "password": "totvs@123", "webapi": 15672, "amqp": 5672, "stomp": 61613 }, "exchanges": [{ "id": 1, "name": "BusinessExchange", "type": "TOPIC", "durable": false, "autoDelete": true, "alternateExchange": "" }, { "id": 2, "name": "AlternateExchange", "type": "TOPIC", "durable": false, "autoDelete": true, "alternateExchange": "" }, { "id": 3, "name": "ErrorExchange", "type": "TOPIC", "durable": false, "autoDelete": true, "alternateExchange": "" }, { "id": 4, "name": "DeadExchange", "type": "TOPIC", "durable": false, "autoDelete": true, "alternateExchange": "" } ], "agent": { "id": 1, "code": "TestAgent", "description": "Teste", "situation": "INACTIVE", "errorExchange": "ErrorExchange", "errorRoutingKey": "error", "workers": [{ "id": 1, "workerCode": "Progress", "description": "Worker em Progress", "consumer": "processMessage|eai3\/eai3worker.p", "queue": [{ "id": 1, "name": "BusinessQueue", "durable": false, "autoDelete": true, "exclusive": true, "bindings": [{ "id": 1, "routingKey": "business.create", "exchange": "BusinessExchange" }, { "id": 2, "routingKey": "business.delete", "exchange": "BusinessExchange" }, { "id": 3, "routingKey": "business.update.#", "exchange": "BusinessExchange" } ] } ] }, { "id": 2, "workerCode": "ProgressAlternate", "description": "Alternate Worker em Progress", "consumer": "alternateMessage|eai3\/eai3worker.p", "queue": [{ "id": 1, "name": "AlternateQueue", "durable": false, "autoDelete": true, "exclusive": true, "arguments": [{ "key": "x-dead-letter-exchange", "value": "DeadExchange" } ], "bindings": [{ "id": 1, "routingKey": "#", "exchange": "AlternateExchange" } ] } ] }, { "id": 3, "workerCode": "ErrorProcess", "description": "Error Worker em Progress", "consumer": "errorMessage|eai3\/eai3worker.p", "queue": [{ "id": 1, "name": "ErrorQueue", "durable": false, "autoDelete": true, "exclusive": true, "bindings": [{ "id": 1, "routingKey": "#", "exchange": "ErrorExchange" } ] } ] }, { "id": 4, "workerCode": "DeadProcess", "description": "Dead Worker em Progress", "consumer": "deadMessage|eai3\/eai3worker.p", "queue": [{ "id": 1, "name": "DeadQueue", "durable": false, "autoDelete": true, "exclusive": true, "bindings": [{ "id": 1, "routingKey": "#", "exchange": "DeadExchange" } ] } ] } ] }, "destinations": [{ "id" : 1, "destinationKey": "cliente-financeiro", "exchange": "BusinessExchange", "routingKey": "financial.customer.create" },{ "id": 2, "destinationKey": "pedido-compra-rm", "exchange": "BusinessExchange", "routingKey": "rm.supply.order.create" }] }
AMQP no contexto /transactions
Os pré-requisitos para a utilização da aplicação são:
A parte Java da aplicação deve estar instalada na pasta adequada, conforme o container Java escolhido.
A parte Progress, correspondente aos programas e arquivo de configuração, deve estar instalada no AppServer.
Estando o ambiente corretamente configurado, deve-se iniciar os componentes nesta ordem:
Ao iniciar o container Java, a aplicação é carregada, os componentes base são inicializados e as configurações são requisitadas a camada Progress, que fará a leitura do arquivo de configuração e devolverá as informações para a camada Java.
Com base nas informações de configuração, a aplicação inicializa os workers e demais componentes relacionados (exchanges, queues e listeners), estabelecendo uma conexão com o broker AMQP. Nesta conexão, o cliente AMQP cria os canais (channels) que serão utilizados para o consumo das mensagens nas filas (queues).
Quando uma mensagem é postada na fila que está sendo monitorada pelo worker, este envia a mensagem para o programa Progress associado como consumer. Este programa processa a mensagem, transformando-a num objeto de negócio com o auxilio do helper gerado pelo EAI2, ou utilizando outra forma, para que os dados da mensagem sejam encaminhados para a regra de negócio correspondente.
O consumer pode responder de imediato com um ACK, ou então, aguardar o processamento da regra de negócio e retornar uma mensagem com o resultado do processamento. Pode ainda, retornar mensagens de erro ocorridas no processamento. Tanto as mensagens de resposta quanto de erro podem ser devolvidas a camada JAVA, que então as encaminhará ao broker AMQP.
Quando a regra de negócio desejar enviar uma mensagem para o broker AMQP a partir da camada Progress, deve-se utilizar do dispatcher, informando o objeto de negócio que deseja enviar, bem como a chave de roteamento (destinationKey). O dispatcher fará a extração dos dados da mensagem de negócio, montará o corpo da mensagem e a encaminhará para o cliente STOMP, junto com o exchange e a routingKey, que serão obtidos das configurações, a partir da destinationKey recebida. O cliente STOMP, por sua vez, será o responsável por conectar ao broker AMQP, enviar a mensagem e fazer todos os tratamentos relacionados a comunicação, inclusive tratamento de exceções.
Na seção Anexos há um diagrama que mostra todos os componentes citados e como se relacionam.
As tarefas de desenvolvimento que derivarem deste spike devem considerar o seguinte:
Com base nas considerações acima, as tarefas sugeridas para efetivar o desenvolvimento são:
Sobre os testes, tanto o teste integrado quanto o de integração podem ser realizados usando Docker para disponibilizar uma instância do RabbitMQ. Para mais informações, consultar https://store.docker.com/images/rabbitmq.
Original | Modificado |
---|---|
Componente | Tecnologia | Novo Nome |
---|---|---|
EAI3Agent | Java | AppLauncher |
EAI3Components | Java | ComponentProvider |
EAI3ProgressSetup | Java | IntegrationSetup |
EAI3ProgressMessage | Java | TotvsMBListener |
SendMessage | Java | TotvsMBMessage |
EAI3ControlMessage | Java | ServiceManager |
eai3service.p | Progress | configProvider.p |
eai3worker.p | Progress | <depende da área de negócio> |
eai3helper.p | Progress | helper.p |
eai3helper.i | Progress | helper.i |
eai3process.i | Progress | process.i |
eai3stomp.p | Progress | stompClient.p |
No diagrama estão descritas as relações de dependência entre os componentes.