Árvore de páginas

Versões comparadas

Chave

  • Esta linha foi adicionada.
  • Esta linha foi removida.
  • A formatação mudou.

Índice

Introdução

Esta página tem como objetivo ajudar os times da suíte logística a migrarem o framework para publicação e consumo de mensagens do RabbitMQ das suas aplicações, de Spring Cloud Stream para Spring AMQP. Esta migração é necessária pois a anotação StreamListener do Spring Cloud Stream foi depreciada/descontinuada e a alternativa oferecida exigiria uma mudança grande e de difícil transição. 

...

Informações
titlePremissas
  1. Não haverá perda de mensagens durante o processo de migração;
  2. Não haverá mensagens duplicadas por conta do processo de migração;
    1. Exceto para eventos recebidos do RAC que no momento de troca de pods pode duplicar mensagem, contudo, atualmente a duplicação de mensagem neste ponto já é conhecida e tratada.
  3. A migração poderá ser feita aos poucos, por mensagem específica, por um conjunto de mensagens ou de todas as mensagens desde que respeitado os procedimentos detalhados a seguir; 
    1. Exceto os casos de manter a utilização das filas que já apontam para exchanges do tipo header e possuem 'n' suscribers
  4. Não quebrará a integração com outros produtos da suíte logística, nem com a infra estrutura de webhooks utilizado para integração com específicos e produtos externos;
  5. Ao fim da migração, o time estará com a mensageria pronta para futura migração do TJF 4.

Informações relevantes

Reorganização das exchanges e filas.

Atualmente, boa parte dos serviços que publicam mensagens o fazem através de uma exchange do tipo TOPIC. Para receber mensagens de uma exchange, boa parte dos serviços o fazem através de um único bind com uma fila que representa o serviço (exemplo agendamento-exchange.yms-core). Esta abordagem é bastante simples, contudo é muito sensível a altos volumes de mensagens de tipos específico. Exemplo, o serviço recebe mensagens de vários tipos, ClienteCriadoEvent, FornecedorCriadoEvent, MotoristaCriadoEvent entre outros. Em um determinado momento, passa a receber um alto volume da mensagem ClienteCriadoEvent, como há somente uma única fila, as demais mensagens se misturam na fila esperando para serem consumidas. Conclusão, a alta carga de mensagens de um único tipo acaba impactando o desempenho de todo o serviço.

Para endereçar esta situação, vamos criar novos exchanges do tipo HEADER, estes exchanges serão utilizados no lugar das exchanges do tipo TOPIC. Exchanges do tipo HEADER ofereçam uma maior flexibilidade na forma de realizar bind para filas em relação as exchanges do tipo TOPIC. Uma vez que tenhamos as exchanges do tipo HEADER, ao invés de criarmos uma única fila para todo o serviço,  criaremos novas filas com base em cada agregado do serviço ou alguma outra separação que o serviço julgar interessante para evitar que eventos específicos impactem o funcionamento geral do serviço. Desta forma, o filtro para entregar mensagens de uma exchange para uma fila passa a ficar no RabbitMQ.

Quando não há necessidade de reorganização das exchanges e filas 
Âncora
SemReorganizacao
SemReorganizacao

Para os serviços que já possuem exchanges do tipo HEADER e filas organizadas conforme algum critério previamente estruturado e que esteja atendendo as necessidades atuais, boa parte desta documentação é irrelevante bastando seguir os passos abaixo:

  • Redeclarar as exchanges e filas conforme novo modelo
    • Importante! Há dois formatos de gerenciamento de filas de erros, a nativa do RabbitMQ e a do TJF, utilizar a do TJF. A nativa requer a recriação da fila.
  • Ajustar os publicadores para fazer uso do RabbitTemplate.
  • Ajustar os consumidores substituindo as anotações @StreamListener por @RabbitListener e @RabbitHandler 
  • Os passos mencionados acima estão nesta documentação. Atentar-se também para as limitações mencionadas abaixo

Limitações

  1. Se a fila já faz bind com uma exchange do tipo HEADER, deseja-se manter a fila existente e há várias classes do tipo subscriber que apontam para a mesma fila
    1. Será necessário realizar o processo de migração de todos subscribers da mesma fila de uma única vez
      1. Isso porque todos os métodos anotados com @RabbitHandler de uma mesma fila precisam estar na mesma classe.
        1. Estamos analisando uma abordagem para evitar que isso seja necessário, permitindo manter a organização atual de classes por agregado e não por fila.
          1. Jira
            serverJIRA
            serverId0c783de1-186e-383b-975c-a1acd7d76cb5
            keyAHUBLOG-205
  2. Quando há um mesmo evento com dois subscribers
    1. Para estes casos, será necessário 
      1. Criar duas filas diferentes com o mesmo filtro por type. Exemplo, há dois subscribers para um evento ProcessoFinalizadoEvent onde cada subscriber corresponde a um agregado diferente. Neste caso, deverá ser criada uma fila para cada agregado.

        Bloco de código
        languagejava
        themeMidnight
        titleStreamListener
        linenumberstrue
        collapsetrue
        @Component
        public class PedidoSubscriber {
        	
        	@StreamListener(target="fila.processo")
            public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
                // lógica
            }	
        }
        
        @Component
        public class EstoqueSubscriber {
        	
        	@StreamListener(target="fila.processo")
            public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
                // lógica
            }	
        }
        
        Bloco de código
        languagejava
        themeMidnight
        titleRabbitListener
        linenumberstrue
        collapsetrue
        @RabbitListener(queues = "fila.pedido")
        public class PedidoSubscriber {
        	
            @RabbitHandler
            public void whenProcessoFinalizado(@Payload ProcessoFinalizadoEvent event) {
                // lógica
            }
        }
        
        @RabbitListener(queues = "fila.estoque")
        public class EstoqueSubscriber{
        	
            @RabbitHandler
            public void whenProcessoFinalizado(@Payload ProcessoFinalizadoEvent event) {
                // lógica
            }
        }
        
        @RabbitListener(queues = "fila.agregado")
        public class AgregadoSubscriber {
        	
            @RabbitHandler
            public void whenProcessoFinalizado(@Payload FinalizarProcessoAgregadoCommand event) {
                // lógica
            }
        }
      2. Criar uma etapa após o ProcessoFinalizadoEvent que será responsável por chamar os diferentes comandos para cada agregado. Nesta abordagem também seria necessário disponibilizar os comandos via mensageria. Exemplo:

        Bloco de código
        languagejava
        themeMidnight
        titleStreamListener
        linenumberstrue
        collapsetrue
        @Component
        public class PedidoSubscriber {
        	
        	@StreamListener(target="fila.processo")
            public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
                // lógica
            }	
        }
        
        @Component
        public class EstoqueSubscriber {
        	
        	@StreamListener(target="fila.processo")
            public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
                // lógica
            }	
        }
        
        
        Bloco de código
        languagejava
        themeMidnight
        titleRabbitListener
        linenumberstrue
        collapsetrue
        @RabbitListener(queues = "fila.processo")
        public class ProcessoSubscriber {
        	
            @RabbitHandler
            public void whenProcessoFinalizado(@Payload ProcessoFinalizadoEvent event) {
                // publicar FinalizarProcessoPedidoCommand
                // publicar FinalizarProcessoEstoqueCommand
                // publicar FinalizarProcessoAgregadoCommand
            }
        }
        
        @RabbitListener(queues = "fila.processo")
        public class PedidoSubscriber {
        	
            @RabbitHandler
            public void whenProcessoFinalizado(@Payload FinalizarProcessoPedidoCommand event) {
                // processar FinalizarProcessoPedidoCommand
            }
        }
        
        @RabbitListener(queues = "fila.processo")
        public class EstoqueSubscriber {
        	
            @RabbitHandler
            public void whenProcessoFinalizado(@Payload FinalizarProcessoEstoqueCommand event) {
                // processar FinalizarProcessoEstoqueCommand
            }
        }
        
        @RabbitListener(queues = "fila.processo")
        public class AgregadoSubscriber{
        	
            @RabbitHandler
            public void whenProcessoFinalizado(@Payload FinalizarProcessoAgregadoCommand event) {
                // processar FinalizarProcessoAgregadoCommand
            }
        }
      3. Outra possibilidade é chamar diretamente os comandos de aplicação em um único subscriber, contudo esta opção altera o formato de execução atual, fazendo com que o segundo comando de aplicação só seja executado após o sucesso do primeiro. Sendo assim, tenha consciência de que não será gerado um impacto indesejado ao produto.

        Bloco de código
        languagejava
        themeMidnight
        titleStreamListener
        linenumberstrue
        collapsetrue
        @Component
        public class PedidoSubscriber {
        	@StreamListener(target="fila.processo")
            public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
                // lógica
            }	
        }
        
        @Component
        public class EstoqueSubscriber {	
        	@StreamListener(target="fila.processo")
            public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
                // lógica
            }	
        }
        
        
        Bloco de código
        languagejava
        themeMidnight
        titleRabbitListener
        linenumberstrue
        collapsetrue
        @RabbitListener(queues = "fila.processo")
        public class ProcessoSubscriber {
        	
            @RabbitHandler
            public void whenProcessoFinalizado(@Payload ProcessoFinalizadoEvent event) {
                // lógica 01 -> executar FinalizarProcessoPedidoCommand
                // lógica 02 -> executar FinalizarProcessoEstoqueCommand
            }
        }
  3. Quando há necessidade de realizar um filtro com base em informações do corpo da mensagem
    1. Neste caso o filtro que era feito através do parâmetro condition da anotação StreamListener terá que ser movido para o corpo do método através de uma cláusula condicional (if).  Exemplo:

      Bloco de código
      languagejava
      themeMidnight
      titleStreamListener
      linenumberstrue
      collapsetrue
      public class SubscriberStreamListener {
      	public static final String NAME = "ProcessoExecutadoEvent";
      	public static final String CONDITIONAL_EXPRESSION_PROCESSO = 
      			" && (@MessageFilter.get(payload,'content/origem') == 'MKPL'" + 
      			 " || @MessageFilter.get(payload,'content/origem') == 'PLDT')";
      	public static final String CONDITIONAL_EXPRESSION = "headers['type']=='" + NAME + "'" + CONDITIONAL_EXPRESSION_PROCESSO;
      
      	@StreamListener(target = Channel.PROCESSO_INPUT, condition = CONDITIONAL_EXPRESSION)
      	public void processoExecutado(TOTVSMessage<ProcessoExecutadoEvent> message) {
      		// processamento do evento
      	}
      }
      
      
      Bloco de código
      languagejava
      themeMidnight
      titleRabbitListener
      linenumberstrue
      collapsetrue
      @RabbitListener(queues = "fila.processo")
      public class SubscriberAMQP {
      	
      	private static final String ORIGEM_MKPL = "MKPL";
      	private static final String ORIGEM_PLDT = "PLDT";
      	
      	@RabbitHandler
          public void whenProcessoFinalizado(@Payload ProcessoExecutadoEvent event) {
              if (ORIGEM_MKPL.equals(event.getOrigem()) || ORIGEM_PLDT.equals(event.getOrigem())) {
          		// processamento do evento
              }
          }
      }
      
      

Plano de migração para exchanges do tipo TOPIC

Visão geral dos procedimentos

Será criada uma nova exchange do tipo HEADER para cada exchange TOPIC, adicionando um bind de exchange para exchange, partindo da TOPIC para HEADER. Assim conseguimos continuar publicando na exchange original, sem quebrar as integrações existentes e, criar as novas filas já com bind para a exchange do tipo HEADER.

...

A seção abaixo ilustra o passo a passo.

Passo a passo dos procedimentos

Atualmente

Exchanges do tipo TOPIC e serviços com sua respectiva fila recebendo todos os tipos de eventos.

Baixe a animação em MP4

Primeira etapa do processo de expansão 
Âncora
Animacao01
Animacao01

Adicionar dependência ao sdk de migração que verifica se deve ou não adicionar header de versão (sl-version). Além do header, os interceptors nos subscribers também são adicionadospara garantir que somente um dos métodos seja executado (com StreamListener ou com RabbitListener). Neste momento só existirão no código os métodos com StreamListener. Como o sl-version ainda não será enviado, o método com StreamListener continuará sendo executado em todos os casos sempre.

Baixe a animação em MP4

Clique aqui para verificar o passo a passo desta etapa.

Segunda etapa do processo de expansão 
Âncora
Animacao02
Animacao02

Nova exchange do tipo HEADER, bind entre as exchanges do tipo TOPIC e HEADER e duplicação do(s) subscribers, substituindo as anotações @StreamListener por @RabbitListener e @RabbitHandler

Aviso
titlePara os novos subscribers do RAC, TPD e outros sistemas externos que não controlamos a publicação de mensagens, adicionar anotação @IgnoreMessageInterceptor para que os interceptors criados para migração na sdk sejam ignorados.

Caso isso não seja observado, mensagens serão perdidas.

Baixe a animação em MP4

Clique aqui para verificar o passo a passo desta etapa.

Terceira etapa do processo de expansão 
Âncora
Animacao03
Animacao03

Adicionar a anotação @VersionedEvent nos eventos para que sejam publicados com o header sl-version=2.

Aviso
titleTODOS os consumidores (em qualquer serviço) dos eventos a serem anotados com @VersionedEvent devem ter ao menos duplicado seus subscribers (segunda etapa do processo de expansão)

Caso isso não seja observado, mensagens serão perdidas.

Baixe a animação em MP4

Clique aqui para verificar o passo a passo desta etapa.

Primeira etapa do processo de contração 
Âncora
Animacao04
Animacao04

Remover os subscribers anotados com @StreamListener 

Aviso
titleTODOS os publicadores dos subscribers anotados com @StreamListener a serem removidos devem estar publicando o cabeçalho sl-version = 2 (terceira etapa de expansão)

Caso isso não seja observado, mensagens serão perdidas.

Baixe a animação em MP4

Clique aqui para verificar o passo a passo desta etapa.

Segunda etapa do processo de contração 
Âncora
Animacao05
Animacao05

Desabilitar os interceptors.

Aviso
titleTODOS subscribers anotados com @StreamListener já devem ter sido removidos (primeira etapa do processo de contração)

Caso isso não seja observado, mensagens serão perdidas.

Baixe a animação em MP4

Clique aqui para verificar o passo a passo desta etapa.

Terceira etapa do processo de contração 
Âncora
Animacao06
Animacao06

Remover a anotação @VersionedEvent dos eventos. 

Aviso
titleTODOS os consumidores (em todos serviços) dos eventos com anotação @VersionedEvent removido já devem ter removido os subscribers antigos (segunda etapa do processo de contração)

Caso isso não seja observado, mensagens serão perdidas.

Baixe a animação em MP4

Clique aqui para verificar o passo a passo desta etapa.

Passo a passo técnico

1 - Dependências 
Âncora
Passo01
Passo01

Alterar a versão do TJF para 3.24.0-SNAPSHOT. 

  • Esta versão SNAPSHOT será convertida em RELEASE, processo em andamento com TJF Luciano De Araujo 
Bloco de código
languagetext
themeMidnight
titletjf 3.24.0
linenumberstrue
    <parent>
        <groupId>com.totvs.tjf</groupId>
        <artifactId>tjf-boot-starter</artifactId>
        <version>3.24.0-SNAPSHOT</version>
    </parent>

Adicionar nova dependência do TJF para tratamento de mensagens

Bloco de código
languagetext
themeMidnight
titletjf-messaging-amqp
linenumberstrue
        <dependency>
            <groupId>com.totvs.tjf</groupId>
            <artifactId>tjf-messaging-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>com.totvs.tjf</groupId>
            <artifactId>tjf-test-amqp</artifactId>
            <scope>test</scope>
        </dependency>

Adicionar dependência do SDK de migração com infraestrutura para controlar publicação e recepção das mensagens

O sdk disponibiliza uma função para verificar se deve ou não adicionar o header sl-version com valor 2. Também disponibiliza a anotação @VersionedEventpara indicar se o evento deve ser publicado com sl-version com valor 2.

...

Bloco de código
languagetext
themeMidnight
titlesdk de migração
linenumberstrue
        <repository>
            <id>SuiteLogistica</id>
            <url>https://pkgs.dev.azure.com/totvstfs/SuiteLogistica/_packaging/SuiteLogistica/maven/v1</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>

        <dependency>
            <groupId>com.totvs.sl.migracao.amqp.sdk</groupId>
            <artifactId>migracao.amqp.sdk</artifactId>
            <version>1.0.7-RELEASE</version>
        </dependency>

Alterar publicadores 

1.1 - Registrar exchanges no application.yml

Obs.: Registrar todos os exchanges existentes no serviço conforme novo modelo

...

Bloco de código
languageyml
themeMidnight
titleYaml com exchanges existentes
linenumberstrue
tjf:
  messaging:
    amqp:
      exchanges: 
        topicExchanges:  #Existentes do tipo topic
          - name: coleta-entrega-exchange 
            durable: true
            autoDelete: false
          - name: coleta-entrega-query-errors 
            durable: true
            autoDelete: false
        headersExchanges: #Existentes do tipo headers
          - name: coleta-entrega-header-exchange 
            durable: true
            autoDelete: false

1.2 - Alterar dispatchers

Novo formato de plublicação de mensagens.

...

Aviso
titleImportante

Somente realizar as próximas etapas depois de realizar O DEPLOY EM PRODUÇÃO DAS ALTERAÇÕES ACIMA!

!!! FAZER DEPLOY DA APLICAÇÃO ANTES DE AVANÇAR  !!! 

Voltar para animação

2 - Criar nova exchange, novas filas e subscribers 
Âncora
Passo02
Passo02

Novo exchange do tipo HEADER

No caso das exchanges TOPIC, deverá ser criada uma exchange HEADER e será feito o bind entre elas, conforme mencionado anteriormente.

Bloco de código
languageyml
themeMidnight
titleYaml com novo exchange
linenumberstrue
tjf:
  messaging:
    amqp: 
      error: #NOVO, ADICIONAR
        exchange: coleta-entrega-query-errors # Aponta no tjf a exchange de erros - dlq
      migration: #NOVO, ADICIONAR
        interceptors: 
          enabled: true # Habilita o uso dos interceptors durante o processo de migração
      exchanges: 
        topicExchanges:  
          - name: coleta-entrega-exchange 
            durable: true
            autoDelete: false
          - name: coleta-entrega-query-errors 
            durable: true
            autoDelete: false
        headersExchanges: 
          - name: coleta-entrega-header-exchange 
            durable: true
            autoDelete: false
      bindings: #NOVO, ADICIONAR
        bindingsList:
          - destination: coleta-entrega-header-exchange  #NOVA EXCHANGE DO TIPO HEADER
            exchange: coleta-entrega-exchange  #EXCHANGE EXISTENTE DO TIPO TOPIC
            routingKey: "#"  
            destinationType: exchange

Novas filas

Neste exemplo novas filas serão criadas:

Bloco de código
languageyml
themeMidnight
titleYaml com novas filas
linenumberstrue
tjf:
  messaging:
    amqp:
      error:
        exchange: coleta-entrega-query-errors 
      migration:
        interceptors: 
          enabled: true 
      exchanges: 
        topicExchanges: 
          - name: coleta-entrega-exchange 
            durable: true
            autoDelete: false
          - name: coleta-entrega-query-errors 
            durable: true
            autoDelete: false
        headersExchanges:
          - name: coleta-entrega-header-exchange 
            durable: true
            autoDelete: false
      queues: #NOVO, ADICIONAR
        queuesList:         
          - name: coleta-entrega-header-exchange.sl-coleta-entrega.arquivo # Cria a nova queue do domínio de arquivo - V2
            durable: true
            autoDelete: false
          - name: coleta-entrega-header-exchange.sl-coleta-entrega.unidade # Cria a nova queue do domínio de unidade - V2
            durable: true
            autoDelete: false
          - name: coleta-entrega-header-exchange.sl-coleta-entrega.organizacao # Cria a nova queue do domínio de organizacao - V2
            durable: true
            autoDelete: false
          - name: coleta-entrega-header-exchange.sl-coleta-entrega.video # Cria a nova queue do domínio de video - V2
            durable: true
            autoDelete: false
          - name: coleta-entrega-query-errors.coleta-entrega-query-errors # Cria a nova queue de erros - dlq
            durable: true
            autoDelete: false
            exclusive: false
            queue-dlq: coleta-entrega-query-errors.coleta-entrega-query-errors # Define a queue de erros como dlq no tjf
      bindings: #ATUALIZADO, ALTERAR 
        bindingsList:
          - destination: coleta-entrega-header-exchange
            exchange: coleta-entrega-exchange
            routingKey: "#"  
            destinationType: exchange
          - destination: coleta-entrega-query-errors.coleta-entrega-query-errors # Novo, faz o binding da fila de erros - dlq
            exchange: coleta-entrega-query-errors
            destinationType: queue
          - destination: coleta-entrega-header-exchange.sl-coleta-entrega.arquivo # Novo, faz o binding dos eventos de arquivo
            exchange: coleta-entrega-header-exchange
            destinationType: queue
            arguments:
              type: [ArquivoCriadoEvent, ArquivoRemovidoEvent] #UMA DAS POSSIVEIS FORMAS DE DECLARAR UMA LISTA DE EVENTOS
          - destination: coleta-entrega-header-exchange.sl-coleta-entrega.unidade # Novo, faz o binding do evento de unidade
            exchange: coleta-entrega-header-exchange
            destinationType: queue
            arguments:
              type: UnidadeCriadaEvent
          - destination: coleta-entrega-header-exchange.sl-coleta-entrega.organizacao # Novo, faz o binding do evento de organizacao
            exchange: coleta-entrega-header-exchange
            destinationType: queue
            arguments:
              type: OrganizacaoCriadaEvent
          - destination: coleta-entrega-header-exchange.sl-coleta-entrega.video # Novo, faz o binding dos eventos de video
            exchange: coleta-entrega-header-exchange
            destinationType: queue
            arguments:
              type: # OUTRA FORMA DE DECLARAR UMA LISTA DE EVENTOS
				- VideoHowToCriadoEvent
				- VideoHowToRemovidoEvent

Novos subscribers

Primeiramente deve-se fazer uma cópia da classe subscriber original e nomear a atual como old.

...

Bloco de código
languagejava
themeMidnight
titleRabbitListener
linenumberstrue
@Component
@AllArgsConstructor
@RabbitListener(queues = "coleta-entrega-header-exchange.sl-coleta-entrega.arquivo")
public class ArquivoSubscriber {
	private final ArquivoService service;

	@RabbitHandler
	public void arquivoCriado(@Payload ArquivoCriadoEvent message) {
		service.on(message);
	}

	@RabbitHandler
	public void arquivoRemovido(@Payload ArquivoRemovidoEvent message) {
		service.on(message);
	}
}

// TESTES 
@DisplayName("Arquivo Header - Mensageria")
public class ArquivoSubscriberIT extends AdapterConfigIT {
	@MockBean
	private ArquivoService service;

	@Captor
	private ArgumentCaptor<ArquivoCriadoEvent> createCaptor;
	@Captor
	private ArgumentCaptor<ArquivoRemovidoEvent> removeCaptor;

	@Test
	@DisplayName("Deve manipular mensagem recebido por mensageria")
	void deveManipularArquivoCriadoEvent() throws Exception {
		var subscriber = Mockito.mock(ArquivoHeaderSubscriber.class);
		var listener = new MockListener(subscriber);    

        var event = mockArquivoCriadoEvent();
        var message = TOTVSMessageBuilder
             .withType(ArquivoCriadoEvent.NAME)
             .setContent(event)
             .buildAmqp();

        listener.sendMessage(message);

        verify(subscriber).arquivoCriado(createCaptor.capture());
        assertInstanceOf(ArquivoCriadoEvent.class, createCaptor.getValue()); 
    }

	@Test
	@DisplayName("Deve manipular mensagem recebido por mensageria")
	void deveManipularArquivoRemovidoEvent() throws Exception {
		var subscriber = Mockito.mock(ArquivoHeaderSubscriber.class);
		var listener = new MockListener(subscriber);    

        var arquivo = ArquivoTestFactory.umArquivo();
        var event = ArquivoRemovidoEvent.of(arquivo.getId()); 
        var message = TOTVSMessageBuilder
             .withType(ArquivoRemovidoEvent.NAME)
             .setContent(event).buildAmqp();

        listener.sendMessage(message);

        verify(subscriber).arquivoRemovido(removeCaptor.capture());
        assertInstanceOf(ArquivoRemovidoEvent.class, removeCaptor.getValue());  
    }

    @Test
	@DisplayName("Deve criar um arquivo recebido por mensageria")
	void deveCriarUmArquivo() {
		var subscriber = new ArquivoHeaderSubscriber(service);
		var event = mockArquivoCriadoEvent();
		subscriber.arquivoCriado(event);
        var expected = ArquivoCriadoEvent.builder()
             .id(event.getId())
             .nome(event.getNome())
             .tipoConteudo(event.getTipoConteudo())
             .removido(event.getRemovido())
             .dataCriacao(event.getDataCriacao())
             .tamanho(event.getTamanho())
             .prefixo(event.getPrefixo())
             .build();
        verify(service).on(expected);
	}

	@Test
	@DisplayName("Deve remover um arquivo recebido por mensageria")
	void deveRemoverUmArquivo() {
		var arquivo = ArquivoTestFactory.umArquivo();
		var subscriber = new ArquivoHeaderSubscriber(service);
		var event = ArquivoRemovidoEvent.of(arquivo.getId());
		subscriber.arquivoRemovido(event);
        
        var expected = ArquivoRemovidoEvent.of(event.getId());        
        verify(service).on(expected);  
    }

	private ArquivoCriadoEvent mockArquivoCriadoEvent() {
		return ArquivoCriadoEvent.builder()
				.id(UUID.randomUUID().toString())
				.nome("nome")
				.tipoConteudo("tipoConteudo")
				.removido(false)
				.dataCriacao(ZonedDateTime.now())
				.tamanho(new BigDecimal(127))
				.prefixo(TestUtils.tenantId + "/")
				.build();
	}
}

Novos subscribers para sistemas externos como RAC, TPD e outros devem conter a anotação @IgnoreMessageInterceptor 

Bloco de código
languagejava
themeMidnight
titleStreamListener
linenumberstrue
@RabbitListener(queues = "queue")
@IgnoreMessageInterceptor // Isto fará o skip do interceptors de todos os metodos da classe
public class AlgumRabbitListenerSubscriber {
}
Aviso
titleImportante

Somente realizar as próximas etapas depois de realizar O DEPLOY EM PRODUÇÃO DAS ALTERAÇÕES ACIMA!

!!! FAZER DEPLOY DA APLICAÇÃO ANTES DE AVANÇAR  !!! 

Voltar para animação

3 - Mudar versão do evento
Âncora
Passo03
Passo03
 

No serviço que envia as mensagens:

...

Aviso
titleImportante

Somente realizar as próximas etapas depois de realizar O DEPLOY EM PRODUÇÃO DAS ALTERAÇÕES ACIMA!

!!! FAZER DEPLOY DA APLICAÇÃO ANTES DE AVANÇAR  !!! 

Voltar para animação

4 - Remover subscriber antigo
Âncora
Passo04
Passo04

Depois que todos os pods publicadores foram substituídos e todas as mensagens da fila original foram drenadas, pode ser removida a classe subscriber antiga juntamente com suas configurações antigas existentes no yaml.

Voltar para animação

5 - Repetir passos 3-4
Âncora
Passo05
Passo05

Agora é só repetir os passos até fazer a troca de todos os subscribers.

6 - Desabilitar interceptors
Âncora
Passo06
Passo06

Mudar o valor da configuração tjf.messaging.amqp.error.migration.interceptors.enable para false.

...

Aviso
titleImportante

Somente realizar as próximas etapas depois de realizar O DEPLOY EM PRODUÇÃO DAS ALTERAÇÕES ACIMA!

!!! FAZER DEPLOY DA APLICAÇÃO ANTES DE AVANÇAR  !!! 

Voltar para animação

7 - Remover Versionamento dos eventos e utilização do SDK de migração
Âncora
Passo07
Passo07

Depois que todos os consumidores das mensagens já foram migrados para @RabbitListener, remover a anotação @VersionedEvent, o código adicionado nos Publishers, a configuração referente ao sdk de migração do yaml (migration.interceptors.enabled) e finalmente a dependência do SDK.

...

Bloco de código
languagejava
themeMidnight
titlePublicador após contração
linenumberstrue
protected <T> void dispatch(T contentToBeDispatched, TransactionInfo transactionInfo) {
     TOTVSMessageBuilder.withType(contentToBeDispatched.getClass().getSimpleName()
         .setContent(contentToBeDispatched)
         .setTransactionInfo(transactionInfo)
         .buildAmqp()
         .sendTo(rabbitTemplate, EXCHANGE, ROUTING_KEY);
}

Voltar para animação

Todos os fontes usados na POC podem ser consultados abaixo

Informações
titleProjeto usado na poc

Core: https://dev.azure.com/totvstfs/SuiteLogistica/_git/SL-ColetaEntregaCore/pullrequest/359669
Query: https://dev.azure.com/totvstfs/SuiteLogistica/_git/SL-ColetaEntregaQuery/pullrequest/359668