我一直在做Spark流的工作,消费者和生产数据通过Kafka。我用的是directdstream,所以我必须自己管理偏移量,我们采用redis来写和读偏移量,现在有一个问题,我启动客户端时,客户端需要从redis得到偏移量,而不是kafka本身的偏移量,我怎么写代码呢?现在我已经写了下面的代码:
kafka_stream = KafkaUtils.createDirectStream(
ssc,
topics=[config.CONSUME_TOPIC, ],
kafkaParams={"bootstrap.servers": config.CONSUME_BROKERS,
"auto.offset.reset": "largest"},
fromOffsets=read_offset_range(config.OFFSET_KEY))
但是我认为fromOffset是spark流媒体客户端启动时的值(来自redis),而不是在运行时的值。
1条答案
按热度按时间dohp0rv51#
如果我理解正确,您需要手动设置偏移量。我就是这样做的: