Как отделить сердцебиение от собственного потока для соединения RabbitMQ
У меня есть процесс, который использует RabbitMQ и NodeJS для обработки изображений. В связи с интенсивной задачей, я думаю, что у меня та же проблема, что и ссылка здесь https://github.com/squaremo/amqp.node/issues/261
Я пытаюсь выяснить, как реализовать последний комментарий по этому вопросу.
"Да. NodeJS является однопоточным, и если вы используете этот поток, чтобы делать что-то в течение длительного времени, больше ничего не произойдет. Как предполагает @michaelklishin, известное решение этой общей проблемы - использование дочернего процесса или модуля кластера.".
РЕДАКТИРОВАТЬ:
Я обновил код ниже с примером того, как я думаю, что я могу сделать это с помощью модуля amqp-connection-manager. Прямо сейчас я использую глобальную переменную для хранения фактического сообщения, чтобы иметь возможность подтверждения. Я предполагаю, что есть лучший способ сделать это.
//Used to be an example for how to keep the connection thread and the working thread separate
//in order to fix the issue of missing heartbeat intervals due to processing on the same thread
const cluster = require('cluster');
var amqp = require('amqp-connection-manager');
var config = require('./config.json');
var working_queue = "Test_Queue";
//DONT REALLY DO THIS
var rabbit_msg_data;
//******* CLUSTER SETUP *******
// This will spawn off the number of worker for this process.
if (cluster.isMaster) {
console.log("Master "+process.pid+" is running");
worker = cluster.fork();
cluster.on('exit', (worker, code, signal) => {
if(signal)
{
console.log("worker was killed by signal: "+signal);
console.log(worker);
}
else if (code !== 0)
{
console.log("worker exited with error code: "+code);
console.log(worker);
}
else
{
console.log("Worker "+worker.process.pid+" exited successfully");
console.log(worker);
//Not sure if this works this way or if I need to put this worker into variables
}
});
//testing sending a message back and forth
// setTimeout(function() {
// worker.send("I got a request!");
// }, 1000);
//******** RABBIT MQ CONNECTION **********
// Create a connection manager to rabbitmq
var connection = amqp.connect(config.rabbit_connections_arr, {json: true, heartbeatIntervalInSeconds: 2});
connection.on('connect', function() {
console.log('Connected to rabbitmq');
});
connection.on('disconnect', function(params) {
console.log('Disconnected from rabbitmq:', params.err.stack);
});
// Set up a channel listening for messages in the queue.
var channelWrapper_listening = connection.createChannel({
setup: function(channel) {
// `channel` here is a regular amqplib `ConfirmChannel`.
return Promise.all([
channel.assertQueue(working_queue, {durable: true}),
channel.prefetch(1),
channel.consume(working_queue, function(data){
rabbit_msg_data = data;
worker.send(data.content.toString());
}, requeue = false)
]);
}
});
worker.on('message', function(msg){
// console.log("Worker to Master (ack): ", msg.content.toString());
console.log("Worker to Master (ack): ", msg);
//console.log(msg.content.toString());
channelWrapper_listening.ack(rabbit_msg_data);
});
}
else //All worker processes (MAIN LOGIC)
{
console.log("Worker "+process.pid+" started");
process.on('message',function(msg){
console.log("Master to Worker (working): ", msg);
//send msg back when done working on it.
setTimeout(function() {
process.send(msg);
}, 5000);
});
}