我读了一些关于使用kafka和kafka流(带有状态存储)作为事件存储实现的文章。
https://www.confluent.io/blog/event-sourcing-using-apache-kafka/
https://www.confluent.io/blog/event-sourcing-cqrs-stream-processing-apache-kafka-whats-connection/
实施思路如下:
在Kafka主题中存储实体更改(事件)
将kafka streams与state store一起使用(默认情况下使用db)来更新和缓存实体快照
每当执行新命令时,从存储中获取实体并对其执行操作,然后继续执行步骤#1
此工作流的问题是,状态存储正在异步更新(步骤2),并且在处理新命令时,检索到的实体快照可能已过时(因为它没有使用以前命令中的事件进行更新)。
我的理解正确吗?有没有一个简单的方法来处理Kafka这样的案件?
3条答案
按热度按时间xyhw6mcr1#
我的理解正确吗?
据我所知,是的,这意味着对于许多事件源域模型来说,它是一个不令人满意的事件存储。
简而言之,在向主题添加事件时不支持“第一作者获胜”,这意味着Kafka不能帮助您确保主题满足其不变量。
有人提出了解决这个问题的建议,但我还没有找到进展的证据。
https://issues.apache.org/jira/browse/kafka-2260
https://cwiki.apache.org/confluence/display/kafka/kip-27+-+conditional+publish
webghufk2#
是的,这很简单。
使用Kafka信息的密钥。具有相同密钥的消息总是*进入相同的分区。一个使用者可以读取一个或多个部分,但两个使用者不能同时读取两个分区。
工作使用者的最大计数总是<=主题的分区计数。您可以创建更多使用者,但使用者将是备份节点。
dldeef673#
如果您向kafka写入命令,然后在kstreams中具体化视图,则具体化视图将异步更新。这有助于将写操作与读操作分开,以便读取路径可以扩展。
如果您希望命令/事件具有一致的读写语义,那么最好将其写入数据库。事件可以使用cdc连接器(write-through)从数据库提取到kafka,也可以先写入数据库,然后在事务中写入kafka(write-aside)。
另一个选项是对读取执行长轮询(因此,如果您写入trade1.version2,然后希望再次读取它,则读取将阻塞,直到trade1.version2可用)。这并不适用于所有用例,但它可能很有用。
示例如下:https://github.com/confluentinc/kafka-streams-examples/blob/4eb3aa4cc9481562749984760de159b68c922a8f/src/main/java/io/confluent/examples/streams/microservices/ordersservice.java#l165