根据文档,可以从(scala)spark流应用程序将offset提交到kafka中。我想从pyspark实现同样的功能。
或者至少将kafka分区、偏移量存储到外部数据存储(rdbms等)中。
但是,用于kafka集成的pyspark api只提供 RDD(offset, value)]
而不是 RDD[ConsumerRecord]
(如在斯卡拉)。有没有办法得到 (topic, partition, offset)
从python rdd?还有其他地方?
根据文档,可以从(scala)spark流应用程序将offset提交到kafka中。我想从pyspark实现同样的功能。
或者至少将kafka分区、偏移量存储到外部数据存储(rdbms等)中。
但是,用于kafka集成的pyspark api只提供 RDD(offset, value)]
而不是 RDD[ConsumerRecord]
(如在斯卡拉)。有没有办法得到 (topic, partition, offset)
从python rdd?还有其他地方?
1条答案
按热度按时间ajsxfq5m1#
我们可以用多种方式处理抵消。其中一种方法是在每次成功处理数据时将偏移值存储在zookeeper path中,并在再次创建流时读取该值。代码片段如下。
当做
karthikeyan rasipalayam durairaj先生