我用spark流来读Kafka的留言,效果不错。但我有一个要求,那就是重新阅读信息。我想我可能只需要更改spark的客户组ID并重新启动spark流媒体应用程序,它应该从头开始重新阅读kafka消息。但结果是spark没有收到任何信息,我很困惑。根据kafka文档,如果您更改了customer groupid,那么它应该从一开始就收到消息,因为kafka将您视为新客户。提前谢谢!
我用spark流来读Kafka的留言,效果不错。但我有一个要求,那就是重新阅读信息。我想我可能只需要更改spark的客户组ID并重新启动spark流媒体应用程序,它应该从头开始重新阅读kafka消息。但结果是spark没有收到任何信息,我很困惑。根据kafka文档,如果您更改了customer groupid,那么它应该从一开始就收到消息,因为kafka将您视为新客户。提前谢谢!
2条答案
按热度按时间zi8p0yeb1#
听起来你在使用spark streaming的基于接收器的api for kafka。对于该api,auto.offset.reset仅适用于zk中没有偏移的情况,正如您所注意到的。
http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2个直接接近无接收机
如果希望能够指定精确的偏移量,请参阅以fromOffset为参数的createdirectstream调用的版本。
9jyewag02#
kafka使用者有一个名为auto.offset.reset的属性(请参见kafka文档)。当消费者开始消费但尚未提交补偿时,它会告诉消费者该怎么做。这是你的案子。主题有消息,但没有存储起始偏移量,因为您尚未读取该新组id下的任何内容。在这种情况下,将使用auto.offset.reset属性。如果该值为“最大”,并且这是默认值),那么开始位置将设置为最大偏移(最后一个),并且您将获得所看到的行为。如果该值是“最小”的,那么偏移量被设置为起始偏移量,使用者将读取整个分区。这就是你想要的。
所以我不确定你是如何在spark应用程序中设置kafka属性的,但是如果你想让新的组id读取整个主题的话,你肯定希望该属性设置为“最小”。