gpt4 book ai didi

perl - 使用 AnyEvent::Handle 和 tcp_connect 重新连接

转载 作者:行者123 更新时间:2023-12-05 00:11:44 25 4
gpt4 key购买 nike

我有一个使用 AnyEvent::Handle 编写的简单 TCP 服务器和客户端利用 tcp_connecttcp_server。客户端连接到服务器并每 5 秒发送一次字符串 Test Message

如果服务器可访问,这将没有问题,但是,如果服务器在客户端启动时不可用或变得不可用,则客户端脚本永远不会尝试重新连接。

如果连接句柄不可用(已损坏?),我希望它尝试重新连接。如果不可用,请执行操作(可能会打印状态消息),但理想的结果是每 5 秒尝试重新连接一次。

我不知道该怎么做。我已将我的客户端和服务器代码缩减为以下内容。

客户端

#!/usr/bin/perl

use strict;
use warnings;

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use Compress::Zlib;

my @bulk;

# Start Timer
my $timer = AnyEvent->timer(
after => 5,
interval => 5,
cb => sub {
push( @bulk, "Test message" );
flush( \@bulk );
undef @bulk;
} );

my $host = '127.0.0.1';
my $port = 9999;

my $conn_cv = AnyEvent->condvar;
my $conn_hdl;

$conn_hdl = AnyEvent::Handle->new(
connect => [$host, $port],
keepalive => 1,
on_connect_error => sub {
print "Could not connect: $_[1]\n";
$conn_hdl->destroy;

#$conn_cv->send;
},
on_error => sub {
my ( $out_hdl, $fatal, $msg ) = @_;
AE::log error => $msg;
$conn_hdl->destroy;

#$conn_cv->send;
},
on_read => sub {
my ( $self ) = @_;
$self->unshift_read(
line => sub {
my ( $hdl, $data ) = @_;
print $data. "\n";
} );
} );

$conn_cv->recv;

# Flush array of events
sub flush {
my ( $bulk ) = @_;
return 0 if scalar @{$bulk} == 0;

my $output = join( ",", @{$bulk} );
$output = compress( $output );
my $l = pack( "N", length( $output ) );
$output = $l . $output;
$conn_hdl->push_write( $output );
}

服务器

#!/usr/bin/perl

use strict;
use warnings;

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use Compress::Zlib;

my %holding;

my $host = '127.0.0.1';
my $port = 9999;

my %connections;

# Start Timer
my $timer = AnyEvent->timer(
after => 5,
interval => 5,
cb => sub {
print "Number of connected hosts: ";
print scalar keys %connections;
print "\n";
foreach my $k ( keys %connections ) {
delete $connections{$k} if $connections{$k}->destroyed;
}
} );

my $server_cv = AnyEvent->condvar;
my $server = tcp_server(
$host, $port,
sub {
my ( $fh, $h, $p ) = @_;
my $handle;

$handle = AnyEvent::Handle->new(
fh => $fh,
poll => 'r',
keepalive => 1,
on_read => sub {
my ( $self ) = @_;

# Get Length Header
$self->unshift_read(
chunk => 4,
sub {
my $len = unpack( "N", $_[1] );

# Get Data
$self->unshift_read(
chunk => $len,
sub {
my $data = $_[1];
$data = uncompress( $data );
print $data. "\n";
} );
} );

},
on_eof => sub {
my ( $hdl ) = @_;
$hdl->destroy();
},
on_error => sub {
my ( $hdl ) = @_;
$hdl->destroy();
},
);

$connections{ $h . ':' . $p } = $handle; # keep it alive.
} );

$server_cv->recv;

最佳答案

您可以使用以下内容:

package MyConnector;

use strict;
use warnings;

use AE qw( );
use AnyEvent::Handle qw( );
use Scalar::Util qw( );

sub new {
my $class = shift;
my %opts = @_;

my $self = bless({}, $class);

{
Scalar::Util::weaken(my $self = $self);

my $on_connect = delete($opts{on_connect});
my $on_connect_error = delete($opts{on_connect_error});

my $tries = delete($opts{tries}) || 5;
my $cooldown = delete($opts{cooldown}) || 15;

$self->{_connect} = sub {
$self->{_timer} = undef;

$self->{_handle} = AnyEvent::Handle->new(
%opts,

on_connect => sub {
my ($handle, $host, $port, $retry) = @_;

$self->{handle} = $handle;
delete @{$self}{qw( _connect _handle _timer )};

$on_connect->($handle, $host, $port, $retry)
if $on_connect;
},

on_connect_error => sub {
my ($handle, $message) = @_;

if (!$tries--) {
$on_connect_error->($handle, $message)
if $on_connect_error;

delete @{$self}{qw( _connect _handle _timer )};

return;
}

# This will happen when this callback returns,
# but that might not be for a while, so let's
# do it now in case it saves resources.
$handle->destroy();

$self->{_timer} = AE::timer($cooldown, 0, $self->{_connect});
},
);
};

$self->{_connect}->();
}

return $self;
}

sub handle {
my ($self) = @_;
return $self->{handle};
}

1;

我很确定它没有内存泄漏(与您的代码不同)。您可以按如下方式使用它:

use strict;
use warnings;

use AE qw( );
use MyConnector qw( );

my $host = $ARGV[0] || 'www.stackoverflow.com';
my $port = $ARGV[1] || 80;

my $conn_cv = AE::cv();

my $connector = MyConnector->new(
connect => [ $host, $port ],
keepalive => 1,

on_connect => sub {
print("Connected successfully\n");
$conn_cv->send();
},

on_connect_error => sub {
warn("Could not connect: $_[1]\n");
$conn_cv->send();
},

# ...
);

$conn_cv->recv();

关于perl - 使用 AnyEvent::Handle 和 tcp_connect 重新连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30967783/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com