实现outbox pattern的通常方法是将消息有效负载存储在发件箱表中,并使用一个单独的进程(Message Relay)查询待处理消息,然后将它们发布到消息代理(在我的例子中是Kafka)中。
发件箱表的状态可能如下所示。
OUTBOX TABLE
---------------------------------
|ID | STATE | TOPIC | PAYLOAD |
---------------------------------
| 1 | PROCESSED | user |
| 2 | PENDING | user |
| 3 | PENDING | billing |
----------------------------------
My Message Relay是一个Sping Boot /Cloud Stream应用程序,它定期(@Scheduled
)查找PENDING记录,将其发布到Kafka中,并将记录更新为PROCESSED状态。
第一个问题是:如果我启动了多个消息中继示例,所有的示例都会查询发件箱表,并且可能在某个时候不同的示例会得到相同的PENDING注册表来发布到Kafka中,从而产生重复的消息.我如何防止这种情况?
另一种情况:假设只有一个消息中继。它得到一个PENDING记录,将其发布到主题,但在将记录更新为PROCESSED之前崩溃。当它再次启动时,它会找到相同的PENDING记录并再次发布它。是否有办法避免这种重复,或者唯一的办法是设计一个幂等系统。
4条答案
按热度按时间eit6fx6z1#
为了防止第一个问题,您必须使用数据库锁定。
这将防止其他进程访问同一行。
第二个问题你解决不了,因为你和Kafka没有分布式交易。
因此,一种方法可以是在将记录发送给Kafka之前将其设置为类似于PROCESSING的状态,如果应用程序崩溃,则应检查是否有处于PROCESSING状态的记录,并执行一些清理任务,以确定它们是否已发送给Kafka。
但最好的解决方案是有一个可以处理重复的幂等系统。
yzckvree2#
您可以使用
debezium
(https://debezium.io/)来读取SQL服务器的bin-log并将事件写入Kafka,它将解决您的两个问题。cclgggtu3#
对于第一个问题,您可以使用ShedLock library,它可以确保在任何时候,只有一个服务示例在执行调度任务。
对于第二个问题,是的,你必须开发一个幂等的消费者,你可以通过传递消息id给消费者来实现,并维护一个表来检查消息id是否已经被处理,忽略它。
7d7tgy0s4#
你也可以使用分区键将你的表分割成多个分区,然后给每个消息中继分配一个分区键,这样他们就可以用它来过滤记录,这是一个类似于分片的方法,但是很简单。