gpt4 book ai didi

reactive-programming - 使用响应式(Reactive) Couchbase java 驱动程序进行批处理

转载 作者:行者123 更新时间:2023-12-04 10:30:50 26 4
gpt4 key购买 nike

假设我有一个存储桶,我需要从中获取日期早于现在的文档。
该文档如下所示:

{
id: "1",
date: "Some date",
otherObjectKEY: "key1"
}

对于每个文档,我需要使用它的 获取另一个文档。 otherObjectKEY ,将后者发送到一个kafka主题,然后删除原始文档。

使用 响应式(Reactive) Java 驱动程序 3.0 ,我能够做到这一点:
public void batch(){
streamOriginalObjects()
.flatMap(originalObject -> fetchOtherObjectUsingItsKEY(originalObject)
.flatMap(otherObject -> sendToKafkaAndDeleteOriginalObject(originalObject))
)
.subscribe();
}

流原始对象():
public Flux<OriginalObject> streamOriginalObjects(){
return client.query("select ... and date <= '"+ LocalDateTime.now().toString() +"'")
.flux()
.flatMap(result -> result.rowsAs(OriginalObject.class));
}

它按预期工作,但我想知道是否有比逐元素流式传输和处理更好的方法(尤其是在性能方面)。

最佳答案

执行 N1QL 查询,然后从中扇出键值操作,是一种有用且常见的模式。这应该使扇出并行发生:

    streamOriginalObjects()
// Split into numberOfThreads 'rails'
.parallel(numberOfThreads)

// Run on an unlimited thread pool
.runOn(Schedulers.elastic())

.concatMap(originalObject -> fetchOtherObjectUsingItsKEY(originalObject)
.concatMap(otherObject -> sendToKafkaAndDeleteOriginalObject(originalObject))
)

// Back out of parallel mode
.sequential()
.subscribe();

关于reactive-programming - 使用响应式(Reactive) Couchbase java 驱动程序进行批处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60420328/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com