NestJs EventBus 在 EventHandler 复制事件

NestJs EventBus duplicates event at EventHandler

我正在尝试使用 NestJs 的事件溯源和 CQRS,使用 Kafka 作为事件存储。

该应用程序是一个小而简单的应用程序,有 2 个部分,客户和订单。您首先创建一个具有一些初始余额的客户,然后使用您创建订单的客户 ID,如果订单金额小于余额则批准,否则拒绝。

这里是有问题的代码:https://github.com/Ashniu123/nestjs-customer-order-eventsourcing-cqrs

我使用 KafkaJs 作为 EventBus(在 libs/ 下创建了我自己的 KafkaModule

当我 运行 使用 Kafka 和 MongoDB 时,应用程序启动正常。 当我也创建客户时,事件 CreateCustomerEvent 按预期发布并由 CommandHandler 推送到 Kafka。 (使用 landoop 检查 UI)

当从 Kafka 读取事件并将其推送到 EventBus 以供 EventHandler 拾取和执行时,问题就出现了。喜欢 CreateCustomerEventHandler.

我的 EventBus 使用 Kafka 的配置在每个服务的 AppModule 中。例如,Customer.

并且为 KafkaService 中的事件配置了 EventBus observable subject$

这是应用程序日志(为我的评论添加 //)。

customer-svc(命令端)

[Nest] 657306   - 09/13/2020, 12:54:47 AM   [CreateCustomerCommandHandler] Running command handler with: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000}}
[Nest] 657306   - 09/13/2020, 12:54:47 AM   [KafkaService] Published event: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000},"eventType":"CreateCustomerEvent"}

customer-view-svc(Query/View 方)

[Nest] 657550   - 09/13/2020, 12:54:47 AM   [KafkaService] Bridged event payload value: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000},"eventType":"CreateCustomerEvent"}
[Nest] 657550   - 09/13/2020, 12:54:47 AM   [CreateCustomerEventHandler] Running event handler with: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000}}
[Nest] 657550   - 09/13/2020, 12:54:47 AM   [CreateCustomerEventHandler] Running event handler with: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000}}
[Nest] 657550   - 09/13/2020, 12:54:47 AM   [CreateCustomerEventHandler] Created customer: {"_id":"5f5d207f57cd5f089895867a","id":"900ee3e9-33aa-431c-bbd0-eea91cafb673","email":"someemail@gmail.com","password":"b$NzEnAHRsfh/7QnczB3p/MepPl0fD44G/6sFtzKsjpwudjYlNjGacG","firstName":"john","lastName":"doe","balance":1000,"salt":"b$NzEnAHRsfh/7QnczB3p/Me"}
[Nest] 657550   - 09/13/2020, 12:54:48 AM   [CreateCustomerEventHandler] Created customer: {"_id":"5f5d207f57cd5f089895867b","id":"900ee3e9-33aa-431c-bbd0-eea91cafb673","email":"someemail@gmail.com","password":"b$w0.mShhI3cMys7XAPLHRFusy63Fqlzj9s95JuSGdDpy.g5n5nt/8O","firstName":"john","lastName":"doe","balance":1000,"salt":"b$w0.mShhI3cMys7XAPLHRFu"}
// for some reason the another customer of same email is created even though in `customer.schema.ts` I have specified that it should be unique (not a priority at the moment)

我可以从日志中推断出 Kafka 事件仅按预期被消费者接收到一次,但使用 subject$.next 移动到 EventHandler 两次。

此外,为了澄清,事件被推送到 EventHandler 两次,正如创建时 customer._id 的不同值所建议的那样。

使用调试器我可以看到 subject.observers 在 class FilterSubscriber 的数组中有 2 个值。我不知道这是否有用,只是想安排我自己解决这个问题的努力,在 6 小时无所事事之后,我来这里寻求帮助:)。

如果你们能更好地使用它,我已经在回购协议中添加了 launch.json 以与 VSCode 一起使用。只需使用 运行ning 应用程序的 processId 进行附加。

P.S。我以类似的方式配置了 customer-view-svcorder-view-svc 的 EventBus,并且两者都存在问题(即重复事件)。希望大家能帮我解决问题。

谢谢。

CQRS 模块通过查看 providers 列表自动注册 EventHandlers。通过使用 EventBus.register() 我们可以添加额外的订阅。

commit 解决了问题。

通过从 EventBus.register() 中删除 EventHandlers 列表,我只能订阅一次,从而解决了重复消息问题。