Многократная обработка запланированных заданий с помощью kue в режиме кластера
Я настроил kue для запуска с модулем Cluster, который порождает дочерний процесс kue
для каждого доступного ядра процессора..
Когда планировщик вставляет every
ключи к Redis, кажется, все в порядке - только один набор ключей для каждого every
запустить.
Однако, когда наступает время для запуска обработки задания, все дочерние процессы (рабочие) начинают обрабатывать логику обработки, в результате чего задание имеет несколько экземпляров, запускаемых из одной записи "планировщика".
Этот симптом, по-видимому, не возникает при программном запуске новой работы в kue
и при использовании kue
API для этого.
Пожалуйста, порекомендуйте.
Основной код начальной загрузки
var cluster = require('cluster');
var numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
require('./init.js');
for (var i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('online', function (worker) {
console.log('Worker ' + worker.process.pid + ' is online');
});
cluster.on('exit', function (worker, code, signal) {
console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
console.log('Starting a new worker');
cluster.fork();
});
require('./scheduler.js');
} else {
require("./job_types");
}
init.js
/// Module Dependency
var cors = require('cors');
var kue = require('kue-scheduler');
var express = require('express');
var bodyParser = require('body-parser');
var config = require('./configs/config');
var redis = require("redis");
var client = redis.createClient();
var jobs = kue.createQueue();
require('./routes');
// Clearing redis for clean startup
console.log('Clearing old Redis data...');
client.flushall();
/// Webserver
var corsOptions = {origin: '*'};
var app = express();
app.use(cors(corsOptions));
app.options('*', cors(corsOptions));
app.use(bodyParser.json());
app.use(kue.app);
app.listen(config.env.port, function () {
var host = config.env.host;
var port = config.env.port;
console.log('[' + process.pid + '] Monitoring kue listening at http://%s:%s', host, port);
});
// Handling safe shutdown
process.once('SIGTERM', function (sig) {
kue.shutdown(5000, function (err) {
console.log('[' + process.pid + '] Kue shutdown: ', err || '');
process.exit(0);
});
});
process.on('uncaughtException', function (err) {
console.log('[' + process.pid + '] ' + err);
console.log('[' + process.pid + '] ' + err.stack);
});
scheduler.js
var scheduler = require('kue-scheduler');
var q = scheduler.createQueue();
// Set specific job scheduling here
q.every('1 minutes', q.createJob('getSocialEntities').attempts(3).priority('normal'));
// General scheduler event handling
// Uncomment for debug
q.on('already scheduled', function (job) {
console.log('[' + process.pid + '] job is already scheduled: ' + job.type + ' (' + job.id + ')');
});
q.on('schedule success', function (job) {
console.log('[' + process.pid + '] job scheduled: ' + job.type + ' (' + job.id + ')');
});
q.on('schedule error', function (error) {
console.error('[' + process.pid + '] failed scheduling job');
console.error(error);
});
- job_types - содержит куе
jobs.process('job type',...)
методы для обработки всей логики работы.