A classe tAMQP representa um objeto de comunicação do tipo AMQP (Advanced Message Queuing Protocol) da versão 0.9.1, que se comunica com servidores RabbitMQ. A instancia de uma classe permite a comunicação, envio e recebimento de mensagens através de um servidor AMQP, sendo possível o desenvolvimento de diversos tipos de aplicações, realizando transações ou comunicações padronizadas, de forma assíncrona, indiferente da arquitetura de cada uma delas.
Essa classe foi desenvolvida com base na existente biblioteca RabbitMQ.Client desenvolvida para C# .Net, sendo inclusive possível ser usado como referencia a documentação presente em https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html, assim como montar o ambiente de um servidor AMQP em https://www.rabbitmq.com/#getstarted. Para qualquer informação sobre os conceitos de tipos de fila e comportamentos de recebimentos e envios de mensagens, favor também consultar esse site.
|
#define AMQP_AUTODELETE .T. #define AMQP_EXCLUSIVE .T. #define AMQP_DURABLE .T. #define AMQP_PERSISTENT .T. #define AMQP_AUTOACK .T. #define fixed_channel_id 1 User Function _sender Local oSender := tAmqp():New("localhost",5672,"guest","guest",fixed_channel_id) oSender:QueueDeclare("test_queue", .F.,.F.,AMQP_AUTODELETE ) oSender:BasicPublish("","test_queue", AMQP_PERSISTENT, "hello world!") u___Receiver() Freeobj(oSender) Return User Function _Receiver Local oRecv := tAmqp():New("localhost",5672,"guest","guest",fixed_channel_id) local var oRecv:QueueDeclare("test_queue", .F.,.F.,AMQP_AUTODELETE) oRecv:BasicConsume("test_queue", AMQP_AUTOACK, ) var := oRecv:Body Freeobj(oRecv) Return |
17.3.0.x
Cria um objeto tAMQP com um determinado AMQP Server.
tAmqp():New( < uParam1 >, [ uParam2 ], [ uParam3 ], [ uParam4 ], [ uParam5 ], [ uParam6 ]) |
Nome | Tipo | Descrição | Obrigatório | Referência |
---|---|---|---|---|
uParam1 | caractere | Endereço do AMQP Server | X | |
uParam2 | numérico | Porta do AMQP Server | ||
uParam3 | caractere | Usuário para logar na fila do AMQP Server | ||
uParam4 | caractere | Senha para logar na fila do AMQP Server | ||
uParam5 | caractere | Canal da comunicação com o AMQP Server | ||
uParam6 | caractere | Virtual Host (vhost) no servidor AMQP (padrão: "/") |
Nome | Tipo | Descrição |
---|---|---|
oObj | objeto | Nova instância da classe tAmqp |
"/"
)Local oClient := tAmqp():New("localhost",5672,"guest","guest",1) |
Local oClient := tAmqp():New("localhost", 5672, "guest", "guest", 1, "meu_vhost") |
A funcionalidade de VHosts está disponível a partir da versão 24.3.0.6 do Application Server. |
Indica qual canal esta sendo usado para a fila atual. Os canais são "planos privados" dentro da mesma conexão TCP”. Ou seja, é necessária apenas uma instância de conexão na mesma porta, e podendo usar varias "subportas" sendo completamente independente de um canal para o outro.
Tipo | Valor Padrão | Somente Leitura |
---|---|---|
numérico | N/A | N |
Conteudo da mensagem recebida após uma solicitação ao server AMQP via BasicConsume().
Tipo | Valor Padrão | Somente Leitura |
---|---|---|
caractere | "" | N |
Indica qual o timeout atual que esta sendo usado para a comunicação de um consumidor de uma exchange.
Tipo | Valor Padrão | Somente Leitura |
---|---|---|
numérico | 5 | N |
Cria uma nova fila no AMQP Server.
QueueDeclare( [ cFila ], [ bisDurable ], [ bisExclusive ], [ bisAutodelete ], [bisPassive] ) |
Nome | Tipo | Descrição | Obrigatório | Referência |
---|---|---|---|---|
cFila | caractere | Indica o nome da fila onde será criada. | ||
bisDurable | lógico | Indica que as mensagens serão guardadas mesmo se o servidor for reiniciado ou desligado, mantendo o estado da fila será mantido. Caso seja falso, as mensagens não serão recriadas caso o servidor seja reiniciado ou desligado. | ||
bisExclusive | lógico | Indica que a fila será exclusiva a um único par producer/consumer. | ||
bisAutodelete | lógico | Indica que, ao termino do producer enviar com sucesso a mensagem até o producer, ele irá se deletar. | ||
bisPassive | lógico | Indica que a função irá apenas verificar se existe uma fila com o nome indicado no AMQP Server, caso exista poderá ser verificar com "Status()", caso não exista não sera criado uma. |
oSender:QueueDeclare(cFila,bisDurable,bisExclusive,bisAutodelete ) |
Cria uma nova exchange no AMQP Server.
ExchangeDeclare( [ cexchange ], [ ctype ], [ bisPassive ], [ bisDurable ], [ bisAutodelete ] ) |
Nome | Tipo | Descrição | Obrigatório | Referência |
---|---|---|---|---|
cexchange | caractere | Indica o nome da exchange | X | |
ctype | carectere | Indica o tipo da fila (fanout, direct, topic). * | X | |
bisPassive | lógico | Indica que a função irá apenas verificar se existe uma exchange com o nome indicado no AMQP Server, caso exista poderá ser verificar com "Status()", caso não exista não sera criado uma. | ||
bisDurable | lógico | Indica que as mensagens serão guardadas mesmo se o servidor for reiniciado ou desligado, mantendo o estado da fila será mantido. Caso seja falso, as mensagens não serão recriadas caso o servidor seja reiniciado ou desligado. | ||
bisAutodelete | lógico | Indica que a mensagem irá automaticamente ser deletada na primeira tentativa de resgate pelo consumer. |
oSender:ExchangeDeclare(cexchange,"fanout",.F.,.F. ) |
Resgata uma mensagem no AMQP Server.
BasicConsume( < cFila >, < bAck >, < bWaitingEvent > ) |
Nome | Tipo | Descrição | Obrigatório | Referência |
---|---|---|---|---|
cFila | caractere | Indica o nome da fila onde será resgatada. | X | |
bAck | lógico | Indica se o consumo irá automaticamente se marcar como entregue, caso contrario, o consumer deverá indicar que a mensagem foi entregue com o método :BasicAck(). | X | |
bWaitingEvent | lógico | Ignora o timeout e se fica aguardando por uma nova mensagem na fila(no maximo por 30 segundos). | X |
oRecv:BasicConsume("test_queue", bAck, bWaitingEvent) |
Liga uma fila a uma exchange para que as mensagens fluam (sujeitas a vários critérios) da exchange (origem) para a fila (destino).
QueueBind( < cQueue >, < cExchange >, [ croutingkey ] ) |
Nome | Tipo | Descrição | Obrigatório | Referência |
---|---|---|---|---|
cQueue | caractere | Indica o nome da fila. | X | |
cExchange | caractere | Indica o nome da exchange. | X | |
croutingkey | caractere | Determina um endereço virtual que o exchange pode usar para encaminhar a mensagem para a fila. |
oRecv:BasicConsume("test_queue", bAck, bWaitingEvent) |
Envia uma mensagem para o AMQP Server.
BasicPublish( < cExchange >, < cRoutingKey >, [ nPERSISTENT ], [ cMsg ], [ correlationID ], [ ReplyTo ] ) |
Nome | Tipo | Descrição | Obrigatório | Referência |
---|---|---|---|---|
cExchange | caractere | Indica o nome da exchage onde será enviada a mensagem. | X | |
cRoutingKey | caractere | Indica a classificação de onde será enviada a mensagem (fila, rota, etc.) na exchange. | X | |
nPERSISTENT | lógico | Indica que a requisição será persistente. | ||
cMsg | caractere | Informa a mensagem a ser postada. | ||
correlationID | caractere | Id de correlação | ||
ReplyTo | caractere | Fila para resposta dessa mensagem |
oRecv:BasicPublish("test_exchange", "test_queue", AMQP_PERSISTENT, "Hello World!" ) |
Como as mensagens são enviadas (enviadas) para os clientes de forma assíncrona, geralmente há mais de uma mensagem "em trânsito" em um canal a qualquer momento. Com essa função, seta para a conexão atual um limite dessas mensagens a serem processadas (unacknowledged) pelo consumer.
BasicQos( < nprefetchSize >, < nprefetchCount >, < bglobal > ) |
Nome | Tipo | Descrição | Obrigatório | Referência |
---|---|---|---|---|
nprefetchSize | numérico | Indica o limite do tamanho das mensagens que poderão ficar em aguardo na atual exchange. | X | |
nprefetchCount | numérico | Indica o numero de mensagens que poderão ficar em aguardo na atual exchange. | X | |
bglobal | lógico | Indica se a configuração atual será global ou não. | X |
oRecv:BasicQos(cprefetchSize, cprefetchCount, bglobal) |
Indica para a fila que voce recebeu e processou a mensagem com sucesso (acknowledge)
BasicAck( < cDeliverytag>, < bMultiple>) |
Nome | Tipo | Descrição | Obrigatório | Referência |
---|---|---|---|---|
cDeliverytag | caractere | Indica o nome da tag da(s) mensagem(s) que será marcadada como Ack(recebidas). | X | |
bMultiple | lógico | Indica que varias mensagens serão setadas como Ack simultaneamente. | X |
oRecv:BasicAck(ctag, bmultiple) |
Indica qual a "apelido" (correlação) da mensagem recebida
CorrelationID() |
oRecv:CorrelationID() |
Indica quantas mensagens tem na fila prontas para serem recebidas
oRecv:MessageCount() |
oRecv:MessageCount() |
Indica quantas consumers existem em uma determinada fila.
oRecv:ConsumerCount() |
oRecv:ConsumerCount() |
Descreve o erro da ultima operação realizada.
Error() |
oRecv:Error() |
Retorna qual o nome da fila atual que esta sendo usado
QueueName() |
Informa qual a deve ser a fila solicitada para a resposta da mensagem recebida
ReplyTo() |
oRecv:ReplyTo() |
Retorna o código de erro da ultima operação realizada
Status() |
oRecv:Status() |
Informa qual tag foi associada a uma fila, se houver
Tag() |
oRecv:Tag() |