Закрытие соединения обещания amqp после публикации?

Я пытаюсь выяснить, как закрыть свои обещания соединений после публикации сообщений.

Я попытался экстраполировать общий код для моего отправителя и получателя, поэтому у меня есть файл подключения, подобный этому:

connector.js

const amqp = require('amqplib');
class Connector {
  constructor(RabbitMQUrl) {
    this.rabbitMQUrl = RabbitMQUrl;
  }

  connect() {
    return amqp.connect(this.rabbitMQUrl)
      .then((connection) => {
        this.connection = connection;
        process.once('SIGINT', () => {
          this.connection.close();
        });
        return this.connection.createChannel();
      })
      .catch( (err) => {
        console.error('Errrored here');
        console.error(err);
      });
  }
}

module.exports = new Connector(
  `amqp://${process.env.AMQP_HOST}:5672`
);

Тогда мой издатель / отправитель выглядит так:

publisher.js

const connector = require('./connector');

class Publisher {
  constructor(exchange, exchangeType) {
    this.exchange = exchange;
    this.exchangeType = exchangeType;
    this.durabilityOptions = {
      durable: true,
      autoDelete: false,
    };
  }

  publish(msg) {
    connector.connect()
      .then( (channel) => {
        let ok = channel.assertExchange(
          this.exchange,
          this.exchangeType,
          this.durabilityOptions
        );
        return ok
          .then( () => {
            channel.publish(this.exchange, '', Buffer.from(msg));
            return channel.close();
          })
          .catch( (err) => {
            console.error(err);
          });
      });
  }
}

module.exports = new Publisher(
  process.env.AMQP_EXCHANGE,
  process.env.AMQP_TOPIC
);

Но, как я уже сказал, я не могу понять, как закрыть соединение после звонка. publish(),

1 ответ

Вы можете добавить функцию close() к соединителю:

  close() {
      if (this.connection) {
          console.log('Connector: Closing connection..');
          this.connection.close();
      }
  }

Издательство:

class Publisher {
  constructor(exchange, exchangeType) {
    this.exchange = exchange;
    this.exchangeType = exchangeType;
    this.durabilityOptions = {
      durable: true,
      autoDelete: false,
    };
  }

  connect() {
      return connector.connect().then( (channel) => {
         console.log('Connecting..');
         return channel.assertExchange(
          this.exchange,
          this.exchangeType,
          this.durabilityOptions
        ).then (() => {  
            this.channel = channel;
            return Promise.resolve();
        }).catch( (err) => {
            console.error(err);
        });;
      });
  }

  disconnect() {
      return this.channel.close().then( () => { return connector.close();});
  }

  publish(msg) {
      this.channel.publish(this.exchange, '', Buffer.from(msg));      
  };

}

Test.js

'use strict'

const connector = require('./connector');
const publisher = require('./publisher');


publisher.connect().then(() => { 
    publisher.publish('message');
    publisher.publish('message2');
    publisher.disconnect();
});
Другие вопросы по тегам