Почему nodejs с функцией потребления amqplib является замыканием?
Я использую модуль nodejs amqplib для подключения rabbitmq. Я обнаружил, что функция потребления стала функцией закрытия, но я не мог понять, почему. Я не использовал закрытие.
Мой код ниже. Я нашел корр в returnOK все еще получить значение в первый раз. Когда я запускаю эту функцию второй раз. Корр все еще значение в первый раз. Я думаю, что это странно. Кто-то может объяснить это?
const corr = new Date().getTime();
try {
const params = JSON.stringify(req.body);
console.log('corr first =', corr);
await ch.sendToQueue(q, Buffer.from(params), {
deliveryMode: true,
correlationId: corr.toString(),
replyTo: queue.queue,
});
const returnOK = (msg) => {
if (msg.properties.correlationId === corr.toString()) {
console.info('******* Proxy send message done *******');
res.status(HTTPStatus.OK).json('Done');
}
};
await ch.consume(queue.queue, returnOK, { noAck: true });
} catch (error) {
res.status(HTTPStatus.INTERNAL_SERVER_ERROR).json(error);
}
2 ответа
Похоже, вы звоните ch.consume
по каждому запросу, фактически создавая нового потребителя каждый раз. Вы должны сделать это только один раз.
То, что происходит, - то, что первый потребитель собирает сообщения.
Чтобы исправить это, вы, вероятно, хотите переместить ch.consume
вне обработчика запросов.
Я вижу, что это устарело, но я также столкнулся с аналогичной проблемой при написании запроса POST, который ожидает обработки сообщения перед отправкой ответа пользователю.
Мое дело
Сначала я попытался создать новый канал для каждого пост-запроса, но это показалось не оптимальным, поэтому каким-то образом я обнаружил, что метод Channel.consume имеет свойство ConsumerTag в args, и вы можете отменить свою потребляющую подписку с помощью этого тега, вот так:
app.post('/process', async (req, res) => {
const requestData = req.body;
const requestId = uuidv4()
await sendToQueue({ id: requestId, data: requestData });
const responseData = await new Promise(async (resolve) => {
const consumerTag = `${requestId}-consumer`
await channel.consume(
'responses',
(message) => {
const data = JSON.parse(message.content.toString());
channel.ack(message);
channel.cancel(consumerTag);
resolve(data);
},
{ noAck: false, consumerTag: consumerTag }
);
});
console.log("responseData", responseData)
res.status(200).json(responseData);
});
Для вас это будет примерно так, я думаю:
const consumerTag = "*your tag*"
const returnOK = (msg) => {
if (msg.properties.correlationId === corr.toString()) {
console.info('******* Proxy send message done *******');
res.status(HTTPStatus.OK).json('Done');
}
ch.cancel(consumerTag)
};
await ch.consume(queue.queue, returnOK, { noAck: true, consumerTag: consumerTag});