如何在使用spark消费时在zookeeper中保存偏移量id?

83qze16e  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(199)

我正在使用spark来使用kafka的数据。我正在用java编写程序。我希望能够在zookeeper中保存偏移id,并能够从上次使用的偏移id重新启动。如何在zookeeper中保存偏移id?

SparkConf sparkConf = new SparkConf().setAppName("name");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
Map<String,String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", "127.0.0.1");
kafkaParams.put("group.id", App.GROUP);
JavaPairReceiverInputDStream<String, EventLog> messages =
  KafkaUtils.createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class,
    kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题