Переподключитесь, используя AnyEvent::Handle и tcp_connect
У меня есть простой TCP-сервер и клиент, написанный с использованием AnyEvent::Handle
левереджа tcp_connect
а также tcp_server
, Клиент подключается к серверу и отправляет строку Test Message
каждые 5 секунд.
Это работает без проблем, если сервер доступен, однако, если сервер недоступен при запуске клиента или становится недоступным, клиентский сценарий никогда не пытается восстановить соединение.
Я хотел бы попытаться восстановить соединение, если дескриптор соединения недоступен (уничтожен?). Если недоступно, сделайте что-нибудь (возможно, распечатайте сообщение о состоянии), но попытайтесь подключиться каждые 5 секунд, это будет идеальным результатом.
Я не уверен, как это сделать. Я сократил код моего клиента и сервера до следующего.
клиент
#!/usr/bin/perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use Compress::Zlib;
my @bulk;
# Start Timer
my $timer = AnyEvent->timer(
after => 5,
interval => 5,
cb => sub {
push( @bulk, "Test message" );
flush( \@bulk );
undef @bulk;
} );
my $host = '127.0.0.1';
my $port = 9999;
my $conn_cv = AnyEvent->condvar;
my $conn_hdl;
$conn_hdl = AnyEvent::Handle->new(
connect => [$host, $port],
keepalive => 1,
on_connect_error => sub {
print "Could not connect: $_[1]\n";
$conn_hdl->destroy;
#$conn_cv->send;
},
on_error => sub {
my ( $out_hdl, $fatal, $msg ) = @_;
AE::log error => $msg;
$conn_hdl->destroy;
#$conn_cv->send;
},
on_read => sub {
my ( $self ) = @_;
$self->unshift_read(
line => sub {
my ( $hdl, $data ) = @_;
print $data. "\n";
} );
} );
$conn_cv->recv;
# Flush array of events
sub flush {
my ( $bulk ) = @_;
return 0 if scalar @{$bulk} == 0;
my $output = join( ",", @{$bulk} );
$output = compress( $output );
my $l = pack( "N", length( $output ) );
$output = $l . $output;
$conn_hdl->push_write( $output );
}
сервер
#!/usr/bin/perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use Compress::Zlib;
my %holding;
my $host = '127.0.0.1';
my $port = 9999;
my %connections;
# Start Timer
my $timer = AnyEvent->timer(
after => 5,
interval => 5,
cb => sub {
print "Number of connected hosts: ";
print scalar keys %connections;
print "\n";
foreach my $k ( keys %connections ) {
delete $connections{$k} if $connections{$k}->destroyed;
}
} );
my $server_cv = AnyEvent->condvar;
my $server = tcp_server(
$host, $port,
sub {
my ( $fh, $h, $p ) = @_;
my $handle;
$handle = AnyEvent::Handle->new(
fh => $fh,
poll => 'r',
keepalive => 1,
on_read => sub {
my ( $self ) = @_;
# Get Length Header
$self->unshift_read(
chunk => 4,
sub {
my $len = unpack( "N", $_[1] );
# Get Data
$self->unshift_read(
chunk => $len,
sub {
my $data = $_[1];
$data = uncompress( $data );
print $data. "\n";
} );
} );
},
on_eof => sub {
my ( $hdl ) = @_;
$hdl->destroy();
},
on_error => sub {
my ( $hdl ) = @_;
$hdl->destroy();
},
);
$connections{ $h . ':' . $p } = $handle; # keep it alive.
} );
$server_cv->recv;
2 ответа
Вы можете использовать следующее:
package MyConnector;
use strict;
use warnings;
use AE qw( );
use AnyEvent::Handle qw( );
use Scalar::Util qw( );
sub new {
my $class = shift;
my %opts = @_;
my $self = bless({}, $class);
{
Scalar::Util::weaken(my $self = $self);
my $on_connect = delete($opts{on_connect});
my $on_connect_error = delete($opts{on_connect_error});
my $tries = delete($opts{tries}) || 5;
my $cooldown = delete($opts{cooldown}) || 15;
$self->{_connect} = sub {
$self->{_timer} = undef;
$self->{_handle} = AnyEvent::Handle->new(
%opts,
on_connect => sub {
my ($handle, $host, $port, $retry) = @_;
$self->{handle} = $handle;
delete @{$self}{qw( _connect _handle _timer )};
$on_connect->($handle, $host, $port, $retry)
if $on_connect;
},
on_connect_error => sub {
my ($handle, $message) = @_;
if (!$tries--) {
$on_connect_error->($handle, $message)
if $on_connect_error;
delete @{$self}{qw( _connect _handle _timer )};
return;
}
# This will happen when this callback returns,
# but that might not be for a while, so let's
# do it now in case it saves resources.
$handle->destroy();
$self->{_timer} = AE::timer($cooldown, 0, $self->{_connect});
},
);
};
$self->{_connect}->();
}
return $self;
}
sub handle {
my ($self) = @_;
return $self->{handle};
}
1;
Я почти уверен, что в нем нет утечек памяти (в отличие от вашего кода). Вы бы использовали его следующим образом:
use strict;
use warnings;
use AE qw( );
use MyConnector qw( );
my $host = $ARGV[0] || 'www.stackru.com';
my $port = $ARGV[1] || 80;
my $conn_cv = AE::cv();
my $connector = MyConnector->new(
connect => [ $host, $port ],
keepalive => 1,
on_connect => sub {
print("Connected successfully\n");
$conn_cv->send();
},
on_connect_error => sub {
warn("Could not connect: $_[1]\n");
$conn_cv->send();
},
# ...
);
$conn_cv->recv();
Благодаря ikegami я решил, что, возможно, захочу отследить состояние соединения и использовать его в сочетании с другим наблюдателем таймера AnyEvent для повторного соединения, если соединение не установлено. Это приводит к попыткам подключения каждую секунду, если состояние подключения ($isConnected) равно нулю. Между тем, события ставятся в очередь, когда соединение восстанавливается.
Если есть более простой способ сделать это, я весь слух, но сейчас я думаю, что это решит проблему.
my @bulk;
my $host = '127.0.0.1';
my $port = 9999;
my $isConnected = 0;
my $conn_cv = AnyEvent->condvar;
my $conn_hdl;
# Flush Timer
my $timer = AnyEvent->timer(
after => 5,
interval => 5,
cb => sub {
push(@bulk,"Test message");
if ($isConnected == 1) {
flush(\@bulk);
undef @bulk;
}
}
);
# Reconnect Timer
my $reconn = AnyEvent->timer(
after => 1,
interval => 1,
cb => sub {
if ($isConnected == 0) {
$conn_hdl = AnyEvent::Handle->new(
connect => [$host, $port],
keepalive => 1,
on_connect => sub {
$isConnected = 1;
},
on_connect_error => sub {
warn "Could not connect: $_[1]\n";
$conn_hdl->destroy;
$isConnected = 0;
},
on_error => sub {
my ($out_hdl, $fatal, $msg) = @_;
AE::log error => $msg;
$conn_hdl->destroy;
$isConnected = 0;
},
on_eof => sub {
warn "EOF\n";
$conn_hdl->destroy;
$isConnected = 0;
},
on_read => sub {
my ($self) = @_;
$self->unshift_read(line => sub {
my ($hdl,$data) = @_;
print $data."\n";
});
}
);
}
}
);
$conn_cv->recv;