gpt4 book ai didi

multithreading - 如何在 Perl 中处理任何事件、RabbitMQ(心跳)和长时间运行的作业?

转载 作者:行者123 更新时间:2023-12-04 08:27:41 26 4
gpt4 key购买 nike

我正在实现一个用于分布式 cronjob 执行的系统(所谓的 cron 计算集群)。当行动时间到来时,Cronjobs 应该排队进入消息队列(RabbitMQ)。在另一侧(集群的节点/ worker )是一个 Perl 守护进程,它利用 AnyEvent::RabbitMQ 从消息队列中接收一个 cronjob/task/message,处理任务并准确地请求另一个来自消息队列的一个 cronjob/task/message 等等。

我使用通过 AnyEvent::RabbitMQ 实现的 RabbitMQ 心跳功能来帮助 RabbitMQ 识别断开的连接。

不用管心跳间隔的实际值!我也有很长的工作需要几天时间。因此,将时间间隔设置为最长的 cronjob 将花费的时间不是一种选择。

请参阅以下代码段以在 Perl 守护进程工作程序中执行实际的 cronjob。它是在“AnyEvent->timer”中实现的,不会对消息使用 DoSing RabbitMQ。由于 RabbitMQ 的 consume 被禁止(由管理),因此使用了此方法。

sub _timer_tick {

$rabbitmq_channel->get(
queue => 'job_queue',
on_success => sub {
my ($amqp_method) = @_;
if ( not $amqp_method->{empty} ) {
pause_timer();
progress_job($amqp_method);
resume_timer();
}
},
on_failure => sub { $quit_programm->send( 'RABBITMQ_ERROR', @_ ) },
);

return;
}

progress_job() 是解析消息和执行作业的地方。 pause_timer()resume_timer() 控制触发 _timer_tick()AnyEvent->timer

use Capture::Tiny 'capture';
sub progress_job {
my ($amqp_method) = @_;
my $job = decode_json( $amqp_method->{body}->to_raw_payload() );
my ( $stdout, $stderr, $exit ) = capture {
system $job->{execute};
};
return;
}

第一个长时间运行的作业进入,系统“崩溃”并出现各种错误消息。有时它会抛出“Unknown channel id: 1”,有时会抛出“Channel has already been closed”。所以我做了“愚蠢的调试”(试图弄乱配置)并发现当 heartbeat 间隔短于 progress_job() 所花费的时间时,这些错误将被抛出。经过一番思考,这是有道理的。 progress_job() 是一个阻塞子程序,AnyEvent 无法继续将心跳包发送到 RabbitMQ。

我对解决阻塞热跳问题的第一个快速想法是在子进程中 fork 并执行 progress_job()AnyEvents documentation on FORK指出当无法访问子项中的事件系统(例如通过 AnyEvent)时,使用 fork 是省事的方法。接下来的想法是:好吧,没有事件系统的访问权限,所以我可以进行 fork。但是:计时器应该在 progress_job() 返回后恢复 (resume_timer())。理论上,resume_timer() 将在 fork() 之后调用,而不是在 progress_job() 返回之后调用。所以我停止了实现。

我的问题:如何解决最后一点? progress_job()(或者换句话说, fork 的 child )返回后如何resume_timer()?由于 fork 和事件系统不是线程安全的,我无法将 resume_timer() 放入子项中。

最佳答案

AE 无法处理事件,除非使用 AE 感知调用阻止程序。 system 不支持 AE。使用 AnyEvent::Util 中的 run_cmd相反。

关于multithreading - 如何在 Perl 中处理任何事件、RabbitMQ(心跳)和长时间运行的作业?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34852994/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com