checkpoint与java中的spark文件流

ldioqlga  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(266)

我想用spark文件流应用程序实现checkpoint,以便在spark流应用程序停止/终止时处理hadoop中所有未处理的文件。我遵循这个:流编程指南,但没有找到javastreamingcontextfactory。请帮帮我该怎么办。
我的密码是

public class StartAppWithCheckPoint {

    public static void main(String[] args) {

        try {

            String filePath = "hdfs://Master:9000/mmi_traffic/listenerTransaction/2020/*/*/*/"; 
            String checkpointDirectory = "hdfs://Mongo1:9000/probeAnalysis/checkpoint";
            SparkSession sparkSession = JavaSparkSessionSingleton.getInstance();

            JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
                  @Override public JavaStreamingContext create() {

                    SparkConf sparkConf = new SparkConf().setAppName("ProbeAnalysis");
                    JavaSparkContext sc = new JavaSparkContext(sparkConf);  
                    JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(300));
                    JavaDStream<String> lines = jssc.textFileStream(filePath).cache();

                    jssc.checkpoint(checkpointDirectory);
                    return jssc;
                  }
                };

            JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);

            context.start();
            context.awaitTermination();
            context.close();
            sparkSession.close();

        } catch(Exception e) {
            e.printStackTrace();
        }   
    }
}
68de4m5k

68de4m5k1#

必须使用检查点
对于检查点,也可以使用有状态转换 updateStateByKey 或者 reduceByKeyAndWindow . spark示例中提供了大量示例,以及github中的预构建spark和spark源。有关您的具体信息,请参阅javastatefulnetworkwordcount.java;

相关问题