我正在构建一个由几个微服务组成的系统。他们将遵循cqrs、es和ddd协议。我想用Apache·Kafka作为“真相的来源”——正如杰伊·克雷普斯在confluent和linkedin的工程博客上的许多材料中所说的那样。
我的问题和主要问题是:
当apachekafka是真相的来源时,如何为新的实体生成身份?
例子:
我有一个订单(在网上商店)。因为Kafka是我的真理之源,所以我想先把数据放到Kafka上,然后用Kafka的数据填充一些数据库,比如mysql或elastic search。当用户下新订单时,我会在日志中添加一个“neworder”事件,其中包含订单的详细信息(已订购的商品和数量、客户数据、发货地址等)。我希望订单有一个id,这样我就可以在mysql和ElasticSearch中填充数据时使用它。
你知道一些技巧和最佳实践吗?我怎样才能给订单分配一个id(身份)?
当使用mysql作为真相的来源时,这将是很容易的。在这种情况下,我会有一些“id”列,mysql会在其中分配一个id整数。
编辑
我知道guid的概念,但我正在寻找一些其他的概念。
2条答案
按热度按时间trnvg8h31#
@我认为你应该使用一个独立的id机制(像guid一样简单)。从系统的Angular 来看,id的“外观”应该并不重要。确保您选择的是跨平台的(它将与mysql、mssql、elasticsearch一起使用)。
如果您计划在restfulwebservice中使用id,并且希望它看起来更“自然”,那么您也可以创建自己的id系统。如果你使用6个字符(字母数字),你有数百万的可能性。只需创建一个为您执行此操作的服务。
identityservice.newid()=>“x4er4t”。这样,您就可以在restfulapi中使用它,感觉“自然”。获取api/order/x4er4t
另外,您可以在生成验证规则时创建自己的验证规则。您可以随机或按顺序生成它们。您可以在内存中存储一批尚未使用的id,并加快处理速度(避免往返数据库以检查可用的最新id)。
我为一家保险公司做了这件事(和你的情况类似),他们非常高兴。
7jmck4yq2#
有一个“简单”的解决方案,但正如你将看到的,它在实践中变得更加复杂。
kafka中的每条消息都由代理分配唯一的偏移量id(64位长)。它是一种类似sql的序列,随消息单调增加,总是与实际负载(key/message)一起发送到客户端。它是kafka协议的核心(客户端通过向代理发送最后一次看到的偏移量来保持轮询数据),因此它不会随着新版本的出现而消失。
只要您有一个永远不会失败的分区,它就是解决您问题的完美解决方案—有序的人工密钥,可以放入单个db列中,如果您重放kafka流(然后您可以将其与数据库协调,执行upserts,或者在pk冲突时失败),这些密钥将完全按照预期重新发送。你可能真的不想在pk复制上失败,因为在应用程序崩溃的情况下,kafka会重新向你发送部分已经看到的消息,所以某种upsert/协调更好。在任何情况下,它应该是工作没有问题。
不止一个分区会让事情变得更复杂(这在Kafka身上很常见)。偏移量仅在单个分区的上下文中是唯一的,分区之间没有数字关系-因此id 1000分区0可能比id 5000分区1“晚”很多(“晚”用引号括起来,因为当您以适当的方式考虑分区时,您不应该真的认为它们之间的事情是按时间排序的)。这意味着:
你需要用分区id来丰富你的主键,它看起来不再那么漂亮了
您失去了良好的视觉副作用,所有的命令被正确地按主键及时排序
只要你的Kafka集群没有灾难性的失败,这仍然可以正常工作。如果您需要对kafka/zookeeper环境执行完全干净的重新启动,则所有偏移都将重置为0。我不知道有什么方法可以让他们从更高的数字开始(有很多方法可以改变消费者补偿,但我没有发现任何方法可以降低生产者/经纪人的补偿)。在这一点上,您的整个逻辑都被破坏了,并且没有简单的方法可以从该状态中恢复(除了可能更改代码并执行类似于假设partitionid=partitionid+100之类的技巧,有效地添加主键的第三部分,即“generation id”)。
据我所知,假设Kafka永远不会以这种方式失败——如果配置得当,会出现复制、故障转移、滚动更新等。但你会把整个设计押在永远不会耗尽内存、磁盘空间的问题上吗,Kafka的新版本在从过时的旧格式更新时有什么问题?
你可能想和对kafka有更多经验的人谈谈-我们在开发过程中遇到过几次需要干净安装的重大问题(总是我们的错误-达到物理ram限制,让linux杀死随机的东西,磁盘空间不足,或者破坏docker启动/装载)。可能,通过更多的努力,在任何情况下恢复旧状态都是可能的,我们只是通过重置所有内容(这是dev,我们不依赖于外部的偏移量)来执行廉价的路由。