Spark streaming stops after processing the first HDFS file

Asked by u7up0aaq on 2021-05-27 in Spark

I run the application with the following command:

./spark-submit --master yarn --deploy-mode cluster --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0 --class com.mmi.process.app.startapp ./sparkfilestreaming.jar
But it stops automatically after processing the HDFS files. Please tell me why this happens.
My code is below:

SparkConf sparkConf = new SparkConf().setAppName("MyWordCount");
// Enable the write-ahead log for receiver-based input streams (has no effect on textFileStream)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.seconds(60));
streamingContext.checkpoint("hdfs://Mongo1:9000/probeAnalysis/checkpoint");

// Monitor the HDFS directory pattern for newly arriving files every 60-second batch
String filePath = "hdfs://Master:9000/mmi_traffic/listenerTransaction/2020/*/*/*/";
JavaDStream<String> dStream = streamingContext.textFileStream(filePath).cache();
dStream.foreachRDD((rdd) -> {

    try {

        Long startTime = System.currentTimeMillis();
        long total = rdd.count();

        if(total > 0) {

            String filePath1 = Utils.getFilePath(rdd);

            SparkSession sparkSession = JavaSparkSessionSingleton.getInstance();

            // Re-read the newly arrived file as JSON and register it for SQL
            Dataset<Row> jsonDataset = sparkSession.read().json(filePath1).cache();
            jsonDataset.createOrReplaceTempView("mmi_probes");
            Dataset<Row> probAnalysisData = sparkSession.sql("select sourceId, areaCode, vehicleType, "
                + "count(distinct deviceId) as totalDevices, "
                + "sum(valid+duplicate+history+future) as total, sum(valid) as valid, "
                + "sum(duplicate) as duplicate, sum(history) as history, sum(future) as future, "
                + "sum(count_0_2) as 0_2, sum(count_2_5) as 2_5, sum(count_5_10) as 5_10, "
                + "sum(count_10_15) as 10_15"
                + " from mmi_probes group by sourceId, areaCode, vehicleType");

            MongoSpark.write(probAnalysisData)
            .option("spark.mongodb.output.uri", "mongodb://Mongo-server:27017/mmi_traffic.mmi_test_all")
            .mode("append")
            .save();

            // SparkSession.close() also stops the underlying SparkContext shared with the StreamingContext
            sparkSession.close();

        }

    } catch(Exception e) {
        e.printStackTrace();
    }

});

streamingContext.start();
streamingContext.awaitTermination();
streamingContext.close();
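
For reference, the snippet calls JavaSparkSessionSingleton.getInstance(), which is not shown in the post. Below is a minimal sketch of such a helper, assuming it follows the usual lazily-initialized singleton pattern; the field name and the use of SparkSession.builder().getOrCreate() are assumptions, not taken from the original code.

import org.apache.spark.sql.SparkSession;

// Hypothetical reconstruction of the helper referenced in the question
class JavaSparkSessionSingleton {
    private static SparkSession instance = null;

    // Lazily create the session; getOrCreate() reuses the SparkContext already started by the streaming job
    static synchronized SparkSession getInstance() {
        if (instance == null) {
            instance = SparkSession.builder().getOrCreate();
        }
        return instance;
    }
}

With a singleton like this, every micro-batch reuses the same SparkSession, and that session wraps the same SparkContext that drives the StreamingContext.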

No answers yet.

