在Apache Storm中处理Kafka消息时如何确保恰好一次语义

rqenqsqc  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(262)

我在我的应用程序中需要一次传递。我探索Kafka,意识到要让消息只产生一次,我必须在生产者配置中设置idempotence=true。这也设置了acks=all,使生产者重新发送消息,直到所有副本都提交了它。为了确保消费者不会重复处理或留下任何未处理的消息,建议在同一个数据库事务中将处理输出和偏移量提交到外部数据库,以便两者都持久化或都不持久化,从而避免重复和不处理。
在使用者中,如果使用者首先提交消息,但在处理之前失败,则该消息仍处于处理状态,如果使用者首先处理消息,但在提交之前失败,则该消息将被多次处理

**Q1.**现在我在猜测如何用Apache Storm来模仿。我想通过在KafkaBolt中设置idemptence=true可以确保消息的一次生成。我说的对吗?

我在猜测如何确保Storm中的遗漏和重复消息处理。例如,this doc page表示,如果我锚一个元组(通过将其作为第一个参数传递给OutputCollector.emit()),然后将元组传递给OutputCollector.ack()OutputCollector.fail(),Storm将确保数据丢失。以下是它的确切含义:
现在,您已经了解了可靠性算法,让我们回顾一下所有故障案例,看看Storm如何在每种情况下避免数据丢失:

***由于任务终止,元组未被确认:**在这种情况下,失败元组的树根处的spout元组id将超时并被重放。
***阿克任务终止:**在这种情况下,Acker跟踪的所有spout元组将超时并被重放。
***Spout任务终止:**在这种情况下,与Spout对话的源负责重放消息。例如,当客户端断开连接时,像Kestrel和RabbitMQ这样的队列会将所有挂起的消息放回队列中。
**Q2.**我想这可以确保消息不会被保留为未处理状态,但无法避免消息的重复处理。我说的对吗?Storm是否还提供了其他东西来确保我所缺少的像Kafka一样的一次性语义?

ncgqoxb0

ncgqoxb01#

关于Q1:是的,您可以通过设置该属性从KafkaBolt获得相同的行为,KafkaBolt只是简单地 Package 了一个KafkaProducer
关于消费端的语义,Storm和Kafka具有相同的选择。(例如,写入数据库)。如果您在此之前执行此操作,则程序崩溃,您将丢失消息。我们将其命名为at-most-once processing。如果您在此之后执行此操作,如果程序在处理之后但在提交之前崩溃(称为at-least-once processing),则有可能处理同一消息两次。
因此,关于Q2:是的,使用锚定元组和acking将为您提供at-least-once语义。不使用锚定元组将为您提供at-most-once
是的,Storm还提供了一种名为Trident的工具来确保恰好一次语义,但它要求您以不同的方式编写拓扑,并且您的数据存储必须适应它,以便可以进行消息重复数据删除。请参阅https://storm.apache.org/releases/2.0.0/Trident-tutorial.html上的文档。
也只是提醒你一句:当Storm的文档(或Kafka)谈到“恰好一次”语义时,对于您将执行的处理类型,存在一些假设。例如,当Storm的Trident文档谈到“恰好一次”时,假设您将调整您的数据库,以便在给出消息时可以决定该消息是否已被存储。当Kafka的文档谈到"恰好一次“时,假设你的处理过程是阅读Kafka的作品,做一些计算(很可能没有副作用),然后写回Kafka。
这只是说,对于某些类型的处理,您可能仍然需要在at-least-onceat-most-once之间进行选择。如果您可以使您的处理幂等,at-least-once是一个很好的选择。
最后,如果你的处理过程符合“从Kafka那里读取,进行计算,向Kafka写入”的模型,你可能会从Kafka流中得到比Storm更好的语义,因为Storm不能提供Kafka在这种情况下所能提供的恰好一次的语义。

相关问题