kafka streams的活动采购

raogr8fs  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(348)

我试图在kafka流之上实现一个简单的cqrs/事件源概念验证(如中所述)https://www.confluent.io/blog/event-sourcing-using-apache-kafka/)
我有4个基本部分: commands 主题,该主题使用聚合id作为对每个聚合的命令进行顺序处理的键 events 主题,聚合状态中的每个更改都将发布到该主题(同样,键是聚合id)。此主题的保留策略为“从不删除”
一种减少聚合状态并将其保存到状态存储中的表

events topic stream ->
group to a Ktable by aggregate ID ->
reduce aggregate events to current state ->
materialize as a state store

命令处理器-命令流,与聚合状态ktable左连接。对于结果流中的每个条目,使用函数 (command, state) => events 生成结果事件并将其发布到 events 主题
问题是-有没有办法确保我在状态存储中有最新版本的聚合?
如果违反业务规则,我想拒绝命令(例如,如果实体被标记为已删除,则修改实体的命令无效)。但如果 DeleteCommand 发布之后是 ModifyCommand 紧接着,delete命令将生成 DeletedEvent 但是当 ModifyCommand 如果已处理,则从状态存储加载的状态可能不会反映该状态,并且将发布冲突事件。
我不介意牺牲命令处理吞吐量,我宁愿得到一致性保证(因为所有内容都是由同一个键分组的,应该在同一个分区中结束)
希望很清楚:)有什么建议吗?

dba5bblo

dba5bblo1#

请阅读我同事杰斯珀的这篇文章。Kafka是一个伟大的产品,但实际上并不适合在所有的活动采购
https://medium.com/serialized-io/apache-kafka-is-not-for-event-sourcing-81735c3cf5c

ego6inou

ego6inou2#

我认为kafka还不适合cqr和事件源,正如您所描述的那样,因为它缺乏一种(简单的)防止并发写入的方法。本文对此进行了详细论述。
我的意思是,你所描述的是这样一个事实:你期望一个命令生成零个或多个事件,或者异常失败;这是具有事件源的经典cqrs。大多数人都期待这种建筑。
你可以有一个不同的风格的事件来源。您的命令处理程序可以为接收到的每个命令生成事件(即。 DeleteWasAccepted ). 然后,事件处理程序最终可以以事件源的方式(通过从其事件流重建聚合的状态)处理该事件,并发出其他事件(即。 ItemDeleted 或者 ItemDeletionWasRejected ). 因此,命令被触发并忽略,异步发送,客户机不等待立即响应。但是,它会等待一个描述其命令执行结果的事件。
一个重要的方面是,事件处理程序必须以串行方式(一次并按顺序)处理来自同一聚合的事件。这可以用一个Kafka消费群体来实现。你可以在这个视频中看到这个架构。

ilmyapht

ilmyapht3#

我提出的一个可能的解决方案是实现一种乐观的锁定机制:
添加 expectedVersion 命令上的字段
使用ktable Aggregator 为每个处理的事件增加聚合快照的版本
如果 expectedVersion 与快照的聚合版本不匹配
这似乎提供了我想要的语义

相关问题