Esta página tem como objetivo ajudar os times da suíte logística a migrarem o framework para publicação e consumo de mensagens do RabbitMQ das suas aplicações, de Spring Cloud Stream para Spring AMQP. Esta migração é necessária pois a anotação StreamListener do Spring Cloud Stream foi depreciada/descontinuada e a alternativa oferecida exigiria uma mudança grande e de difícil transição.
...
Informações |
---|
|
- Não haverá perda de mensagens durante o processo de migração;
- Não haverá mensagens duplicadas por conta do processo de migração;
- Exceto para eventos recebidos do RAC que no momento de troca de pods pode duplicar mensagem, contudo, atualmente a duplicação de mensagem neste ponto já é conhecida e tratada.
- A migração poderá ser feita aos poucos, por mensagem específica, por um conjunto de mensagens ou de todas as mensagens desde que respeitado os procedimentos detalhados a seguir;
- Exceto os casos de manter a utilização das filas que já apontam para exchanges do tipo header e possuem 'n' suscribers
- Não quebrará a integração com outros produtos da suíte logística, nem com a infra estrutura de webhooks utilizado para integração com específicos e produtos externos;
- Ao fim da migração, o time estará com a mensageria pronta para futura migração do TJF 4.
|
Atualmente, boa parte dos serviços que publicam mensagens o fazem através de uma exchange do tipo TOPIC. Para receber mensagens de uma exchange, boa parte dos serviços o fazem através de um único bind com uma fila que representa o serviço (exemplo agendamento-exchange.yms-core). Esta abordagem é bastante simples, contudo é muito sensível a altos volumes de mensagens de tipos específico. Exemplo, o serviço recebe mensagens de vários tipos, ClienteCriadoEvent, FornecedorCriadoEvent, MotoristaCriadoEvent entre outros. Em um determinado momento, passa a receber um alto volume da mensagem ClienteCriadoEvent, como há somente uma única fila, as demais mensagens se misturam na fila esperando para serem consumidas. Conclusão, a alta carga de mensagens de um único tipo acaba impactando o desempenho de todo o serviço.
Para endereçar esta situação, vamos criar novos exchanges do tipo HEADER, estes exchanges serão utilizados no lugar das exchanges do tipo TOPIC. Exchanges do tipo HEADER ofereçam uma maior flexibilidade na forma de realizar bind para filas em relação as exchanges do tipo TOPIC. Uma vez que tenhamos as exchanges do tipo HEADER, ao invés de criarmos uma única fila para todo o serviço, criaremos novas filas com base em cada agregado do serviço ou alguma outra separação que o serviço julgar interessante para evitar que eventos específicos impactem o funcionamento geral do serviço. Desta forma, o filtro para entregar mensagens de uma exchange para uma fila passa a ficar no RabbitMQ.
Quando não há necessidade de reorganização das exchanges e filas Âncora |
---|
| SemReorganizacao |
---|
| SemReorganizacao |
---|
|
Para os serviços que já possuem exchanges do tipo HEADER e filas organizadas conforme algum critério previamente estruturado e que esteja atendendo as necessidades atuais, boa parte desta documentação é irrelevante bastando seguir os passos abaixo:
- Redeclarar as exchanges e filas conforme novo modelo
- Importante! Há dois formatos de gerenciamento de filas de erros, a nativa do RabbitMQ e a do TJF, utilizar a do TJF. A nativa requer a recriação da fila.
- Ajustar os publicadores para fazer uso do RabbitTemplate.
- Ajustar os consumidores substituindo as anotações @StreamListener por @RabbitListener e @RabbitHandler
- Os passos mencionados acima estão nesta documentação. Atentar-se também para as limitações mencionadas abaixo
- Se a fila já faz bind com uma exchange do tipo HEADER, deseja-se manter a fila existente e há várias classes do tipo subscriber que apontam para a mesma fila
- Será necessário realizar o processo de migração de todos subscribers da mesma fila de uma única vez
- Isso porque todos os métodos anotados com @RabbitHandler de uma mesma fila precisam estar na mesma classe.
- Estamos analisando uma abordagem para evitar que isso seja necessário, permitindo manter a organização atual de classes por agregado e não por fila.
Jira |
---|
server | JIRA |
---|
serverId | 0c783de1-186e-383b-975c-a1acd7d76cb5 |
---|
key | AHUBLOG-205 |
---|
|
- Quando há um mesmo evento com dois subscribers
- Para estes casos, será necessário
Criar duas filas diferentes com o mesmo filtro por type. Exemplo, há dois subscribers para um evento ProcessoFinalizadoEvent onde cada subscriber corresponde a um agregado diferente. Neste caso, deverá ser criada uma fila para cada agregado.
Bloco de código |
---|
language | java |
---|
theme | Midnight |
---|
title | StreamListener |
---|
linenumbers | true |
---|
collapse | true |
---|
|
@Component
public class PedidoSubscriber {
@StreamListener(target="fila.processo")
public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
// lógica
}
}
@Component
public class EstoqueSubscriber {
@StreamListener(target="fila.processo")
public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
// lógica
}
}
|
Bloco de código |
---|
language | java |
---|
theme | Midnight |
---|
title | RabbitListener |
---|
linenumbers | true |
---|
collapse | true |
---|
|
@RabbitListener(queues = "fila.pedido")
public class PedidoSubscriber {
@RabbitHandler
public void whenProcessoFinalizado(@Payload ProcessoFinalizadoEvent event) {
// lógica
}
}
@RabbitListener(queues = "fila.estoque")
public class EstoqueSubscriber{
@RabbitHandler
public void whenProcessoFinalizado(@Payload ProcessoFinalizadoEvent event) {
// lógica
}
}
@RabbitListener(queues = "fila.agregado")
public class AgregadoSubscriber {
@RabbitHandler
public void whenProcessoFinalizado(@Payload FinalizarProcessoAgregadoCommand event) {
// lógica
}
} |
Criar uma etapa após o ProcessoFinalizadoEvent que será responsável por chamar os diferentes comandos para cada agregado. Nesta abordagem também seria necessário disponibilizar os comandos via mensageria. Exemplo:
Bloco de código |
---|
language | java |
---|
theme | Midnight |
---|
title | StreamListener |
---|
linenumbers | true |
---|
collapse | true |
---|
|
@Component
public class PedidoSubscriber {
@StreamListener(target="fila.processo")
public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
// lógica
}
}
@Component
public class EstoqueSubscriber {
@StreamListener(target="fila.processo")
public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
// lógica
}
}
|
Bloco de código |
---|
language | java |
---|
theme | Midnight |
---|
title | RabbitListener |
---|
linenumbers | true |
---|
collapse | true |
---|
|
@RabbitListener(queues = "fila.processo")
public class ProcessoSubscriber {
@RabbitHandler
public void whenProcessoFinalizado(@Payload ProcessoFinalizadoEvent event) {
// publicar FinalizarProcessoPedidoCommand
// publicar FinalizarProcessoEstoqueCommand
// publicar FinalizarProcessoAgregadoCommand
}
}
@RabbitListener(queues = "fila.processo")
public class PedidoSubscriber {
@RabbitHandler
public void whenProcessoFinalizado(@Payload FinalizarProcessoPedidoCommand event) {
// processar FinalizarProcessoPedidoCommand
}
}
@RabbitListener(queues = "fila.processo")
public class EstoqueSubscriber {
@RabbitHandler
public void whenProcessoFinalizado(@Payload FinalizarProcessoEstoqueCommand event) {
// processar FinalizarProcessoEstoqueCommand
}
}
@RabbitListener(queues = "fila.processo")
public class AgregadoSubscriber{
@RabbitHandler
public void whenProcessoFinalizado(@Payload FinalizarProcessoAgregadoCommand event) {
// processar FinalizarProcessoAgregadoCommand
}
} |
Outra possibilidade é chamar diretamente os comandos de aplicação em um único subscriber, contudo esta opção altera o formato de execução atual, fazendo com que o segundo comando de aplicação só seja executado após o sucesso do primeiro. Sendo assim, tenha consciência de que não será gerado um impacto indesejado ao produto.
Bloco de código |
---|
language | java |
---|
theme | Midnight |
---|
title | StreamListener |
---|
linenumbers | true |
---|
collapse | true |
---|
|
@Component
public class PedidoSubscriber {
@StreamListener(target="fila.processo")
public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
// lógica
}
}
@Component
public class EstoqueSubscriber {
@StreamListener(target="fila.processo")
public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
// lógica
}
}
|
Bloco de código |
---|
language | java |
---|
theme | Midnight |
---|
title | RabbitListener |
---|
linenumbers | true |
---|
collapse | true |
---|
|
@RabbitListener(queues = "fila.processo")
public class ProcessoSubscriber {
@RabbitHandler
public void whenProcessoFinalizado(@Payload ProcessoFinalizadoEvent event) {
// lógica 01 -> executar FinalizarProcessoPedidoCommand
// lógica 02 -> executar FinalizarProcessoEstoqueCommand
}
} |
- Quando há necessidade de realizar um filtro com base em informações do corpo da mensagem
Neste caso o filtro que era feito através do parâmetro condition da anotação StreamListener terá que ser movido para o corpo do método através de uma cláusula condicional (if). Exemplo:
Bloco de código |
---|
language | java |
---|
theme | Midnight |
---|
title | StreamListener |
---|
linenumbers | true |
---|
collapse | true |
---|
|
public class SubscriberStreamListener {
public static final String NAME = "ProcessoExecutadoEvent";
public static final String CONDITIONAL_EXPRESSION_PROCESSO =
" && (@MessageFilter.get(payload,'content/origem') == 'MKPL'" +
" || @MessageFilter.get(payload,'content/origem') == 'PLDT')";
public static final String CONDITIONAL_EXPRESSION = "headers['type']=='" + NAME + "'" + CONDITIONAL_EXPRESSION_PROCESSO;
@StreamListener(target = Channel.PROCESSO_INPUT, condition = CONDITIONAL_EXPRESSION)
public void processoExecutado(TOTVSMessage<ProcessoExecutadoEvent> message) {
// processamento do evento
}
}
|
Bloco de código |
---|
language | java |
---|
theme | Midnight |
---|
title | RabbitListener |
---|
linenumbers | true |
---|
collapse | true |
---|
|
@RabbitListener(queues = "fila.processo")
public class SubscriberAMQP {
private static final String ORIGEM_MKPL = "MKPL";
private static final String ORIGEM_PLDT = "PLDT";
@RabbitHandler
public void whenProcessoFinalizado(@Payload ProcessoExecutadoEvent event) {
if (ORIGEM_MKPL.equals(event.getOrigem()) || ORIGEM_PLDT.equals(event.getOrigem())) {
// processamento do evento
}
}
}
|
Será criada uma nova exchange do tipo HEADER para cada exchange TOPIC, adicionando um bind de exchange para exchange, partindo da TOPIC para HEADER. Assim conseguimos continuar publicando na exchange original, sem quebrar as integrações existentes e, criar as novas filas já com bind para a exchange do tipo HEADER.
...
A seção abaixo ilustra o passo a passo.
Exchanges do tipo TOPIC e serviços com sua respectiva fila recebendo todos os tipos de eventos.
Baixe a animação em MP4
Adicionar dependência ao sdk de migração que verifica se deve ou não adicionar header de versão (sl-version). Além do header, os interceptors nos subscribers também são adicionadospara garantir que somente um dos métodos seja executado (com StreamListener ou com RabbitListener). Neste momento só existirão no código os métodos com StreamListener. Como o sl-version ainda não será enviado, o método com StreamListener continuará sendo executado em todos os casos sempre.
Baixe a animação em MP4
Clique aqui para verificar o passo a passo desta etapa.
Nova exchange do tipo HEADER, bind entre as exchanges do tipo TOPIC e HEADER e duplicação do(s) subscribers, substituindo as anotações @StreamListener por @RabbitListener e @RabbitHandler.
Aviso |
---|
title | Para os novos subscribers do RAC, TPD e outros sistemas externos que não controlamos a publicação de mensagens, adicionar anotação @IgnoreMessageInterceptor para que os interceptors criados para migração na sdk sejam ignorados. |
---|
|
Caso isso não seja observado, mensagens serão perdidas. |
Baixe a animação em MP4
Clique aqui para verificar o passo a passo desta etapa.
Adicionar a anotação @VersionedEvent nos eventos para que sejam publicados com o header sl-version=2.
Aviso |
---|
title | TODOS os consumidores (em qualquer serviço) dos eventos a serem anotados com @VersionedEvent devem ter ao menos duplicado seus subscribers (segunda etapa do processo de expansão) |
---|
|
Caso isso não seja observado, mensagens serão perdidas. |
Baixe a animação em MP4
Clique aqui para verificar o passo a passo desta etapa.
Remover os subscribers anotados com @StreamListener
Aviso |
---|
title | TODOS os publicadores dos subscribers anotados com @StreamListener a serem removidos devem estar publicando o cabeçalho sl-version = 2 (terceira etapa de expansão) |
---|
|
Caso isso não seja observado, mensagens serão perdidas. |
Baixe a animação em MP4
Clique aqui para verificar o passo a passo desta etapa.
Desabilitar os interceptors.
Aviso |
---|
title | TODOS subscribers anotados com @StreamListener já devem ter sido removidos (primeira etapa do processo de contração) |
---|
|
Caso isso não seja observado, mensagens serão perdidas. |
Baixe a animação em MP4
Clique aqui para verificar o passo a passo desta etapa.
Remover a anotação @VersionedEvent dos eventos.
Aviso |
---|
title | TODOS os consumidores (em todos serviços) dos eventos com anotação @VersionedEvent removido já devem ter removido os subscribers antigos (segunda etapa do processo de contração) |
---|
|
Caso isso não seja observado, mensagens serão perdidas. |
Baixe a animação em MP4
Clique aqui para verificar o passo a passo desta etapa.
- Esta versão SNAPSHOT será convertida em RELEASE, processo em andamento com TJF Luciano De Araujo
Bloco de código |
---|
language | text |
---|
theme | Midnight |
---|
title | tjf 3.24.0 |
---|
linenumbers | true |
---|
|
<parent>
<groupId>com.totvs.tjf</groupId>
<artifactId>tjf-boot-starter</artifactId>
<version>3.24.0-SNAPSHOT</version>
</parent> |
Bloco de código |
---|
language | text |
---|
theme | Midnight |
---|
title | tjf-messaging-amqp |
---|
linenumbers | true |
---|
|
<dependency>
<groupId>com.totvs.tjf</groupId>
<artifactId>tjf-messaging-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.totvs.tjf</groupId>
<artifactId>tjf-test-amqp</artifactId>
<scope>test</scope>
</dependency> |
O sdk disponibiliza uma função para verificar se deve ou não adicionar o header sl-version com valor 2. Também disponibiliza a anotação @VersionedEventpara indicar se o evento deve ser publicado com sl-version com valor 2.
...
Bloco de código |
---|
language | text |
---|
theme | Midnight |
---|
title | sdk de migração |
---|
linenumbers | true |
---|
|
<repository>
<id>SuiteLogistica</id>
<url>https://pkgs.dev.azure.com/totvstfs/SuiteLogistica/_packaging/SuiteLogistica/maven/v1</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<dependency>
<groupId>com.totvs.sl.migracao.amqp.sdk</groupId>
<artifactId>migracao.amqp.sdk</artifactId>
<version>1.0.7-RELEASE</version>
</dependency>
|
Obs.: Registrar todos os exchanges existentes no serviço conforme novo modelo.
...
Bloco de código |
---|
language | yml |
---|
theme | Midnight |
---|
title | Yaml com exchanges existentes |
---|
linenumbers | true |
---|
|
tjf:
messaging:
amqp:
exchanges:
topicExchanges: #Existentes do tipo topic
- name: coleta-entrega-exchange
durable: true
autoDelete: false
- name: coleta-entrega-query-errors
durable: true
autoDelete: false
headersExchanges: #Existentes do tipo headers
- name: coleta-entrega-header-exchange
durable: true
autoDelete: false
|
Novo formato de plublicação de mensagens.
...
Aviso |
---|
|
Somente realizar as próximas etapas depois de realizar O DEPLOY EM PRODUÇÃO DAS ALTERAÇÕES ACIMA! !!! FAZER DEPLOY DA APLICAÇÃO ANTES DE AVANÇAR !!! |
Voltar para animação
No caso das exchanges TOPIC, deverá ser criada uma exchange HEADER e será feito o bind entre elas, conforme mencionado anteriormente.
Bloco de código |
---|
language | yml |
---|
theme | Midnight |
---|
title | Yaml com novo exchange |
---|
linenumbers | true |
---|
|
tjf:
messaging:
amqp:
error: #NOVO, ADICIONAR
exchange: coleta-entrega-query-errors # Aponta no tjf a exchange de erros - dlq
migration: #NOVO, ADICIONAR
interceptors:
enabled: true # Habilita o uso dos interceptors durante o processo de migração
exchanges:
topicExchanges:
- name: coleta-entrega-exchange
durable: true
autoDelete: false
- name: coleta-entrega-query-errors
durable: true
autoDelete: false
headersExchanges:
- name: coleta-entrega-header-exchange
durable: true
autoDelete: false
bindings: #NOVO, ADICIONAR
bindingsList:
- destination: coleta-entrega-header-exchange #NOVA EXCHANGE DO TIPO HEADER
exchange: coleta-entrega-exchange #EXCHANGE EXISTENTE DO TIPO TOPIC
routingKey: "#"
destinationType: exchange |
Neste exemplo novas filas serão criadas:
Bloco de código |
---|
language | yml |
---|
theme | Midnight |
---|
title | Yaml com novas filas |
---|
linenumbers | true |
---|
|
tjf:
messaging:
amqp:
error:
exchange: coleta-entrega-query-errors
migration:
interceptors:
enabled: true
exchanges:
topicExchanges:
- name: coleta-entrega-exchange
durable: true
autoDelete: false
- name: coleta-entrega-query-errors
durable: true
autoDelete: false
headersExchanges:
- name: coleta-entrega-header-exchange
durable: true
autoDelete: false
queues: #NOVO, ADICIONAR
queuesList:
- name: coleta-entrega-header-exchange.sl-coleta-entrega.arquivo # Cria a nova queue do domínio de arquivo - V2
durable: true
autoDelete: false
- name: coleta-entrega-header-exchange.sl-coleta-entrega.unidade # Cria a nova queue do domínio de unidade - V2
durable: true
autoDelete: false
- name: coleta-entrega-header-exchange.sl-coleta-entrega.organizacao # Cria a nova queue do domínio de organizacao - V2
durable: true
autoDelete: false
- name: coleta-entrega-header-exchange.sl-coleta-entrega.video # Cria a nova queue do domínio de video - V2
durable: true
autoDelete: false
- name: coleta-entrega-query-errors.coleta-entrega-query-errors # Cria a nova queue de erros - dlq
durable: true
autoDelete: false
exclusive: false
queue-dlq: coleta-entrega-query-errors.coleta-entrega-query-errors # Define a queue de erros como dlq no tjf
bindings: #ATUALIZADO, ALTERAR
bindingsList:
- destination: coleta-entrega-header-exchange
exchange: coleta-entrega-exchange
routingKey: "#"
destinationType: exchange
- destination: coleta-entrega-query-errors.coleta-entrega-query-errors # Novo, faz o binding da fila de erros - dlq
exchange: coleta-entrega-query-errors
destinationType: queue
- destination: coleta-entrega-header-exchange.sl-coleta-entrega.arquivo # Novo, faz o binding dos eventos de arquivo
exchange: coleta-entrega-header-exchange
destinationType: queue
arguments:
type: [ArquivoCriadoEvent, ArquivoRemovidoEvent] #UMA DAS POSSIVEIS FORMAS DE DECLARAR UMA LISTA DE EVENTOS
- destination: coleta-entrega-header-exchange.sl-coleta-entrega.unidade # Novo, faz o binding do evento de unidade
exchange: coleta-entrega-header-exchange
destinationType: queue
arguments:
type: UnidadeCriadaEvent
- destination: coleta-entrega-header-exchange.sl-coleta-entrega.organizacao # Novo, faz o binding do evento de organizacao
exchange: coleta-entrega-header-exchange
destinationType: queue
arguments:
type: OrganizacaoCriadaEvent
- destination: coleta-entrega-header-exchange.sl-coleta-entrega.video # Novo, faz o binding dos eventos de video
exchange: coleta-entrega-header-exchange
destinationType: queue
arguments:
type: # OUTRA FORMA DE DECLARAR UMA LISTA DE EVENTOS
- VideoHowToCriadoEvent
- VideoHowToRemovidoEvent
|
Primeiramente deve-se fazer uma cópia da classe subscriber original e nomear a atual como old.
...
Bloco de código |
---|
language | java |
---|
theme | Midnight |
---|
title | RabbitListener |
---|
linenumbers | true |
---|
|
@Component
@AllArgsConstructor
@RabbitListener(queues = "coleta-entrega-header-exchange.sl-coleta-entrega.arquivo")
public class ArquivoSubscriber {
private final ArquivoService service;
@RabbitHandler
public void arquivoCriado(@Payload ArquivoCriadoEvent message) {
service.on(message);
}
@RabbitHandler
public void arquivoRemovido(@Payload ArquivoRemovidoEvent message) {
service.on(message);
}
}
// TESTES
@DisplayName("Arquivo Header - Mensageria")
public class ArquivoSubscriberIT extends AdapterConfigIT {
@MockBean
private ArquivoService service;
@Captor
private ArgumentCaptor<ArquivoCriadoEvent> createCaptor;
@Captor
private ArgumentCaptor<ArquivoRemovidoEvent> removeCaptor;
@Test
@DisplayName("Deve manipular mensagem recebido por mensageria")
void deveManipularArquivoCriadoEvent() throws Exception {
var subscriber = Mockito.mock(ArquivoHeaderSubscriber.class);
var listener = new MockListener(subscriber);
var event = mockArquivoCriadoEvent();
var message = TOTVSMessageBuilder
.withType(ArquivoCriadoEvent.NAME)
.setContent(event)
.buildAmqp();
listener.sendMessage(message);
verify(subscriber).arquivoCriado(createCaptor.capture());
assertInstanceOf(ArquivoCriadoEvent.class, createCaptor.getValue());
}
@Test
@DisplayName("Deve manipular mensagem recebido por mensageria")
void deveManipularArquivoRemovidoEvent() throws Exception {
var subscriber = Mockito.mock(ArquivoHeaderSubscriber.class);
var listener = new MockListener(subscriber);
var arquivo = ArquivoTestFactory.umArquivo();
var event = ArquivoRemovidoEvent.of(arquivo.getId());
var message = TOTVSMessageBuilder
.withType(ArquivoRemovidoEvent.NAME)
.setContent(event).buildAmqp();
listener.sendMessage(message);
verify(subscriber).arquivoRemovido(removeCaptor.capture());
assertInstanceOf(ArquivoRemovidoEvent.class, removeCaptor.getValue());
}
@Test
@DisplayName("Deve criar um arquivo recebido por mensageria")
void deveCriarUmArquivo() {
var subscriber = new ArquivoHeaderSubscriber(service);
var event = mockArquivoCriadoEvent();
subscriber.arquivoCriado(event);
var expected = ArquivoCriadoEvent.builder()
.id(event.getId())
.nome(event.getNome())
.tipoConteudo(event.getTipoConteudo())
.removido(event.getRemovido())
.dataCriacao(event.getDataCriacao())
.tamanho(event.getTamanho())
.prefixo(event.getPrefixo())
.build();
verify(service).on(expected);
}
@Test
@DisplayName("Deve remover um arquivo recebido por mensageria")
void deveRemoverUmArquivo() {
var arquivo = ArquivoTestFactory.umArquivo();
var subscriber = new ArquivoHeaderSubscriber(service);
var event = ArquivoRemovidoEvent.of(arquivo.getId());
subscriber.arquivoRemovido(event);
var expected = ArquivoRemovidoEvent.of(event.getId());
verify(service).on(expected);
}
private ArquivoCriadoEvent mockArquivoCriadoEvent() {
return ArquivoCriadoEvent.builder()
.id(UUID.randomUUID().toString())
.nome("nome")
.tipoConteudo("tipoConteudo")
.removido(false)
.dataCriacao(ZonedDateTime.now())
.tamanho(new BigDecimal(127))
.prefixo(TestUtils.tenantId + "/")
.build();
}
}
|
Bloco de código |
---|
language | java |
---|
theme | Midnight |
---|
title | StreamListener |
---|
linenumbers | true |
---|
|
@RabbitListener(queues = "queue")
@IgnoreMessageInterceptor // Isto fará o skip do interceptors de todos os metodos da classe
public class AlgumRabbitListenerSubscriber {
} |
Aviso |
---|
|
Somente realizar as próximas etapas depois de realizar O DEPLOY EM PRODUÇÃO DAS ALTERAÇÕES ACIMA! !!! FAZER DEPLOY DA APLICAÇÃO ANTES DE AVANÇAR !!! |
Voltar para animação
No serviço que envia as mensagens:
...
Aviso |
---|
|
Somente realizar as próximas etapas depois de realizar O DEPLOY EM PRODUÇÃO DAS ALTERAÇÕES ACIMA! !!! FAZER DEPLOY DA APLICAÇÃO ANTES DE AVANÇAR !!! |
Voltar para animação
Depois que todos os pods publicadores foram substituídos e todas as mensagens da fila original foram drenadas, pode ser removida a classe subscriber antiga juntamente com suas configurações antigas existentes no yaml.
Voltar para animação
Agora é só repetir os passos até fazer a troca de todos os subscribers.
Mudar o valor da configuração tjf.messaging.amqp.error.migration.interceptors.enable para false.
...
Aviso |
---|
|
Somente realizar as próximas etapas depois de realizar O DEPLOY EM PRODUÇÃO DAS ALTERAÇÕES ACIMA! !!! FAZER DEPLOY DA APLICAÇÃO ANTES DE AVANÇAR !!! |
Voltar para animação
Depois que todos os consumidores das mensagens já foram migrados para @RabbitListener, remover a anotação @VersionedEvent, o código adicionado nos Publishers, a configuração referente ao sdk de migração do yaml (migration.interceptors.enabled) e finalmente a dependência do SDK.
...
Bloco de código |
---|
language | java |
---|
theme | Midnight |
---|
title | Publicador após contração |
---|
linenumbers | true |
---|
|
protected <T> void dispatch(T contentToBeDispatched, TransactionInfo transactionInfo) {
TOTVSMessageBuilder.withType(contentToBeDispatched.getClass().getSimpleName()
.setContent(contentToBeDispatched)
.setTransactionInfo(transactionInfo)
.buildAmqp()
.sendTo(rabbitTemplate, EXCHANGE, ROUTING_KEY);
} |
Voltar para animação