Как потреблять и публиковать сообщения из Amazon MQ в Nodejs?

Мне нужно использовать Amazon MQ для приема и публикации сообщений в очереди с использованием протокола amqp в Nodejs. Я уже настроил AWS MQ, определил брокера и создал очередь.

Я следовал за AWS Javascript SDK, но до сих пор не могу найти какой-либо метод для использования и публикации сообщения в очереди.

Может кто-нибудь помочь мне, как подключиться к AWS MQ по протоколу amqp, чтобы использовать и публиковать сообщения в очереди.

Спасибо

1 ответ

Я использовал модуль amqp10 npm, который работает, чтобы потреблять и публиковать сообщения в AWS MQ.

Ниже приведен код:

  const AMQPClient = require('amqp10').Client;
  const Policy = require('amqp10').Policy;

1. Потребить сообщение от AWS MQ:

      let client = new AMQPClient(Policy.Utils.RenewOnSettle(1, 1, 
Policy.ServiceBusQueue));
      let connectionString = 'your_connnection_string';
      client.connect(connectionString)
          .then(function() {
              console.log("Connected");
              return Promise.all([

client.createReceiver(configurationHolder.config.getMessageQueueName)
              ]);
          })
          .spread(function(receiver) {
                  receiver.on('errorReceived', function(rx_err) {
                      console.warn('===> RX ERROR: ', rx_err);
                      return err;
                  });
                  receiver.on('message', function(message) {
                      client.disconnect().then(function() {
                      console.log('disconnected, when we get the message from the queue);
                      return message.body;
                  });
              });
          })
          .error(function(e) {
                  console.warn('connection error: ', e);
                  return err;
              });
  1. Чтобы опубликовать сообщение в AWS MQ:

    let client = new AMQPClient(Policy.merge({
        senderLinkPolicy: {
            callbackPolicy: Policy.Utils.SenderCallbackPolicies.OnSent
        }
    }, Policy.DefaultPolicy));
    
    
        client.connect(connectionString, {
                'saslMechanism': 'ANONYMOUS'
            })
            .then(function() {
                console.log("Connected");
                return Promise.all([
                    client.createSender(queueName)
                ]);
            })
            .spread(function(sender) {
                sender.on('errorReceived', function(tx_err) {
                    console.warn('===> TX ERROR: ', tx_err);
                    return err;
                });
                var options = {
                    annotations: {
                        'x-opt-partition-key': 'pk' + msgValue
                    }
                };
                return sender.send(JSON.stringify(msgValue), 
    options).then(function(state) {
                    client.disconnect().then(function() {
                        console.log('disconnected, when we saw the value we 
    inserted after publish to AWS MQ.');
                        return state;
                    });
                });
            })
            .error(function(e) {
                console.warn('connection error: ', e);
                return err;
            });
    

Спасибо

Другие вопросы по тегам