nestjs eventbus在eventhandler上复制事件

imzjd6km  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(476)

我正在尝试事件源和cqrs使用nestjs使用kafka作为事件存储。
该应用程序是一个小而简单的一个有两个部分,客户和订单。您首先创建一个具有一些初始余额的客户,然后使用客户id创建一个订单,如果订单的金额小于余额,那么它将被批准,否则将被拒绝。
以下是相关代码:https://github.com/ashniu123/nestjs-customer-order-eventsourcing-cqrs
我使用kafkajs作为eventbus(在下面创建了我自己的kafkamodule) libs/ )
当我用kafka和mongodb运行这个程序时,应用程序启动得很好。当我也创建了一个客户时 CreateCustomerEvent 按预期发布,并由commandhandler推送到kafka上(使用landoop ui进行检查)
当从kafka读取事件并将其推送到eventbus上由 EventHandler . 比如createcustomereventhandler。
我对eventbus使用kafka的配置在appmodule中。例如,客户。
以及可观察到的事件总线 subject$ 为kafkaservice中的事件配置。
下面是应用程序日志(添加//供我评论)。
客户svc(命令端)

[Nest] 657306   - 09/13/2020, 12:54:47 AM   [CreateCustomerCommandHandler] Running command handler with: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000}}
[Nest] 657306   - 09/13/2020, 12:54:47 AM   [KafkaService] Published event: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000},"eventType":"CreateCustomerEvent"}

客户视图svc(查询/视图侧)

[Nest] 657550   - 09/13/2020, 12:54:47 AM   [KafkaService] Bridged event payload value: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000},"eventType":"CreateCustomerEvent"}
[Nest] 657550   - 09/13/2020, 12:54:47 AM   [CreateCustomerEventHandler] Running event handler with: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000}}
[Nest] 657550   - 09/13/2020, 12:54:47 AM   [CreateCustomerEventHandler] Running event handler with: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000}}
[Nest] 657550   - 09/13/2020, 12:54:47 AM   [CreateCustomerEventHandler] Created customer: {"_id":"5f5d207f57cd5f089895867a","id":"900ee3e9-33aa-431c-bbd0-eea91cafb673","email":"someemail@gmail.com","password":"$2b$10$NzEnAHRsfh/7QnczB3p/MepPl0fD44G/6sFtzKsjpwudjYlNjGacG","firstName":"john","lastName":"doe","balance":1000,"salt":"$2b$10$NzEnAHRsfh/7QnczB3p/Me"}
[Nest] 657550   - 09/13/2020, 12:54:48 AM   [CreateCustomerEventHandler] Created customer: {"_id":"5f5d207f57cd5f089895867b","id":"900ee3e9-33aa-431c-bbd0-eea91cafb673","email":"someemail@gmail.com","password":"$2b$10$w0.mShhI3cMys7XAPLHRFusy63Fqlzj9s95JuSGdDpy.g5n5nt/8O","firstName":"john","lastName":"doe","balance":1000,"salt":"$2b$10$w0.mShhI3cMys7XAPLHRFu"}
// for some reason the another customer of same email is created even though in `customer.schema.ts` I have specified that it should be unique (not a priority at the moment)

我可以从日志中推断出,kafka事件只被使用者按预期接收一次,但是使用 subject$.nextEventHandler 两次。
另外,要澄清的是,根据customer.\u id的不同值的建议,事件被两次推送到eventhandler。
使用调试器,我可以看到 subject.observers 在类的数组中有2个值 FilterSubscriber . 我不知道这是否有用,但我只是想安排我的努力,以解决这个问题自己,6小时后,我来这里寻求帮助:)。
我已经添加了launch.json来与repo中的vscode一起使用,如果你们能更好地使用它的话。只需使用正在运行的应用程序的processid进行附加。
p、 我已经配置了两者的eventbus customer-view-svc 以及 order-view-svc 以类似的方式,问题同时存在(即重复事件)。我希望你们能帮我解决这个问题。
谢谢。

s8vozzvw

s8vozzvw1#

cqrs模块通过查看 providers 列表。通过使用 EventBus.register() 我们可以添加其他订阅。
此提交解决了问题。
通过从 EventBus.register() ,我只能订阅它们一次,因此解决了重复消息的问题。

相关问题