gpt4 book ai didi

java - Reactor 中 `groupBy` 组的并行调度

转载 作者:搜寻专家 更新时间:2023-11-01 03:20:45 24 4
gpt4 key购买 nike

我在学习Reactor ,我想知道如何实现某种行为。假设我有一个传入消息流。每条消息都与特定实体相关联,并包含一些数据。

interface Message {
String getEntityId();
Data getData();
}

与不同实体相关的消息可以并行处理。但是,属于任何单个实体的消息必须一次处理一个,即实体 "abc" 的消息 2 的处理要等到实体 "abc"< 的消息 1 处理后才能开始 已经完成。在处理消息时,应该缓冲该实体的其他消息。其他实体的消息可以畅通无阻。可以把它想象成每个实体都在一个线程上运行这样的代码:

public void run() {
for (;;) {
// Blocks until there's a message available
Message msg = messageQueue.nextMessageFor(this.entityId);

// Blocks until processing is finished
processMessage(msg);
}
}

我如何在不阻塞的情况下使用 React 实现这一点?总消息速率可能很高,但每个实体的消息速率将非常低。实体集可能非常大,并且不一定事先知道。

我猜它可能看起来像这样,但我不知道。

{
incomingMessages()
.groupBy(Message::getEntityId)
.flatMap(entityStream -> entityStream
/* ... */
.map(msg -> /* process the message */)))
/* ... */
}

public static Stream<Message> incomingMessages() { /* ... */ }

最佳答案

ProjectReactor你可以这样解决:

@Test
public void testMessages() {
Flux.fromStream(incomingMessages())
.groupBy(Message::getEntityId)
.map(g -> g.publishOn(Schedulers.newParallel("groupByPool", 16))) //create new publisher for groups of messages
.subscribe( //create consumer for main stream
stream ->
stream.subscribe(this::processMessage) // create consumer for group stream
);
}

public Stream<Message> incomingMessages() {
return IntStream.range(0, 100).mapToObj(i -> new Message(i, i % 10));
}

public void processMessage(Message message) {
System.out.println(String.format("Message: %s processed by the thread: %s", message, Thread.currentThread().getName()));
}

private static class Message {
private final int id;
private final int entityId;

public Message(int id, int entityId) {
this.id = id;
this.entityId = entityId;
}

public int getId() {
return id;
}

public int getEntityId() {
return entityId;
}

@Override
public String toString() {
return "Message{" +
"id=" + id +
", entityId=" + entityId +
'}';
}
}

我认为类似的解决方案可能在 RxJava

关于java - Reactor 中 `groupBy` 组的并行调度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31185129/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com