如何在重新启动时使用上一个偏移量id?

vwkv1x7d  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(277)

我正在使用spark来使用Kafka的数据。在消耗一些数据后重新启动spark时,如何确保spark将从它停止的偏移开始消耗?
例如,如果在第一次运行时,spark一直消耗到offset id x . 如何确保下次运行时它将以偏移量id开始 x+1 ?

SparkConf sparkConf = new SparkConf().setAppName("name");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
Map<String,String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", "127.0.0.1");
kafkaParams.put("group.id", App.GROUP);
JavaPairReceiverInputDStream<String, EventLog> messages =
  KafkaUtils.createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class,
    kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());
gz5pxeao

gz5pxeao1#

在基于接收器的方法中(通过 KafkaUtils.createStream ),偏移量由wal(write-ahead log)保存和处理,wal负责从正确的偏移量恢复应用程序。如果您希望能够准确地控制从何处恢复应用程序,请查看 KafkaUtils.createDStream 通常情况下,直接(无接收器)的api流方法采用重载 fromOffsets: Map[TopicAndPartition, Long] .

相关问题