gpt4 book ai didi

java - Project Reactor(或 RxJava2)执行程序序列调用

转载 作者:太空宇宙 更新时间:2023-11-04 10:00:46 30 4
gpt4 key购买 nike

我有以下任务,我想使用 Project Reactor(或 RxJava)来解决它

有事件的来源。每个事件由 serviceId 和一些负载组成。收到事件后,我们需要使用有效负载对指定的 serviceId 执行操作。但我们要保证对同一个serviceId的两次请求之间的时间间隔必须大于或等于1秒。但对不同服务的请求可以并行执行。

我们还应该注意,服务的计数是动态的。

如下图所示

enter image description here

目前我有以下代码:

Flux.create((sink-> eventProvider.listen(new EventListner(){
public void event(req) {
sink.next(req);
}
})))
/* need some logic here */
.flatMap(req -> requestExecutor.execute(req))
.doOnNext(res -> responseProcessor.process(res))
.subscribe();

你有什么想法吗?

最佳答案

如果事件标识了它们发起调用的服务,您可以使用 groupBy() 运算符按服务分隔流。要在每个服务请求后引入延迟,请使用带有参数的 flatMap() 来单线程使用。

在 RxJava 中:

observable
.groupBy(event -> getServiceId( event )) // 1
.flatMap(serviceObservable -> // 2
serviceObservable // 3
.flatMap( ev -> service(serviceObservable.getKey(), ev), 1) // 4
.delay(1, TimeUnit.SECONDS)) // 5
.subscribe();
  1. 按事件将使用的服务对事件进行分组。该 ID 稍后将作为 key 使用。当遇到新的服务 ID 时,这将发出新的项目。
  2. serviceObservable 是一个 GroupByObservable,将在下面进行处理。
  3. 此可观察对象的每次发射都是一个应该发送到单个服务的事件。
  4. serviceObservable.getKey() 返回要使用的服务的 ID。我发明了一种方法 service(),它通过服务的 ID 向服务发送事件。此外,参数 1 告诉 flatMap() 单线程操作,因此一次只能发生一个服务请求。
  5. delay()(或您想要的任何运算符)将在释放操作之前等待一秒钟。

(免责声明:此代码未经测试,但我在过去的项目中做过类似的调度,因此基本思想是合理的。)

关于java - Project Reactor(或 RxJava2)执行程序序列调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53519820/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com