对于kafka流,如果我们使用较低级别的处理器api,我们可以控制提交与否。因此,如果代码中出现问题,我们不想提交此消息。在这种情况下,Kafka将多次重新传递此消息,直到问题得到解决。
但是在使用更高级的流dslapi时,如何控制是否提交消息呢?
资源:
http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html
对于kafka流,如果我们使用较低级别的处理器api,我们可以控制提交与否。因此,如果代码中出现问题,我们不想提交此消息。在这种情况下,Kafka将多次重新传递此消息,直到问题得到解决。
但是在使用更高级的流dslapi时,如何控制是否提交消息呢?
资源:
http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html
1条答案
按热度按时间f3temu5u1#
你的说法不完全正确。您不能“控制提交或不提交”--至少不能直接提交(既不能在处理器api中,也不能在dsl中)。你只能用
ProcessorContext#commit()
请求额外的提交。因此,在打电话给#commit()
streams尝试尽快提交,但这不是立即提交。此外,即使您从未调用,流也会自动提交#commit()
. 您可以通过流配置控制流提交间隔commit.interval.m
(参见。http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-应用程序)如果出现“问题”,则取决于您遇到的问题类型以及如何应对:
如果检测到无法恢复的问题,则只能抛出异常并“停止世界”(参见下文)。
如果您有一个可恢复的错误,您需要在自己的代码中“循环”(例如,在
Processor#process()
或者KeyValueMapper#apply()
直到问题得到解决,并且您可以成功地处理当前消息(注意,使用此策略,您可能会遇到超时,即异常——请参阅consumer configs)heartbeat.interval.ms
对于0.10.1session.timeout.ms
[kip-62])另一种方法是,将当前无法处理的记录放入
StateStore
以后再处理。然而,它很难得到正确的结果,而且也打破了一些假设(例如,处理顺序)。它不建议使用,如果使用,你必须非常小心的含义如果有意外的异常,
StreamThread
将终止并且不会发生提交(您可以注册一个异常处理程序来获得关于此的通知:http://docs.confluent.io/current/streams/developer-guide.html#using-应用程序代码中的kafka流。如果你StreamThread
如果已死亡,则需要创建的新示例KafkaStreams
重新启动应用程序。在成功处理消息之前,不能从用户代码返回,因为如果返回,streams会假定消息已成功处理(因此可能会提交相应的偏移量)。关于要点(3),将一条记录放入一个特殊的statestore以便以后处理被认为是一条“成功”处理的记录。