gpt4 book ai didi

java - 将 Akka 源代码转换为 RxJava2 Flowable?

转载 作者:行者123 更新时间:2023-11-30 02:25:04 24 4
gpt4 key购买 nike

我目前正在使用以下代码将 Akka 源(例如使用 Akka 的 FileIO 读取文件接收到的)转换为 RxJava2 Flowable:

private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data,
Flow<ByteString, ByteString, NotUsed> compType) {
final Publisher<ByteString> uncompressedData =
data.via(compType)
.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer);
return Flowable.fromPublisher(uncompressedData)
.map(bytes -> Buffer.buffer(bytes.toArray()));
}

我对这个(工作)解决方案的问题是,至少就我目前的理解而言,.runWith()方法调用已经运行了代码,即从给定的 Source,对其进行缓冲,然后将其放入 Publisher 中。此时有什么办法可以避免必须运行它吗?我想在此时定义转换而不使用物化器,并且仅在稍后订阅 Flowable 后才运行所有内容。

谢谢!

最佳答案

使用 defer (旁注:我必须多次这样做,因为 Akka Sources 是一次性的):

private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data,
Flow<ByteString, ByteString, NotUsed> compType) {

return Flowable.defer(() -> data.via(compType)
.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer)
).map(bytes -> Buffer.buffer(bytes.toArray()));
}

关于java - 将 Akka 源代码转换为 RxJava2 Flowable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45855239/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com