I run the application with the following command: ./spark-submit --master yarn --deploy-mode cluster --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0 --class com.mmi.process.app.startapp ./sparkfilestreaming.jar
But it stops automatically after processing the files on HDFS. Please tell me why this happens.
My code is below:
import com.mongodb.spark.MongoSpark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf sparkConf = new SparkConf().setAppName("MyWordCount");
// enable the write-ahead log for receiver-based input
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.seconds(60));
streamingContext.checkpoint("hdfs://Mongo1:9000/probeAnalysis/checkpoint");

// watch the HDFS directories for newly created files, one micro-batch every 60 seconds
String filePath = "hdfs://Master:9000/mmi_traffic/listenerTransaction/2020/*/*/*/";
JavaDStream<String> dStream = streamingContext.textFileStream(filePath).cache();

dStream.foreachRDD((rdd) -> {
    try {
        long startTime = System.currentTimeMillis();
        long total = rdd.count();
        if (total > 0) {
            String filePath1 = Utils.getFilePath(rdd);
            SparkSession sparkSession = JavaSparkSessionSingleton.getInstance();
            // re-read the batch's file as JSON so it can be queried with Spark SQL
            Dataset<Row> jsonDataset = sparkSession.read().json(filePath1).cache();
            jsonDataset.createOrReplaceTempView("mmi_probes");
            Dataset<Row> probAnalysisData = sparkSession.sql("select sourceId, areaCode, vehicleType, "
                    + "count(distinct deviceId) as totalDevices, "
                    + "sum(valid+duplicate+history+future) as total, sum(valid) as valid, "
                    + "sum(duplicate) as duplicate, sum(history) as history, sum(future) as future, "
                    + "sum(count_0_2) as 0_2, sum(count_2_5) as 2_5, sum(count_5_10) as 5_10, "
                    + "sum(count_10_15) as 10_15"
                    + " from mmi_probes group by sourceId, areaCode, vehicleType");
            MongoSpark.write(probAnalysisData)
                    .option("spark.mongodb.output.uri", "mongodb://Mongo-server:27017/mmi_traffic.mmi_test_all")
                    .mode("append")
                    .save();
            // close() also stops the session's underlying SparkContext,
            // which shuts down the streaming context sharing it
            sparkSession.close();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
});

streamingContext.start();
streamingContext.awaitTermination();
streamingContext.close();
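For reference, the snippet calls JavaSparkSessionSingleton.getInstance() and Utils.getFilePath(rdd), neither of which is shown. Below is a minimal sketch of what the session helper presumably looks like, assuming it follows the lazily-initialized singleton pattern from the Spark Streaming programming guide; the class name and the no-argument getInstance() signature are taken from the call above, not from Spark itself.

import org.apache.spark.sql.SparkSession;

// Hypothetical reconstruction of the unshown helper: a lazily
// initialized singleton that reuses the existing SparkContext.
public class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance() {
        if (instance == null) {
            instance = SparkSession.builder()
                    .appName("MyWordCount")
                    .getOrCreate();
        }
        return instance;
    }
}

Because getOrCreate() attaches to the SparkContext that the streaming context is already using, closing this session closes that shared context too.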