Как справиться с AnyEvent, RabbitMQ (heartbeat) и долгосрочными заданиями в Perl?
Я внедряю систему для распределенного выполнения Cronjob (так называемый Cron вычислительный кластер). Cronjobs должны быть поставлены в очередь в очередь сообщений (RabbitMQ), когда наступит время действия. С другой стороны (узлы / работники кластера) находится демон Perl, использующий AnyEvent::RabbitMQ
чтобы получить ровно одно cronjob / task / message из очереди сообщений, обработайте задачу и запросите еще одно точно cronjob / task / message из очереди сообщений и так далее.
Я использую функцию сердцебиения RabbitMQ, которая реализована с AnyEvent::RabbitMQ
помочь RabbitMQ в выявлении разорванных соединений.
Не берите в голову фактическое значение интервала сердцебиения! У меня также есть очень длительные рабочие места, которые занимают дни. Таким образом, установка интервала на несколько самый длительный период cronjob не подходит.
Посмотрите следующий фрагмент для выполнения фактического cronjob в работнике демона Perl. Он реализован в "AnyEvent->timer", чтобы не делать DoSing RabbitMQ для сообщения. Этот метод был использован из-за RabbitMQ consume
было запрещено (руководством).
sub _timer_tick {
$rabbitmq_channel->get(
queue => 'job_queue',
on_success => sub {
my ($amqp_method) = @_;
if ( not $amqp_method->{empty} ) {
pause_timer();
progress_job($amqp_method);
resume_timer();
}
},
on_failure => sub { $quit_programm->send( 'RABBITMQ_ERROR', @_ ) },
);
return;
}
progress_job()
где сообщение анализируется и задание будет выполнено. pause_timer()
а также resume_timer()
контролирует AnyEvent->timer
это вызывает _timer_tick()
,
use Capture::Tiny 'capture';
sub progress_job {
my ($amqp_method) = @_;
my $job = decode_json( $amqp_method->{body}->to_raw_payload() );
my ( $stdout, $stderr, $exit ) = capture {
system $job->{execute};
};
return;
}
Первые длительные задания были запущены, и система "зависала" с различными сообщениями об ошибках. Иногда он выдает "Неизвестный идентификатор канала: 1", в других случаях он выдает "Канал уже закрыт". Так что я сделал 'тупой отладки' (пытаясь возиться с конфигурацией) и обнаружил, что когда heartbeat
интервал короче, чем время, затраченное на progress_job()
эти ошибки будут выброшены. После некоторых размышлений это имеет смысл. progress_job()
является блокирующей подпрограммой, и AnyEvent не может продолжить отправку пакетов контрольных сигналов в RabbitMQ.
Моей первой быстрой мыслью по решению проблемы блокирующего теплового удара было форкнуть и сделать progress_job()
в дочернем процессе. AnyEvents документация на FORK указывает, что это сохранить для использования fork
когда нет доступа к системе событий (например, через AnyEvent) внутри ребенка. Следующая мысль: ОК, нет доступа к системе событий, поэтому я могу сделать форк. НО: таймер должен возобновиться (resume_timer()
) ПОСЛЕ progress_job()
вернулся. Теоретически resume_timer()
будет вызван сразу после fork()
а не после progress_job()
возвращается. Поэтому я остановил свою реализацию.
Мой вопрос: как решить последний бит? Как resume_timer()
после progress_job()
(или другими словами раздвоенный ребенок) возвращается? Я не могу поставить resume_timer()
внутри ребенка из-за разветвления и событийной системы не являются потокобезопасными.
1 ответ
AE не может обрабатывать события, если программа не заблокирована с помощью вызова с поддержкой AE. system
не AE-осведомлен. использование run_cmd
вместо AnyEvent:: Util.