gpt4 book ai didi

Raku Cro 服务订阅数据 "in the background"一般指导

转载 作者:行者123 更新时间:2023-12-03 14:36:13 27 4
gpt4 key购买 nike

我正在尝试组合一个 Cro 服务,该服务具有 react/whenever 阻止“在后台”消费数据因此与许多使用 Cro 的 websocket 使用示例不同,这与可以通过浏览器访问的路由无关。
我的用例是使用通过 MQTT 主题收到的消息并对其进行一些处理。在开发的后期阶段,我可能会从这些数据中创建一个供应,但现在,当接收到数据时,它将存储在一个变量中,并根据某些条件,通过 http post 发送到另一个服务。
我的想法是包含一个 provider()Cro::HTTP::Server像这样设置:

use Cro::HTTP::Log::File;
use Cro::HTTP::Server;

use Routes;
use DataProvider; # Here

my Cro::Service $http = Cro::HTTP::Server.new(
http => <1.1>,
host => ...,
port => ...,
application => [routes(), provider()], # Made this into an array of subs?
after => [
Cro::HTTP::Log::File.new(logs => $*OUT, errors => $*ERR)
]
);
在 DataProvider.pm6 中:
use MQTT::Client;

sub provider() is export {
my $mqtt = MQTT::Client.new: server => 'localhost';
react {
whenever $mqtt.subscribe('some/mqtt/topic') {
say "+ topic: { .<topic> } => { .<message>.decode("utf8-c8") }";
}
}
}
这会引发一堆错误:
A react block:
in sub provider at DataProvider.pm6 (DataProvider) line 5
in block <unit> at service.p6 line 26

Died because of the exception:
Invocant of method 'write' must be an object instance of type
'IO::Socket::Async', not a type object of type 'IO::Socket::Async'. Did
you forget a '.new'?
in method subscribe at /home/cam/raku/share/perl6/site/sources/42C762836A951A1C11586214B78AD34262EC465F (MQTT::Client) line 133
in sub provider at DataProvider.pm6 (DataProvider) line 6
in block <unit> at service.p6 line 26
老实说,我完全猜想这就是我在 Cro 服务的后台订阅数据的方法,但我找不到任何关于什么可能被认为是推荐方法的信息。
最初,我在主 service.pm6 中有我的 react/whenever block 。文件,但这似乎不正确。并且需要包裹在 start{} 中阻止,因为正如我刚刚了解到的那样,react 正在阻止:) 并且 cro 无法真正启动。
但是遵循如何实现路由的模式似乎是合乎逻辑的,但我错过了一些东西。该错误涉及设置新方法,但我不相信这是根本原因。 Routes.pm6没有构造函数。
谁能指出我正确的方向?

最佳答案

感谢所有提供信息的人,这是一次非常有值(value)的学习练习。
传递附加子例程的方法,沿着 router()application Cro::HTTP::Server.new 的参数给了更大的麻烦。 (不允许使用数组,并且破坏了路由)
相反,我将后台工作移到了它自己的一个类中,并给它一个 startstop方法更类似于 Cro::HTTP::Server .
我的新方法:
服务.pm6

use Cro::HTTP::Log::File;
use Cro::HTTP::Server;

use Routes;
use KlineDataSubscriber; # Moved mqtt functionality here
use Database;

my $dsn = "host=localhost port=5432 dbname=act user=.. password=..";
my $dbh = Database.new :$dsn;

my $mqtt-host = 'localhost';
my $subscriber = KlineDataSubscriber.new :$mqtt-host;

$subscriber.start; # Inspired by $http.start below

my Cro::Service $http = Cro::HTTP::Server.new(
http => <1.1>,
host => ...,
port => ...,
application => routes($dbh), # Basically back the way it was originally
after => [
Cro::HTTP::Log::File.new(logs => $*OUT, errors => $*ERR)
]
);

$http.start;
say "Listening at...";
react {
whenever signal(SIGINT) {
say "Shutting down...";
$subscriber.stop;
$http.stop;
done;
}
}
在 KlineDataSubscriber.pm6
use MQTT::Client;

class KlineDataSubscriber {
has Str $.mqtt-host is required;
has MQTT::Client $.mqtt = Nil;

submethod TWEAK() {
$!mqtt = MQTT::Client.new: server => $!mqtt-host;
await $!mqtt.connect;
}

method start(Str $topic = 'act/feed/exchange/binance/kline-closed/+/json') {
start {
react {
whenever $!mqtt.subscribe($topic) {
say "+ topic: { .<topic> } => { .<message>.decode("utf8-c8") }";
}
}
}
}

method stop() {
# TODO Figure how to unsubscribe and cleanup nicely
}
}
这对我来说感觉更像“Cro 惯用语”,但我很乐意得到纠正。
更重要的是,它按预期工作,我觉得有点面向 future 。我应该能够创建一个供应以使路由器可以使用实时数据,并将数据推送到任何连接的 Web 客户端。
我还打算有一个 http GET 端点 /status进行各种检查以确保一切健康

关于Raku Cro 服务订阅数据 "in the background"一般指导,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66243271/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com