使用kafka streams dsl时如何处理错误和不提交

mwkjh3gx  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(285)

对于kafka流,如果我们使用较低级别的处理器api,我们可以控制提交与否。因此,如果代码中出现问题,我们不想提交此消息。在这种情况下,Kafka将多次重新传递此消息,直到问题得到解决。
但是在使用更高级的流dslapi时,如何控制是否提交消息呢?
资源:
http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html

f3temu5u

f3temu5u1#

你的说法不完全正确。您不能“控制提交或不提交”--至少不能直接提交(既不能在处理器api中,也不能在dsl中)。你只能用 ProcessorContext#commit() 请求额外的提交。因此,在打电话给 #commit() streams尝试尽快提交,但这不是立即提交。此外,即使您从未调用,流也会自动提交 #commit() . 您可以通过流配置控制流提交间隔 commit.interval.m (参见。http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-应用程序)
如果出现“问题”,则取决于您遇到的问题类型以及如何应对:
如果检测到无法恢复的问题,则只能抛出异常并“停止世界”(参见下文)。
如果您有一个可恢复的错误,您需要在自己的代码中“循环”(例如,在 Processor#process() 或者 KeyValueMapper#apply() 直到问题得到解决,并且您可以成功地处理当前消息(注意,使用此策略,您可能会遇到超时,即异常——请参阅consumer configs) heartbeat.interval.ms 对于0.10.1 session.timeout.ms [kip-62])
另一种方法是,将当前无法处理的记录放入 StateStore 以后再处理。然而,它很难得到正确的结果,而且也打破了一些假设(例如,处理顺序)。它不建议使用,如果使用,你必须非常小心的含义
如果有意外的异常, StreamThread 将终止并且不会发生提交(您可以注册一个异常处理程序来获得关于此的通知:http://docs.confluent.io/current/streams/developer-guide.html#using-应用程序代码中的kafka流。如果你 StreamThread 如果已死亡,则需要创建的新示例 KafkaStreams 重新启动应用程序。
在成功处理消息之前,不能从用户代码返回,因为如果返回,streams会假定消息已成功处理(因此可能会提交相应的偏移量)。关于要点(3),将一条记录放入一个特殊的statestore以便以后处理被认为是一条“成功”处理的记录。

相关问题