Apache Spark (v2.1) Streaming in Java

wbgh16ku · posted 2021-06-06 · in Kafka
Follow (0) | Answers (0) | Views (240)

The code below uses Kafka as the source and HDFS as the sink, with Spark Streaming. Is there a way to move commitAsync to the end?

import java.util.Date;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

SparkSession spark = new SparkSession.Builder().appName(consumer_group).getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.milliseconds(batchDurationLong));

JavaInputDStream<ConsumerRecord<String, Object>> stream =
        KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, Object>Subscribe(topics, kafkaParams));

stream.transform(rddT -> {
    // Offsets are committed here, i.e. before the batch has been written to HDFS.
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rddT.rdd()).offsetRanges();
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
    return rddT;
}).map(record -> record.value().toString())
  .foreachRDD(invrdd -> {
      if (!invrdd.isEmpty()) {
          SparkSession sparkWorker = JavaSparkSessionSingleton.getInstance(invrdd.context().getConf());
          Dataset<Row> invDataset = sparkWorker.read().option("mode", "PERMISSIVE").json(invrdd);
          invDataset.write().mode(SaveMode.Append).parquet(storageDir + dateFormat.format(new Date()));
      }
  });
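
For reference, the usual way to defer the commit is to capture the offset ranges on the driver while the RDD is still a Kafka RDD, run the transformations, and only call commitAsync inside foreachRDD after the output has been written. The sketch below follows the pattern from the Spark Streaming + Kafka 0-10 integration guide and reuses the variables from the snippet above; it is untested, and the AtomicReference holder is just one way to pass the offsets along:

import java.util.concurrent.atomic.AtomicReference;

// Driver-side holder for the offsets of the batch currently being processed.
// transform() and foreachRDD() both run their bodies on the driver, in order,
// so the value set in transform() is the one read back in foreachRDD().
AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

stream.transform(rddT -> {
    // Capture the offsets while the RDD is still a KafkaRDD,
    // but do not commit yet.
    offsetRanges.set(((HasOffsetRanges) rddT.rdd()).offsetRanges());
    return rddT;
}).map(record -> record.value().toString())
  .foreachRDD(invrdd -> {
      if (!invrdd.isEmpty()) {
          SparkSession sparkWorker = JavaSparkSessionSingleton.getInstance(invrdd.context().getConf());
          Dataset<Row> invDataset = sparkWorker.read().option("mode", "PERMISSIVE").json(invrdd);
          invDataset.write().mode(SaveMode.Append).parquet(storageDir + dateFormat.format(new Date()));
      }
      // Commit only after the Parquet write has finished. This gives
      // at-least-once semantics: a failure between the write and the
      // commit replays the batch on restart.
      ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges.get());
  });

Note that commitAsync only enqueues the commit, so the write and the commit are still not atomic; exactly-once output would require storing the offsets transactionally alongside the data.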

No answers yet.
