Kue обратный вызов, когда работа завершена

Мой основной экземпляр Node разветвляет рабочий процесс, который принимает сообщения через IPC (используя встроенный Node process.send() а также process.on('message'...) которые являются объектами, содержащими информацию о новых заданиях, добавляемых в Kue. Затем он обрабатывает эти работы.

Мой основной экземпляр Node вызывает что-то вроде этого:

worker.send({jobType:'filesystem', operation: 'delete', path: fileDir});

и рабочий экземпляр делает что-то вроде этого:

jobs.create(message.jobType, message).save();

jobs.process('filesystem', function(job, done) {
    fs.delete(job.data.path, function(err) {
        done(err);
    });
});

и работа успешно завершена.

Как я могу получить функцию, похожую на функцию обратного вызова, в моем главном экземпляре Node, когда работа завершена? Как я могу вернуть некоторые результаты в основной экземпляр узла из рабочего экземпляра?

1 ответ

Я думаю, что я решил это, но я оставлю вопрос нерешенным, если кто-то сможет улучшить мое решение или предложить лучшее.

Когда вы используете Kue для обработки заданий в отдельном процессе, вы не можете просто выполнить обратный вызов после завершения задания. Это пример связи между двумя процессами. Мне бы хотелось использовать идентификатор, который Kue предоставляет каждому заданию автоматически (который, как я полагаю, совпадает с идентификатором, который он получает в Redis), но app.js должен знать идентификатор задания ДО того, как он будет отправлен работнику, чтобы может соответствовать идентификатору, когда он получает сообщение.

app.js

var child = require('child_process');
var async = require('async');

var worker = child.fork("./worker.js");

//When a message is received, search activeJobs for it, call finished callback, and delete the job
worker.on('message', function(m) {
    for(var i = 0; i < activeJobs.length; i++) {
        if(m.jobId == activeJobs[i].jobId) {
            activeJobs[i].finished(m.err, m.results);
            activeJobs.splice(i,1);
            break;
        }
    }
});

// local job system
var newJobId = 0;
var activeJobs = [];

function Job(input, callback) {
    this.jobId = newJobId;
    input.jobId = newJobId;
    newJobId++;
    activeJobs.push(this);

    worker.send(input);

    this.finished = function(err, results) {
        callback(err, results);
    }
}


var deleteIt = function(req, res) {
    async.series([
        function(callback) {
            // An *EXAMPLE* asynchronous task that is passed off to the worker to be processed
            // and requires a callback (because of async.series)
            new Job({
                jobType:'filesystem',
                title:'delete project directory',
                operation: 'delete',
                path: '/deleteMe'
            }, function(err) {
                callback(err);
            });
        },
        //Delete it from the database
        function(callback) {
            someObject.remove(function(err) {
                callback(err);
            });
        },
    ],
    function(err) {
        if(err) console.log(err);
    });
};

worker.js

var kue = require('kue');
var fs = require('fs-extra');

var jobs = kue.createQueue();

//Jobs that are sent arrive here
process.on('message', function(message) {
    if(message.jobType) {
        var job = jobs.create(message.jobType, message).save();
    } else {
        console.error("Worker:".cyan + " [ERROR] No jobType specified, message ignored".red);
    }
});

jobs.process('filesystem', function(job, done) {

    if(job.data.operation == 'delete') {
        fs.delete(job.data.path, function(err) {
            notifyFinished(job.data.jobId, err);
            done(err);
        });
    }
});

function notifyFinished(id, error, results) {
    process.send({jobId: id, status: 'finished', error: error, results: results});
}

https://gist.github.com/winduptoy/4991718

Другие вопросы по тегам