Как справиться с AnyEvent, RabbitMQ (heartbeat) и долгосрочными заданиями в Perl?

Я внедряю систему для распределенного выполнения Cronjob (так называемый Cron вычислительный кластер). Cronjobs должны быть поставлены в очередь в очередь сообщений (RabbitMQ), когда наступит время действия. С другой стороны (узлы / работники кластера) находится демон Perl, использующий AnyEvent::RabbitMQ чтобы получить ровно одно cronjob / task / message из очереди сообщений, обработайте задачу и запросите еще одно точно cronjob / task / message из очереди сообщений и так далее.

Я использую функцию сердцебиения RabbitMQ, которая реализована с AnyEvent::RabbitMQ помочь RabbitMQ в выявлении разорванных соединений.

Не берите в голову фактическое значение интервала сердцебиения! У меня также есть очень длительные рабочие места, которые занимают дни. Таким образом, установка интервала на несколько самый длительный период cronjob не подходит.

Посмотрите следующий фрагмент для выполнения фактического cronjob в работнике демона Perl. Он реализован в "AnyEvent->timer", чтобы не делать DoSing RabbitMQ для сообщения. Этот метод был использован из-за RabbitMQ consume было запрещено (руководством).

sub _timer_tick {

  $rabbitmq_channel->get(
    queue      => 'job_queue',
    on_success => sub {
      my ($amqp_method) = @_;
      if ( not $amqp_method->{empty} ) {
        pause_timer();
        progress_job($amqp_method);
        resume_timer();
      }
    },
    on_failure => sub { $quit_programm->send( 'RABBITMQ_ERROR', @_ ) },
  );

  return;
}

progress_job() где сообщение анализируется и задание будет выполнено. pause_timer() а также resume_timer() контролирует AnyEvent->timer это вызывает _timer_tick(),

use Capture::Tiny 'capture';
sub progress_job {
  my ($amqp_method) = @_;
  my $job = decode_json( $amqp_method->{body}->to_raw_payload() );
  my ( $stdout, $stderr, $exit ) = capture {
    system $job->{execute};
  };
  return;
}

Первые длительные задания были запущены, и система "зависала" с различными сообщениями об ошибках. Иногда он выдает "Неизвестный идентификатор канала: 1", в других случаях он выдает "Канал уже закрыт". Так что я сделал 'тупой отладки' (пытаясь возиться с конфигурацией) и обнаружил, что когда heartbeat интервал короче, чем время, затраченное на progress_job() эти ошибки будут выброшены. После некоторых размышлений это имеет смысл. progress_job() является блокирующей подпрограммой, и AnyEvent не может продолжить отправку пакетов контрольных сигналов в RabbitMQ.

Моей первой быстрой мыслью по решению проблемы блокирующего теплового удара было форкнуть и сделать progress_job() в дочернем процессе. AnyEvents документация на FORK указывает, что это сохранить для использования fork когда нет доступа к системе событий (например, через AnyEvent) внутри ребенка. Следующая мысль: ОК, нет доступа к системе событий, поэтому я могу сделать форк. НО: таймер должен возобновиться (resume_timer()) ПОСЛЕ progress_job() вернулся. Теоретически resume_timer() будет вызван сразу после fork() а не после progress_job() возвращается. Поэтому я остановил свою реализацию.

Мой вопрос: как решить последний бит? Как resume_timer() после progress_job() (или другими словами раздвоенный ребенок) возвращается? Я не могу поставить resume_timer() внутри ребенка из-за разветвления и событийной системы не являются потокобезопасными.

1 ответ

Решение

AE не может обрабатывать события, если программа не заблокирована с помощью вызова с поддержкой AE. system не AE-осведомлен. использование run_cmd вместо AnyEvent:: Util.

Другие вопросы по тегам