我使用spark作为批处理来自kafka的日志。在每一个循环中,我的代码都应该得到Kafka消费者想要的东西。但是,我想对每个周期从Kafka获得的数据量进行限制。比如说5 gb或500000条日志线。。
offsetRanges = []
def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
WRITE OFFSETS TO DISK
return rdd
while True:
host = "localhost:9092"
offset = OffsetRange(topic, 0, fromOffset, untilOffset)
kafka_content = KafkaUtils.createRDD(sc, {"metadata.broker.list": host}, [offset])
kafka_content.transform(storeOffsetRanges)
RDD TRANSFORMATIONS..
我将存储在内存和磁盘的偏移量,以防驱动程序失败。但是,我如何强制这些Kafka偏移来限制每个周期的最大数据量呢?Kafka偏移范围的单位是多少??
提前谢谢!
1条答案
按热度按时间kpbpu0081#
Kafka偏移单位是消息。在每一个周期,你最多会得到
untilOffest - fromOffset
Kafka的留言。但是数据将只从一个主题分区读取,所以如果您的主题有更多分区,那么应用程序将丢失一些日志行。作为另一种选择,你可以尝试Spark流与Kafka直接方法。用这种方法你就能摆脱
while True
,您将基于时间(而不是固定偏移量)使用带有可选背压机制的微批次中的日志行。然后,您可以省略在内存中保存偏移量(流将处理它),但在驱动程序重新启动时,仍然需要将它们保存到磁盘(请参阅)fromOffsets
在kafkautils.createdirectstream中)。