gpt4 book ai didi

multithreading - 在 Spring 中使用什么样的 "EventBus"?内置、Reactor、Akka?

转载 作者:IT老高 更新时间:2023-10-28 13:05:01 26 4
gpt4 key购买 nike

我们将在几周后启动一个新的 Spring 4 应用程序。我们想使用一些事件驱动的架构。今年我到处读到关于“Reactor”的文章,在网上寻找它时,我偶然发现了“Akka”。

所以现在我们有 3 个选择:

我找不到真正的比较。


现在我们只需要这样的东西:

  • X 注册监听 Event E
  • Y 注册监听 Event E
  • Z 发送 Event E

然后XY将接收并处理该事件。

我们很可能会以异步方式使用它,但肯定也会有一些同步场景。我们很可能总是发送一个类作为事件。 (Reactor 示例主要使用字符串和字符串模式,但它也支持对象)。


据我了解,ApplicationEvent 默认同步工作,Reactor 工作异步。 Reactor 还允许使用 await() 方法使其有点同步。 Akka 提供或多或少与 Reactor 相同的功能,但也支持 Remoting。

关于Reactor的await()方法:可以等待多个线程完成吗?或者甚至可能是这些线程的一部分?如果我们以上面的例子为例:

  • X 注册监听 Event E
  • Y 注册监听 Event E
  • Z 发送 Event E

是否可以通过说:WAITINGX Y完成。是否有可能让它只等待X,而不等待Y


也许还有一些替代品?例如 JMS 呢?

很多问题,但希望您能提供一些答案!

谢谢!


编辑:示例用例

  1. 当某个特定事件被触发时,我想创建 10000 封电子邮件。每封电子邮件都必须使用用户特定的内容生成。所以我会创建很多线程(max = system cpu cores)来创建邮件并且不会阻塞调用者线程,因为这可能需要几分钟。

  2. 当特定事件被触发时,我想从未知数量的服务中收集信息。每次获取大约需要 100 毫秒。在这里我可以想象使用 Reactor 的 await,因为我需要这些信息来继续我在主线程中的工作。

  3. 当特定事件被触发时,我想根据应用程序配置执行一些操作。因此应用程序必须能够动态(取消)注册消费者/事件处理程序。他们会用事件做他们自己的事情,我不在乎。所以我会为每个处理程序创建一个线程,然后继续在主线程中工作。

  4. 简单的解耦:我基本上知道所有的接收者,但我只是不想在我的代码中调用每个接收者。这应该主要是同步完成的。

听起来我需要一个 ThreadPool 或一个 RingBuffer。这些框架是否有动态的 RingBuffers,如果需要,它的大小会增加?

最佳答案

我不确定我能否在这个狭小的空间内充分回答您的问题。但我会试一试! :)

Spring 的 ApplicationEvent 系统和 Reactor 就功能而言确实截然不同。 ApplicationEvent 路由基于ApplicationListener 处理的类型。任何比这更复杂的事情,您都必须自己实现逻辑(但这不一定是坏事)。然而,Reactor 提供了一个全面的路由层,它也非常轻量级且完全可扩展。两者在功能上的任何相似之处都在于它们订阅和发布事件的能力,这实际上是任何事件驱动系统的特性。另外不要忘记 Spring 4 中的新 spring-messaging 模块。它是 Spring Integration 中可用工具的子集,还提供了围绕事件驱动架构构建的抽象。

Reactor 将帮助您解决几个关键问题,否则您必须自己管理:

Selector 匹配:Reactor 进行 Selector 匹配,它包含一系列匹配——来自一个简单的 .equals(Object other) 调用, 到允许占位符提取的更复杂的 URI 模板匹配。您还可以使用自己的自定义逻辑扩展内置选择器,这样您就可以使用丰富的对象作为通知键(例如域对象)。

Stream 和 Promise APIs:您提到 Promise API 已经引用了 .await() 方法,它实际上是用于期望阻塞行为的现有代码。在使用 Reactor 编写新代码时,使用组合和回调来通过不阻塞线程来有效利用系统资源的压力不会太大。在依赖少量线程来执行大量任务的架构中,阻塞调用者几乎从来都不是一个好主意。 Futures 根本不是云可扩展的,这就是现代应用程序利用替代解决方案的原因。

您的应用程序可以使用 Streams 或 Promises 中的任何一种来构建,但老实说,我认为您会发现 Stream 更加灵活。关键的好处是 API 的可组合性,它允许您在依赖链中将操作连接在一起而不会阻塞。作为一个基于您描述的电子邮件用例的完全即兴示例:

@Autowired
Environment env;
@Autowired
SmtpClient client;

// Using a ThreadPoolDispatcher
Deferred<DomainObject, Stream<DomainObject>> input = Streams.defer(env, THREAD_POOL);

input.compose()
.map(new Function<DomainObject, EmailTemplate>() {
public EmailTemplate apply(DomainObject in) {
// generate the email
return new EmailTemplate(in);
}
})
.consume(new Consumer<EmailTemplate>() {
public void accept(EmailTemplate email) {
// send the email
client.send(email);
}
});

// Publish input into Deferred
DomainObject obj = reader.readNext();
if(null != obj) {
input.accept(obj);
}

Reactor 还提供 Boundary这基本上是一个 CountDownLatch 用于阻止任意消费者(因此,如果您只想阻止 Consumer<,则不必构造 Promise/完成)。在这种情况下,您可以使用原始 Reactor 并使用 on()notify() 方法来触发服务状态检查。

但是,对于某些事情,您想要的是从 ExecutorService 返回的 Future,不是吗?为什么不把事情简单化?只有在吞吐量性能和开销效率很重要的情况下,Reactor 才会真正受益。如果您阻塞了调用线程,那么您很可能会抹去 Reactor 无论如何都会给您带来的效率提升,因此在这种情况下使用更传统的工具集可能会更好。

Reactor 的开放性的好处在于,没有什么可以阻止两者的交互。您可以自由地将 FuturesConsumers 混合使用,无需静态。在这种情况下,请记住,您的速度只会与最慢的组件一样快。

关于multithreading - 在 Spring 中使用什么样的 "EventBus"?内置、Reactor、Akka?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20663988/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com