- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 NestJs 的事件溯源和 CQRS,并使用 Kafka 作为事件存储。
该应用程序是一个小而简单的应用程序,包含 2 个部分:客户和订单。您首先创建一个具有一些初始余额的客户,然后使用您创建订单的客户 ID,如果订单金额小于余额,则该订单将被批准,否则将被拒绝。
这里是有问题的代码:https://github.com/Ashniu123/nestjs-customer-order-eventsourcing-cqrs
我使用 KafkaJs 作为 EventBus(在 libs/
下创建了我自己的 KafkaModule)
当我使用 Kafka 和 MongoDB 运行它时,应用程序启动得很好。当我也创建客户时,事件 CreateCustomerEvent
按预期发布并由 CommandHandler 推送到 Kafka。 (使用 landoop UI 检查)
当从 Kafka 读取事件并将其推送到 EventBus 以供 EventHandler
获取和执行时,问题就出现了。喜欢CreateCustomerEventHandler .
我的 EventBus 使用 Kafka 的配置在每个服务的 AppModule 中。例如,Customer .
并且 EventBus observable subject$
是为 KafkaService 中的事件配置的.
这是应用程序日志(为我的评论添加//)。
customer-svc(命令端)
[Nest] 657306 - 09/13/2020, 12:54:47 AM [CreateCustomerCommandHandler] Running command handler with: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000}}
[Nest] 657306 - 09/13/2020, 12:54:47 AM [KafkaService] Published event: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000},"eventType":"CreateCustomerEvent"}
customer-view-svc(查询/查看端)
[Nest] 657550 - 09/13/2020, 12:54:47 AM [KafkaService] Bridged event payload value: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000},"eventType":"CreateCustomerEvent"}
[Nest] 657550 - 09/13/2020, 12:54:47 AM [CreateCustomerEventHandler] Running event handler with: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000}}
[Nest] 657550 - 09/13/2020, 12:54:47 AM [CreateCustomerEventHandler] Running event handler with: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000}}
[Nest] 657550 - 09/13/2020, 12:54:47 AM [CreateCustomerEventHandler] Created customer: {"_id":"5f5d207f57cd5f089895867a","id":"900ee3e9-33aa-431c-bbd0-eea91cafb673","email":"someemail@gmail.com","password":"$2b$10$NzEnAHRsfh/7QnczB3p/MepPl0fD44G/6sFtzKsjpwudjYlNjGacG","firstName":"john","lastName":"doe","balance":1000,"salt":"$2b$10$NzEnAHRsfh/7QnczB3p/Me"}
[Nest] 657550 - 09/13/2020, 12:54:48 AM [CreateCustomerEventHandler] Created customer: {"_id":"5f5d207f57cd5f089895867b","id":"900ee3e9-33aa-431c-bbd0-eea91cafb673","email":"someemail@gmail.com","password":"$2b$10$w0.mShhI3cMys7XAPLHRFusy63Fqlzj9s95JuSGdDpy.g5n5nt/8O","firstName":"john","lastName":"doe","balance":1000,"salt":"$2b$10$w0.mShhI3cMys7XAPLHRFu"}
// for some reason the another customer of same email is created even though in `customer.schema.ts` I have specified that it should be unique (not a priority at the moment)
我可以从日志中推断出,Kafka 事件仅按预期被消费者接收到一次,但使用 subject$.next
移动到 EventHandler
两次.
另外,需要澄清的是,事件被推送到 EventHandler 两次,正如创建时 customer._id 的不同值所暗示的那样。
使用调试器我可以看到 subject.observers
在类 FilterSubscriber
的数组中有 2 个值。我不知道这是否有用,只是想安排我自己解决这个问题的努力,在 6 小时无所事事之后,我来这里寻求帮助:)。
如果你们能更好地使用它,我已经在 repo 中添加了 launch.json 以与 VSCode 一起使用。只需使用正在运行的应用程序的 processId 进行附加。
附言我以类似的方式配置了 customer-view-svc
和 order-view-svc
的 EventBus,但两者都存在问题(即重复事件)。我希望你们能够帮助我解决这个问题。
谢谢。
最佳答案
CQRS 模块通过查看 providers
列表自动注册 EventHandlers。通过使用 EventBus.register()
,我们可以添加额外的订阅。
这commit解决了这个问题。
通过从 EventBus.register()
中删除 EventHandlers 列表,我只能订阅一次,从而解决了重复消息问题。
关于node.js - NestJs EventBus 在 EventHandler 复制事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63864255/
我在开发应用程序时将配置保存在 .env 文件中。 这是我的 app.module.ts: @Module({ imports: [ ConfigModule.forRoot({ isGl
我正在使用我购买的 akveo 后端包,虽然在开发模式下一切似乎都在生产中运行良好,但出现了以下错误,但我对 nestjs 本身并不熟悉。 有谁知道这里发生了什么? node_modules/@nes
我刚刚开始使用 Nestjs我想知道如何使用路由前缀或通过 Express Router 实例对我的 API 进行版本控制? 理想情况下,我希望通过以下方式访问端点: /v1 /v2 等等,这样我就可
我想了解将服务提供商注入(inject) NestJS Controller 的目的是什么?这里的文档在这里解释了如何使用它们,这不是这里的问题:https://docs.nestjs.com/pro
我正在使用@goevelup/nestjs-rabbitmq库构建一个NestJS应用程序,以便将消息发布到rabbitmq交易所。。我正在AppModule中导入和配置RabbitMQ模块(这部分似
我正在使用@goevelup/nestjs-rabbitmq库构建一个NestJS应用程序,以便将消息发布到rabbitmq交易所。。我正在AppModule中导入和配置RabbitMQ模块(这部分似
我正在制作 @nestjs/swagger生成api文档。但是如何为经过身份验证的路由生成文档? 嵌套版本 λ nest i NodeJS Version : v10.16.0 [Nest Infor
NestJs 允许导出模块和提供者。它们有什么区别? 例子: // Reusable module @Module({ providers: [ServiceA], exports: [Service
我有一个返回字符串的 Controller 处理程序。 // Controller.ts import { Controller, Get, UseInterceptors } from '@nest
在 NestJS API 上,我想在实体中使用模块服务,以使用非数据库属性填充该实体。 在我的例子中,我想获得我正在检索的类别的文章数量。 @Entity({ name: 'categories' }
我正在设置一个新的 NestJS 应用程序,我刚刚添加了类验证器以验证 Controller 输入,但它似乎被完全忽略了。这是 DTO: import {IsString} from 'class-v
我想要一些环境,比如说development , production , test .这些环境应该是独立的,并使用它们自己的配置参数集,例如对于 DB、SERVER_PORT、USER 等。 它们不
我觉得 this thread 和 this thread 的组合是我需要实现的,我无法将它们绘制在一起。 我有一个包含 enum 的 DTO。 使用 Postman,我发送 PurchasableT
我是 NestJs 的新手,我创建了一个回退异常过滤器,现在我想知道如何使用它。换言之,如何将其导入我的应用程序? 这是我的后备异常过滤器: @Catch(HttpException) export
我在oracle数据库中有存储过程,我想在NestJs中调用它。 如何在 NestJs 中调用存储过程? 这是我的存储过程 PROCEDURE pipeline_critical (
我想在 nestjs 的验证中使用正则表达式。 例如: 正则表达式 pagePattern = '[a-z0-9\-]+'; 方法 @Get('/:article') getIndex(
我想将配置字符串传递给管道,但也想注入(inject)服务。 NesJs 文档描述了如何相互独立而不是一起执行这两项操作。举个例子: 管道.ts @Injectable() export class
我正在尝试对具有来自 nestjs 护照模块的 AuthGuard 的路由进行端到端测试,但我真的不知道如何处理它。当我运行测试时,它说: [ExceptionHandler] Unknown aut
我正在编写一个 NestJS 应用程序。一些端点支持排序,例如http://127.0.0.1:3000/api/v1/members?sort=-id&take=100 这意味着按 id 降序排序。
我想在 nestjs 的验证中使用正则表达式。 例如: 正则表达式 pagePattern = '[a-z0-9\-]+'; 方法 @Get('/:article') getIndex(
我是一名优秀的程序员,十分优秀!