The code below uses Kafka as the source and HDFS as the sink, with Spark Streaming. Is there a way to move commitAsync to the end, so offsets are committed only after the data has been written?
import java.util.Date;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

SparkSession spark = new SparkSession.Builder().appName(consumer_group).getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.milliseconds(batchDurationLong));

JavaInputDStream<ConsumerRecord<String, Object>> stream = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, Object>Subscribe(topics, kafkaParams));

stream.transform(rddT -> {
        // Offsets are committed here, before the batch has been written to HDFS.
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rddT.rdd()).offsetRanges();
        ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
        return rddT;
    })
    .map(record -> record.value().toString())
    .foreachRDD(invrdd -> {
        if (!invrdd.isEmpty()) {
            SparkSession sparkWorker = JavaSparkSessionSingleton.getInstance(invrdd.context().getConf());
            Dataset<Row> invDataset = sparkWorker.read().option("mode", "PERMISSIVE").json(invrdd);
            invDataset.write().mode(SaveMode.Append).parquet(storageDir + dateFormat.format(new Date()));
        }
    });
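One common way to do this, following the offset-handling pattern in the Spark Streaming + Kafka 0.10 integration guide: capture the OffsetRange[] in transform() but defer the commit to the end of foreachRDD(), after the parquet write. The minimal sketch below assumes the same surrounding variables as the question (stream, topics, kafkaParams, storageDir, dateFormat, JavaSparkSessionSingleton); the pendingOffsets variable is an illustrative name introduced here. The AtomicReference handoff works because the transform() and foreachRDD() bodies both run on the driver, sequentially, for each batch.

import java.util.concurrent.atomic.AtomicReference;

// Illustrative holder for the current batch's offsets; AtomicReference is used
// because Java lambdas can only capture effectively final locals.
AtomicReference<OffsetRange[]> pendingOffsets = new AtomicReference<>();

stream.transform(rddT -> {
        // Record this batch's offsets, but do NOT commit them yet.
        pendingOffsets.set(((HasOffsetRanges) rddT.rdd()).offsetRanges());
        return rddT;
    })
    .map(record -> record.value().toString())
    .foreachRDD(invrdd -> {
        if (!invrdd.isEmpty()) {
            SparkSession sparkWorker = JavaSparkSessionSingleton.getInstance(invrdd.context().getConf());
            Dataset<Row> invDataset = sparkWorker.read().option("mode", "PERMISSIVE").json(invrdd);
            invDataset.write().mode(SaveMode.Append).parquet(storageDir + dateFormat.format(new Date()));
        }
        // Commit only after the write has succeeded. If the write throws, the
        // offsets are never committed and the batch is replayed on restart.
        ((CanCommitOffsets) stream.inputDStream()).commitAsync(pendingOffsets.get());
    });

Note this gives at-least-once semantics: a batch whose write fails, or one that was written but crashed before the async commit completed, will be reprocessed, so the parquet output can contain duplicates. Exactly-once would require storing the offsets transactionally alongside the output instead of committing them back to Kafka.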