Многократная обработка запланированных заданий с помощью kue в режиме кластера

Я настроил kue для запуска с модулем Cluster, который порождает дочерний процесс kue для каждого доступного ядра процессора..

Когда планировщик вставляет every ключи к Redis, кажется, все в порядке - только один набор ключей для каждого every запустить.

Однако, когда наступает время для запуска обработки задания, все дочерние процессы (рабочие) начинают обрабатывать логику обработки, в результате чего задание имеет несколько экземпляров, запускаемых из одной записи "планировщика".

Этот симптом, по-видимому, не возникает при программном запуске новой работы в kueи при использовании kue API для этого.

Пожалуйста, порекомендуйте.

Основной код начальной загрузки

var cluster = require('cluster');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    require('./init.js');

    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }
    cluster.on('online', function (worker) {
        console.log('Worker ' + worker.process.pid + ' is online');
    });

    cluster.on('exit', function (worker, code, signal) {
        console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
        console.log('Starting a new worker');
        cluster.fork();
    });

    require('./scheduler.js');
} else {
    require("./job_types");
}

init.js

/// Module Dependency
var cors = require('cors');
var kue = require('kue-scheduler');
var express = require('express');
var bodyParser = require('body-parser');
var config = require('./configs/config');
var redis = require("redis");

var client = redis.createClient();
var jobs = kue.createQueue();

require('./routes');

// Clearing redis for clean startup
console.log('Clearing old Redis data...');
client.flushall();

/// Webserver
var corsOptions = {origin: '*'};

var app = express();
app.use(cors(corsOptions));
app.options('*', cors(corsOptions));
app.use(bodyParser.json());
app.use(kue.app);
app.listen(config.env.port, function () {
    var host = config.env.host;
    var port = config.env.port;

    console.log('[' + process.pid + '] Monitoring kue listening at http://%s:%s', host, port);
});

// Handling safe shutdown
process.once('SIGTERM', function (sig) {
    kue.shutdown(5000, function (err) {
        console.log('[' + process.pid + '] Kue shutdown: ', err || '');
        process.exit(0);
    });
});

process.on('uncaughtException', function (err) {
    console.log('[' + process.pid + '] ' + err);
    console.log('[' + process.pid + '] ' + err.stack);
});

scheduler.js

var scheduler = require('kue-scheduler');
var q = scheduler.createQueue();

// Set specific job scheduling here
q.every('1 minutes', q.createJob('getSocialEntities').attempts(3).priority('normal'));

// General scheduler event handling
// Uncomment for debug
q.on('already scheduled', function (job) {
    console.log('[' + process.pid + '] job is already scheduled: ' + job.type + ' (' + job.id + ')');
});

q.on('schedule success', function (job) {
    console.log('[' + process.pid + '] job scheduled: ' + job.type + ' (' + job.id + ')');
});

q.on('schedule error', function (error) {
    console.error('[' + process.pid + '] failed scheduling job');
    console.error(error);
});
  • job_types - содержит куе jobs.process('job type',...) методы для обработки всей логики работы.

0 ответов

Другие вопросы по тегам