gpt4 book ai didi

node.js - NestJs EventBus 在 EventHandler 复制事件

转载 作者:行者123 更新时间:2023-12-04 03:56:13 24 4
gpt4 key购买 nike

我正在尝试使用 NestJs 的事件溯源和 CQRS,并使用 Kafka 作为事件存储。

该应用程序是一个小而简单的应用程序,包含 2 个部分:客户和订单。您首先创建一个具有一些初始余额的客户,然后使用您创建订单的客户 ID,如果订单金额小于余额,则该订单将被批准,否则将被拒绝。

这里是有问题的代码:https://github.com/Ashniu123/nestjs-customer-order-eventsourcing-cqrs

我使用 KafkaJs 作为 EventBus(在 libs/ 下创建了我自己的 KafkaModule)

当我使用 Kafka 和 MongoDB 运行它时,应用程序启动得很好。当我也创建客户时,事件 CreateCustomerEvent 按预期发布并由 CommandHandler 推送到 Kafka。 (使用 landoop UI 检查)

当从 Kafka 读取事件并将其推送到 EventBus 以供 EventHandler 获取和执行时,问题就出现了。喜欢CreateCustomerEventHandler .

我的 EventBus 使用 Kafka 的配置在每个服务的 AppModule 中。例如,Customer .

并且 EventBus observable subject$ 是为 KafkaService 中的事件配置的.

这是应用程序日志(为我的评论添加//)。

customer-svc(命令端)

[Nest] 657306   - 09/13/2020, 12:54:47 AM   [CreateCustomerCommandHandler] Running command handler with: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000}}
[Nest] 657306 - 09/13/2020, 12:54:47 AM [KafkaService] Published event: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000},"eventType":"CreateCustomerEvent"}

customer-view-svc(查询/查看端)

[Nest] 657550   - 09/13/2020, 12:54:47 AM   [KafkaService] Bridged event payload value: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000},"eventType":"CreateCustomerEvent"}
[Nest] 657550 - 09/13/2020, 12:54:47 AM [CreateCustomerEventHandler] Running event handler with: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000}}
[Nest] 657550 - 09/13/2020, 12:54:47 AM [CreateCustomerEventHandler] Running event handler with: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000}}
[Nest] 657550 - 09/13/2020, 12:54:47 AM [CreateCustomerEventHandler] Created customer: {"_id":"5f5d207f57cd5f089895867a","id":"900ee3e9-33aa-431c-bbd0-eea91cafb673","email":"someemail@gmail.com","password":"$2b$10$NzEnAHRsfh/7QnczB3p/MepPl0fD44G/6sFtzKsjpwudjYlNjGacG","firstName":"john","lastName":"doe","balance":1000,"salt":"$2b$10$NzEnAHRsfh/7QnczB3p/Me"}
[Nest] 657550 - 09/13/2020, 12:54:48 AM [CreateCustomerEventHandler] Created customer: {"_id":"5f5d207f57cd5f089895867b","id":"900ee3e9-33aa-431c-bbd0-eea91cafb673","email":"someemail@gmail.com","password":"$2b$10$w0.mShhI3cMys7XAPLHRFusy63Fqlzj9s95JuSGdDpy.g5n5nt/8O","firstName":"john","lastName":"doe","balance":1000,"salt":"$2b$10$w0.mShhI3cMys7XAPLHRFu"}
// for some reason the another customer of same email is created even though in `customer.schema.ts` I have specified that it should be unique (not a priority at the moment)

我可以从日志中推断出,Kafka 事件仅按预期被消费者接收到一次,但使用 subject$.next 移动到 EventHandler 两次.

另外,需要澄清的是,事件被推送到 EventHandler 两次,正如创建时 customer._id 的不同值所暗示的那样。

使用调试器我可以看到 subject.observers 在类 FilterSubscriber 的数组中有 2 个值。我不知道这是否有用,只是想安排我自己解决这个问题的努力,在 6 小时无所事事之后,我来这里寻求帮助:)。

如果你们能更好地使用它,我已经在 repo 中添加了 launch.json 以与 VSCode 一起使用。只需使用正在运行的应用程序的 processId 进行附加。

附言我以类似的方式配置了 customer-view-svcorder-view-svc 的 EventBus,但两者都存在问题(即重复事件)。我希望你们能够帮助我解决这个问题。

谢谢。

最佳答案

CQRS 模块通过查看 providers 列表自动注册 EventHandlers。通过使用 EventBus.register(),我们可以添加额外的订阅。

commit解决了这个问题。

通过从 EventBus.register() 中删除 EventHandlers 列表,我只能订阅一次,从而解决了重复消息问题。

关于node.js - NestJs EventBus 在 EventHandler 复制事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63864255/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com