- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我在学习Reactor ,我想知道如何实现某种行为。假设我有一个传入消息流。每条消息都与特定实体相关联,并包含一些数据。
interface Message {
String getEntityId();
Data getData();
}
与不同实体相关的消息可以并行处理。但是,属于任何单个实体的消息必须一次处理一个,即实体 "abc"
的消息 2 的处理要等到实体 "abc"< 的消息 1 处理后才能开始
已经完成。在处理消息时,应该缓冲该实体的其他消息。其他实体的消息可以畅通无阻。可以把它想象成每个实体都在一个线程上运行这样的代码:
public void run() {
for (;;) {
// Blocks until there's a message available
Message msg = messageQueue.nextMessageFor(this.entityId);
// Blocks until processing is finished
processMessage(msg);
}
}
我如何在不阻塞的情况下使用 React 实现这一点?总消息速率可能很高,但每个实体的消息速率将非常低。实体集可能非常大,并且不一定事先知道。
我猜它可能看起来像这样,但我不知道。
{
incomingMessages()
.groupBy(Message::getEntityId)
.flatMap(entityStream -> entityStream
/* ... */
.map(msg -> /* process the message */)))
/* ... */
}
public static Stream<Message> incomingMessages() { /* ... */ }
最佳答案
与 ProjectReactor你可以这样解决:
@Test
public void testMessages() {
Flux.fromStream(incomingMessages())
.groupBy(Message::getEntityId)
.map(g -> g.publishOn(Schedulers.newParallel("groupByPool", 16))) //create new publisher for groups of messages
.subscribe( //create consumer for main stream
stream ->
stream.subscribe(this::processMessage) // create consumer for group stream
);
}
public Stream<Message> incomingMessages() {
return IntStream.range(0, 100).mapToObj(i -> new Message(i, i % 10));
}
public void processMessage(Message message) {
System.out.println(String.format("Message: %s processed by the thread: %s", message, Thread.currentThread().getName()));
}
private static class Message {
private final int id;
private final int entityId;
public Message(int id, int entityId) {
this.id = id;
this.entityId = entityId;
}
public int getId() {
return id;
}
public int getEntityId() {
return entityId;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", entityId=" + entityId +
'}';
}
}
我认为类似的解决方案可能在 RxJava 中
关于java - Reactor 中 `groupBy` 组的并行调度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31185129/
我正在尝试找出我们应该在下一个项目中使用 Akka 还是 Reactor。最重要的问题之一是 future 选择的框架是否会提供远程处理。正如我所看到的,Akka 以我们想要的方式提供了这个。 在 G
假设我有一个返回 Mono 的 repository.save(..) 方法。 还可以说我有一个 repository.findByEmail(..) 它返回一个 Mono。 问题: 我希望第一个 M
有一个像下面这样的异步发布者,Project Reactor 有没有办法等到整个流完成处理? 当然,不必添加一个未知持续时间的 sleep ...... @Test public void group
我创建了一个简单的 Kafka 消费者,它返回 Flux对象(收到的消息),我正在尝试使用 StepVerifier 对其进行测试. 在我的测试中,我做了这样的事情: Flux flux = cons
我目前正在研究resilience4j 库,由于某种原因,以下代码无法按预期工作: @Test public void testRateLimiterProjectReactor() { //
关闭。这个问题是opinion-based 。目前不接受答案。 想要改进这个问题吗?更新问题,以便 editing this post 可以用事实和引文来回答它。 . 已关闭 7 年前。 Improv
当多个 onErrorContinue添加到管道 处理从 flatMap 抛出的特定类型的异常 ,异常处理没有按预期工作。 我希望下面的代码应该删除元素 1 到 6,而订阅者应该使用元素 7 到 10
我有一个 NettyServerCustomizer,下一个代码: @Override public HttpServer apply(final HttpServer httpServer)
我们正在评估 reactor 库以便在我们的项目中使用它。我们的项目得到了 spring 上下文的支持。因此,我们需要一个工具来构建具有 spring 支持的事件驱动应用程序。 此外,我们的主要关注领
不管上游的完整性如何,有没有办法强制 groupBy() 生成的 Flux 在一段时间后完成(或类似地,限制“打开”组的最大数量)?我有如下内容: Flux someFastPublisher; so
我正在使用 Spring WebClient 调用休息服务。如下所述的 post 调用代码。 Mono response = client.post()
我希望在 react 器运行后添加更多协议(protocol)和工厂。我找不到说明这是允许的文档。当我在 reactor.connectTCP 之前运行 reactor.run 时,程序会在工厂中围绕
(译者加)本文档的一些典型的名词如下: Publisher(发布者)、Subscriber(订阅者)、Subscription(订阅 n.)、subscribe(订阅 v.)。 event/signa
Project Reactor Mono 是否有运算符或一些好的方法来实现 doOnEmpty() 的行为? 我想对操作结果产生副作用(日志记录)。 这是我现在拥有的: myMono .map(v
在以下两个示例中,处理通量流的行为似乎有所不同。 示例 1: public static void main(String[] args) throws InterruptedException {
是否可以使用 LoggingMeterRegistry 以微米为单位收集项目 react 器的通量指标? 最佳答案 只需添加 LoggingMeterRegistry 就可以了到全局 Micromet
我想知道在spring web-flux中使用先前映射结果的好方法,例如 Mono.just(request) ... .flatMap(object0 -> createObject1(object
如何将具有1个元素的助焊剂转换为单声道? Flux.fromArray(arrayOf(1,2,1,1,1,2)) .distinct()
我想知道Reactor和分页的HTTP API。我有一个private fun getPage(pageNumber: Int): Mono。该资源具有“numberOfPages”字段,我想获取所有
我有一个包含多个 URL 和端口的数组。对于他们每个人,我需要发送和接收返回的内容: Flux.fromArray(trackersArray) .flatMap(tracker ->
我是一名优秀的程序员,十分优秀!