Почему nodejs с функцией потребления amqplib является замыканием?

Я использую модуль nodejs amqplib для подключения rabbitmq. Я обнаружил, что функция потребления стала функцией закрытия, но я не мог понять, почему. Я не использовал закрытие.

Мой код ниже. Я нашел корр в returnOK все еще получить значение в первый раз. Когда я запускаю эту функцию второй раз. Корр все еще значение в первый раз. Я думаю, что это странно. Кто-то может объяснить это?

  const corr = new Date().getTime();
  try {
    const params = JSON.stringify(req.body);
    console.log('corr first =', corr);
    await ch.sendToQueue(q, Buffer.from(params), {
      deliveryMode: true,
      correlationId: corr.toString(),
      replyTo: queue.queue,
    });

    const returnOK = (msg) => {
      if (msg.properties.correlationId === corr.toString()) {
        console.info('******* Proxy send message done *******');
        res.status(HTTPStatus.OK).json('Done');
      }
    };
    await ch.consume(queue.queue, returnOK, { noAck: true });
  } catch (error) {
    res.status(HTTPStatus.INTERNAL_SERVER_ERROR).json(error);
  }

2 ответа

Решение

Похоже, вы звоните ch.consume по каждому запросу, фактически создавая нового потребителя каждый раз. Вы должны сделать это только один раз.

То, что происходит, - то, что первый потребитель собирает сообщения.

Чтобы исправить это, вы, вероятно, хотите переместить ch.consume вне обработчика запросов.

Я вижу, что это устарело, но я также столкнулся с аналогичной проблемой при написании запроса POST, который ожидает обработки сообщения перед отправкой ответа пользователю.

Мое дело

Сначала я попытался создать новый канал для каждого пост-запроса, но это показалось не оптимальным, поэтому каким-то образом я обнаружил, что метод Channel.consume имеет свойство ConsumerTag в args, и вы можете отменить свою потребляющую подписку с помощью этого тега, вот так:

          app.post('/process', async (req, res) => {
        const requestData = req.body;
        
        const requestId = uuidv4()
    
        await sendToQueue({ id: requestId, data: requestData });
    
        const responseData = await new Promise(async (resolve) => {
            const consumerTag = `${requestId}-consumer`
            await channel.consume(
            'responses',
            (message) => {
              const data = JSON.parse(message.content.toString());
              channel.ack(message);
              channel.cancel(consumerTag);
              resolve(data);
            },
            { noAck: false, consumerTag: consumerTag }
          );
        });

        console.log("responseData", responseData)

        res.status(200).json(responseData);
    });

Для вас это будет примерно так, я думаю:

          const consumerTag = "*your tag*"
    const returnOK = (msg) => {
      if (msg.properties.correlationId === corr.toString()) {
        console.info('******* Proxy send message done *******');
        res.status(HTTPStatus.OK).json('Done');
      }
      ch.cancel(consumerTag)
    };
    await ch.consume(queue.queue, returnOK, { noAck: true, consumerTag: consumerTag});
Другие вопросы по тегам