gpt4 book ai didi

java - 创建热可观察对象

转载 作者:行者123 更新时间:2023-11-30 08:09:18 24 4
gpt4 key购买 nike

我正在迈出 RX 的第一步。我读过一些相关内容,但我认为亲自动手是更好的方法。因此,我开始将现有代码转换为 Rx 类型的代码。

目标:我试图模拟一个以特定频率(例如 60/s、摄像机或其他)发送数据的源。我有录制的片段来模拟源,但源不可用。即使没有人在听,我也需要源开始发送,因为这就是真正的源会做的事情。

在 Rx 之前,我创建了一个 Runnable,它仅迭代 15.000 个数据项,将该项发送到我的 RabbitMQ 服务器并 hibernate 1/60 秒,然后发送下一个。

现在我想把这个逻辑变成一个热可观察的,只是为了玩玩。到目前为止我有这个:

Observable.from(mDataItems)
.takeWhile(item -> mRunning)
.map(mGson::toJson)
.doOnNext(json -> {
try {
mChannel.basicPublish(EXCHANGE_NAME, "", null, json.getBytes());
} catch (IOException e) {
Logger.error(e, String.format("Could not publish to %s exchange", EXCHANGE_NAME));
}

try {
Thread.sleep(1 / SENDING_FREQUENCY_IN_HZ);
} catch (InterruptedException e) {
Logger.error(e, String.format("Could not sleep for %d ms", (int) (1000 / SENDING_FREQUENCY_IN_HZ)));
}
})
.doOnCompleted(() -> {
if (mRunning)
Logger.info("All data sent");
else
Logger.info("Interrupted while sending");

disconnect();
mRunning = false;
})
.subscribeOn(Schedulers.io())
.publish()
.connect();

尽管到目前为止它有效,但我不知道这是否是创建仅发出项目的热 Observable(或一般意义上的 Observable)的“好”方法。 (我也不知道是否应该使用Subject而不是Observable,但那是另一个问题)。

最佳答案

是的,还有一个替代方案:

int delay = 1000 / frequency;
Observable o = Observable.from(dataItems)
.zipWith(
Observable.timer(delay, delay, TimeUnit.MILLISECONDS)
.onBackpressureDrop(),
(s, t) -> s)
.map(mGson::toJson)
// other ops as necessary
.subscribeOn(Schedulers.io())
.publish();

o.connect();

o.subscribe(...);

关于java - 创建热可观察对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30645898/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com