pyspark kafka偏移范围单位

mxg2im7a  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(282)

我使用spark作为批处理来自kafka的日志。在每一个循环中,我的代码都应该得到Kafka消费者想要的东西。但是,我想对每个周期从Kafka获得的数据量进行限制。比如说5 gb或500000条日志线。。

offsetRanges = []
def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    WRITE OFFSETS TO DISK
    return rdd

while True:
    host = "localhost:9092"
    offset = OffsetRange(topic, 0, fromOffset, untilOffset)
    kafka_content = KafkaUtils.createRDD(sc, {"metadata.broker.list": host}, [offset])
    kafka_content.transform(storeOffsetRanges)
    RDD TRANSFORMATIONS..

我将存储在内存和磁盘的偏移量,以防驱动程序失败。但是,我如何强制这些Kafka偏移来限制每个周期的最大数据量呢?Kafka偏移范围的单位是多少??
提前谢谢!

kpbpu008

kpbpu0081#

Kafka偏移单位是消息。在每一个周期,你最多会得到 untilOffest - fromOffset Kafka的留言。但是数据将只从一个主题分区读取,所以如果您的主题有更多分区,那么应用程序将丢失一些日志行。
作为另一种选择,你可以尝试Spark流与Kafka直接方法。用这种方法你就能摆脱 while True ,您将基于时间(而不是固定偏移量)使用带有可选背压机制的微批次中的日志行。然后,您可以省略在内存中保存偏移量(流将处理它),但在驱动程序重新启动时,仍然需要将它们保存到磁盘(请参阅) fromOffsets 在kafkautils.createdirectstream中)。

相关问题