我正在使用spark来使用Kafka的数据。在消耗一些数据后重新启动spark时,如何确保spark将从它停止的偏移开始消耗?
例如,如果在第一次运行时,spark一直消耗到offset id x
. 如何确保下次运行时它将以偏移量id开始 x+1
?
SparkConf sparkConf = new SparkConf().setAppName("name");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
Map<String,String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", "127.0.0.1");
kafkaParams.put("group.id", App.GROUP);
JavaPairReceiverInputDStream<String, EventLog> messages =
KafkaUtils.createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class,
kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());
1条答案
按热度按时间gz5pxeao1#
在基于接收器的方法中(通过
KafkaUtils.createStream
),偏移量由wal(write-ahead log)保存和处理,wal负责从正确的偏移量恢复应用程序。如果您希望能够准确地控制从何处恢复应用程序,请查看KafkaUtils.createDStream
通常情况下,直接(无接收器)的api流方法采用重载fromOffsets: Map[TopicAndPartition, Long]
.