Как сделать асинхронную www-механизацию, используя anyevent

Я провел довольно много исследований по этой теме, и, хотя есть некоторые вопросы, связанные с этим, мне действительно трудно понять, как правильно выполнять асинхронное программирование с использованием AnyEvent и www-mechanize. Я пытаюсь придерживаться Mechanize, потому что у него чистый интерфейс и встроенные функции, которые я собираюсь сделать: (например, получить все изображения сайта и т. Д.). Если нет надежного / хорошего способа сделать то, что я хочу, тогда я начну смотреть на AnyEvent::HTTP, но думаю, что сначала спрошу, прежде чем двигаться в этом направлении.

Я новичок в программировании AnyEvent, но уже выполнил довольно много асинхронных вызовов perl и javascript / jquery с обратными вызовами. Это имеет большой смысл для меня, но это не для меня с AnyEvent + Mech.

Вот код, над которым я работаю, который извлекает URL-адреса из восходящей очереди. дать URL, я хочу получить один, который говорит, что вытягивает все изображения на странице, а затем асинхронно. захватывает все изображения.

Так что псевдокод будет выглядеть примерно так:

  • захватить URL из очереди
  • получить страницу
  • получить все ссылки img url
  • делать много асинхронных вызовов на URL-адресах img (например, хранить imgs в бэкэнде)

Я прочитал, я не могу (после исследования ошибок) заблокировать в обратном вызове AnyEvent. Как мне структурировать мою программу для выполнения асинхронных вызовов без блокировки?

События AE могут быть обработаны только тогда, когда блокируются функции AE, поэтому я использую LWP:: Protocol::AnyEvent:: http. Он заменяет обычный HTTP-бэкэнд для LWP (Net: HTTP) на AnyEvent::HTTP, который поддерживает AE.

Работник создается как:

my Worker->new(upstream_job_url => "tcp://127.0.0.1:5555', run_on_create => 1);

Асинхронная часть - это sub _recv_msg, которая вызывает _proc_msg.

У меня уже есть цикл AnyEvent, наблюдающий за сокетом ZeroMQ в соответствии с документами по привязке perl ZeroMQ...

Любая помощь высоко ценится!

Код:

package Worker;

use 5.12.0;

use Moose;
use AnyEvent;
use LWP::Protocol::AnyEvent::http;

use ZMQ::LibZMQ3;
use ZMQ::Constants qw/ZMQ_PUSH ZMQ_PULL ZMQ_POLLIN ZMQ_FD/;

use JSON;
use WWW::Mechanize;
use Carp;
use Coro;


has 'max_children' => (
    is => 'rw',
    isa => 'Int',
    required => 1,
    default => sub { 0 }
);

has 'upstream_job_url' => (
    is => 'rw',
    isa => 'URI',
    required => 1,
);

has ['uri','sink_url'] => (
    is => 'rw',
    isa => 'URI',
    required => 0,
);

has 'run_on_create' => (
    is => 'rw',
    isa => 'Bool',
    required => 1,
    default => sub { 1 }
);

has '_receiver' => (
    is => 'rw',
    isa => 'ZMQ::LibZMQ3::Socket',
    required => 0
);

sub BUILD {
    my $self = shift;
    $self->start if $self->run_on_create;
}

sub start
{
    my $self = shift;
    $self->_init_zmq();

    my $fh = zmq_getsockopt( $self->_receiver, ZMQ_FD );
    my $w; $w = AnyEvent->io( fh => $fh, poll => "r", cb => sub { $self->_recv_msg } );
    AnyEvent->condvar->recv;
}

sub _init_zmq
{   
    my $self = shift;
    my $c = zmq_init() or die "zmq_init: $!\n";
    my $recv = zmq_socket($c, ZMQ_PULL) or die "zmq_socket: $!\n";
    if( zmq_connect($recv, $self->upstream_job_url) != 0 ) {
        croak "zmq_connect: $!\n";
    }
    $self->_receiver($recv);
}

sub _recv_msg
{
    my $self = shift;
    while(my $message = zmq_msg_data(zmq_recvmsg($self->_receiver)) ) {
        my $msg = JSON::from_json($message, {utf8 => 1});
        $self->uri(URI->new($msg->{url}));
        $self->_proc_msg;
    }
}

sub _proc_msg
{
    my $self = shift;
    my $c = async { 
        my $ua = WWW::Mechanize->new;
        $ua->protocols_allowed(['http']); 
        print "$$ processing " . $self->uri->as_string . "... ";
        $ua->get($self->uri->as_string);
        if ($ua->success()) {
            say $ua->status . " OK";
        } else { 
            say $ua->status . " NOT OK";
        }
    }; 
    $c->join;
}

1;

Как вы можете видеть, я пытался Coro в _proc_msg, я пытался просто делать вызовы мех, но получаю ошибку

AnyEvent::CondVar: recursive blocking wait attempted at lib/Worker.pm line 91.

Потому что $ mech все еще блокирует в обратном вызове. Я не уверен, как правильно делать мех-вызовы в моем обратном вызове.


По запросу ikegami я добавил программу драйвера, которая отправляет URL-адреса. Для тестирования я просто читаю канал RSS и отправляю ссылки работникам, чтобы попытаться обработать их. Мне было любопытно узнать о базовой структуре любого события с обратными вызовами, но я более чем счастлив просто получить помощь по программе в целом. Вот код драйвера:

#!/usr/local/bin/perl

use strict;
use warnings;
use v5.12.0;

use lib './lib';

use Config::General;
use Getopt::Long;
use Carp;
use AnyEvent;
use AnyEvent::Feed;
use Parallel::ForkManager;
use ZMQ::LibZMQ3;
use ZMQ::Constants qw(ZMQ_PUSH ZMQ_PULL);
use Worker;

# Debug
use Data::Dumper;
$Data::Dumper::Deparse = 1;

my $config_file = "feeds.cfg";

GetOptions(
    "--config|c" => \$config_file,
    "--help|h" => sub { usage(); exit(0); }
);

sub usage() 
{
    say "TODO";
}

$SIG{INT} = sub { croak; }; $SIG{TERM} = sub { croak; };
$SIG{CHLD} = 'IGNORE';

my $conf = Config::General->new($config_file) or croak "Couldn't open config file '$config_file' $!\n";

my %config = $conf->getall();
my @readers = ();
my @feeds = load_feeds(\%config);

my $mgr = Parallel::ForkManager->new( $config{'max_download_children'} ) or croak "Can't create fork manager: $!\n";
my $context = zmq_init() or croak "zmq_init: $!\n";
my $sender = zmq_socket($context, ZMQ_PUSH) or die "zmq_socket: $!\n";

foreach my $feed_cfg (@feeds) {
    my $reader = AnyEvent::Feed->new(url => delete $feed_cfg->{url}, %$feed_cfg);
    push(@readers, $reader); # save, don't go out of scope
}

# Fork Downloader children. These processes will look for incoming data
# in the img_queue and download the images, storing them in nosql
for ( 1 .. $config{'max_download_children'} ) {
    my $pid = $mgr->start; 
    if (!$pid) {
        # Child
        my $worker = Worker->new({
            upstream_job_url => URI->new('tcp://127.0.0.1:5555')
        });
        $mgr->finish;
        say "$$ exiting.";
        exit(0);
    } else {
        # Parent
        say "[forked child $pid] my pid is $$";
    }
}

if (zmq_bind($sender, 'tcp://127.0.0.1:5555') < 0) {
    croak "zmq_bind: $!\n";
}

# Event loop 
AnyEvent->condvar->recv;

sub load_feeds
{
    my $conf = shift;
    my @feeds = ();
    foreach my $feed ( keys %{$conf->{'feeds'}} ) {
        my $feed_ref = $conf->{'feeds'};
        $feed_ref->{$feed}->{'name'} = $feed;
        $feed_ref->{$feed}->{'on_fetch'} = \&fetch_feed_cb;
        push(@feeds, $feed_ref->{$feed});   
    }
    return @feeds;
}

sub fetch_feed_cb
{
    my ($feed_reader, $new_entries, $feed, $error) = @_;
    if (defined $error) {
        say "Error fetching feed: $error";
        return;
    }
    say "$$ checking for new feeds";
    for (@$new_entries) {
        my ($hash, $entry) = @$_;
        say "$$ sending " . $entry->link;
        zmq_send($sender, JSON::to_json( { url => $entry->link }, { pretty => 1, utf8 => 1 } ));
    }
}

Вот примерный прогон:

[forked child 40790] my pid is 40789
[forked child 40791] my pid is 40789
[forked child 40792] my pid is 40789
40789 checking for new feeds
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/f5nNM3zYBt0/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/Ay9V5pIpFBA/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/5XCVvt75ppU/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/mWprjBD3UhM/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/NngMs9pCQew/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/wiUsvafLGFU/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/QMp6gnZpFcA/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/kqUb_rpU5dE/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/tHItKqKhGXg/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/7LleQbVnPmE/
FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99.
FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99.
FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99.
40791 processing http://feedproxy.google.com/~r/PerlNews/~3/Ay9V5pIpFBA/... 
40790 processing http://feedproxy.google.com/~r/PerlNews/~3/f5nNM3zYBt0/... 
40792 processing http://feedproxy.google.com/~r/PerlNews/~3/5XCVvt75ppU/... ^C at /usr/local/perls/perl5162/lib/perl5/site_perl/darwin-thread-multi-2level/AnyEvent/Loop.pm line 231.

Если я не сделаю явно "использовать Coro;" в Worker.pm ошибки FATAL coro не отображаются. Я не знаю, как асинхронный работал раньше без дальнейших ошибок во время выполнения.

Пример файла конфигурации (feeds.cfg):

max_download_children = 3
<feeds>
    <feed1>
        url="http://feeds.feedburner.com/PerlNews?format=xml"   
        interval=60
    </feed1>
</feeds>

Поэтому я провел немного больше времени с этим сегодня. Так что ошибка моих способов сделать соединение $c->. Я не должен этого делать, поскольку не могу заблокировать обратный вызов. Coro запланирует асинхронный блок, и это будет сделано, когда это будет сделано. Единственное, что мне нужно сделать, это как-то узнать, когда все асинхронные операции выполнены, и я думаю, что смогу это выяснить. Теперь сложная часть пытается выяснить этот маленький кусочек тайны:

sub _recv_msg
{
    my $self = shift;
    while(my $message = zmq_msg_data(zmq_recvmsg($self->_receiver)) ) {
        my $msg = JSON::from_json($message, {utf8 => 1});
        $self->uri(URI->new($msg->{url}));
        $self->_proc_msg;
    }
}

Этот цикл while заставляет мои асинхронные { } потоки в _proc_msg НЕ РАБОТАТЬ. Удалите цикл while и просто обработайте первое сообщение и запустите coros. Оставьте цикл while на месте, и они никогда не запустятся. Странно для меня, еще не понял, почему.


Дальнейшие обновления:

zmq_msg_recv был заблокирован. Кроме того, zmq_send в родительском может заблокировать. Приходится использовать ZMQ_NOBLOCK. Я разделил рабочий и главный на отдельные программы полностью.

1 ответ

Вы можете использовать https://metacpan.org/pod/AnyEvent::HTTP::LWP::UserAgent для асинхронных вызовов.

  use AnyEvent::HTTP::LWP::UserAgent;
  use AnyEvent;

  my $ua = AnyEvent::HTTP::LWP::UserAgent->new;
  my @urls = (...);
  my $cv = AE::cv;
  $cv->begin;
  foreach my $url (@urls) {
      $cv->begin;
      $ua->get_async($url)->cb(sub {
          my $r = shift->recv;
          print "url $url, content " . $r->content . "\n";
          $cv->end;
      });
  }
  $cv->end;
  $cv->recv;
Другие вопросы по тегам