gpt4 book ai didi

cqrs - 轴突框架 : Handle only events published by the same JVM instance?

转载 作者:行者123 更新时间:2023-12-04 09:13:10 26 4
gpt4 key购买 nike

嗨 Axon 框架社区,
我想就如何正确解决以下问题征求您的意见。
我的 Axon 测试设置

  • 同一个 Spring Boot 应用程序的两个实例(使用 axon-spring-boot-starter 4.4,没有 Axon Server)
  • 每个实例定期发布相同的事件
  • 两个实例都连接到同一个 EventSource(使用 JpaEventStorageEngine 的单个 SQL Server 实例)
  • 每个实例都配置为使用 TrackingEventProcessors
  • 每个实例都注册了相同的事件处理程序

  • 我想达到什么
    我希望由一个实例发布的事件仅由同一个实例处理

    If instance1 publishes eventX then only instance1 should handle eventX


    到目前为止我尝试过的
  • 我可以使用 SubscribingEventProcessor 实现上述场景。不幸的是,在我的情况下这不是一个选项,因为我们希望可以选择重放事件以重建/添加新的查询模型。
  • 我可以将每个实例的事件处理程序分配给不同的处理组。不幸的是,这没有奏效。也许是因为每个 TrackingEventProcessors 实例都处理相同的 EventStream ? - 虽然不太确定。
  • 我可以实现一个 MessageHandlerInterceptor,它只有在事件源来自同一个实例的情况下才会继续。这是我到目前为止实现的并且可以正常工作:
    MessageHandlerInterceptor

  • class StackEventInterceptor(private val stackProperties: StackProperties) : MessageHandlerInterceptor<EventMessage<*>> {

    override fun handle(unitOfWork: UnitOfWork<out EventMessage<*>>?, interceptorChain: InterceptorChain?): Any? {
    val stackId = (unitOfWork?.message?.payload as SomeEvent).stackId
    if(stackId == stackProperties.id){
    interceptorChain?.proceed()
    }
    return null
    }
    }
    @Configuration
    class AxonConfiguration {

    @Autowired
    fun configure(eventProcessingConfigurer: EventProcessingConfigurer, stackProperties: StackProperties) {
    val processingGroup = "processing-group-stack-${stackProperties.id}"
    eventProcessingConfigurer.byDefaultAssignTo(processingGroup)
    eventProcessingConfigurer.registerHandlerInterceptor(processingGroup) { StackEventInterceptor(stackProperties) }
    }
    }
    有更好的解决方案吗?
    我的印象是我当前的解决方案并不是最好的解决方案,因为理想情况下我希望只有属于某个实例的事件处理程序由 TrackingEventProcessor 实例触发。
    你会怎么解决?

    最佳答案

    有趣的场景你在这里@thowimmer。
    我的第一个预感是说“改用 SubscribingEventProcessor”。
    但是,您指出这不是您的设置中的一个选项。
    我认为对于处于相同情况的其他人来说,知道为什么这不是一种选择非常有值(value)。所以,也许你可以详细说明一下(说实话,我也很好奇)。
    现在对于您的问题案例,以确保事件仅在同一个 JVM 中处理。
    将源添加到事件肯定是 您可以采取的步骤,因为这允许以合乎逻辑的方式进行过滤。 “此事件是否源自 my.origin()?”如果没有,您只需忽略该事件并完成它,就这么简单。不过,还有另一种方法可以实现这一点,我稍后会谈到。
    然而,我认为过滤的地方是你最想要的。但首先,我想说明您首先需要过滤的原因。正如您所注意到的,TrackingEventProcessor (TEP) 来自所谓的 StreamableMessageSource 的流事件. EventStore是这样一个 StreamableMessageSource 的实现.当您将所有事件存储在同一个存储中时,它只会将所有事件流式传输到您的 TEP。由于您的事件是单个事件流的一部分,因此您需要在某个阶段对其进行过滤。使用 MessageHandlerInterceptor行,你甚至可以去写一个 HandlerEnhacnerDefinition允许您向事件处理函数添加其他行为。不管你怎么说,在当前的设置下,过滤需要在某处完成。 MessageHandlerInterceptor可以说是最简单的地方来做到这一点。
    但是,有一种不同的方法来处理这个问题。为什么不将您的 Event Store 隔离为两个应用程序的两个不同实例?显然他们不需要相互阅读,那么为什么要共享同一个 Event Store?在不了解您的域的进一步背景的情况下,我猜您基本上是在处理驻留在不同 bounded contexts 中的应用程序。 .简而言之,与两个应用程序/上下文共享所有内容的兴趣为零,您只需非常有意识地彼此共享您的领域语言的特定部分。
    请注意,支持 multiple contexts ,在中间使用单个通信集线器,正是 Axon Server 可以为您实现的。我不是在这里说你不能自己配置这个,我过去已经这样做了。但是将这项工作留给其他人或其他人,使您无需配置基础设施,这将节省大量时间。
    希望这可以帮助您设定我对此事的一些想法@thowimmer。

    关于cqrs - 轴突框架 : Handle only events published by the same JVM instance?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63313532/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com