in-flink一个接一个地处理文件,并在每次文件处理后更新一个公共java对象

e4eetjau  于 2021-06-24  发布在  Flink
关注(0)|答案(0)|浏览(182)

我必须从多个数据文件中获取数据,并在一个sink文件中创建一条记录,基本上是静态的 java.util.List .
如果我指定 env.execute 在每个数据集之后,java对象在每个文件处理之后被刷新
如果我指定 env.execute 最后,文件处理并行进行,我最终得到错误的数据处理。
有什么建议吗?

try {
        scanDirectory();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        for(int i =0;i<fileArray.length;i++) {
            DataStream<String> text = env.readTextFile(fileArray[i].getPath());
            System.out.println(fileArray[i].getPath());
            if(fileArray[i].getName().contains("post")) {
                DataStream<String> DataStream = text.flatMap(new WirlsChargesPostBillMapper());
            //  DataStream.writeAsText("C:\\Users\\workspace\\POC\\resources\\results.txt");
            //  env.execute("First FLink POC");
            }

            if(fileArray[i].getName().contains("pre")) {
                DataStream<String> DataStream2 = text.flatMap(new WirlsChargesPreBillMapper());
            //  DataStream2.writeAsText("C:\\Users\\workspace\\POC\\resources\\results1.txt");
            //  env.execute("First FLink POC");
            }
            //  DataStream<String> keysStream = DataStream.forward();
        //  keysStream.writeUsingOutputFormat(new JedisOutputFormat(null));
        //  logger.info("final class"+LineDumpList.get(0).getBanNumber());
            env.execute("First FLink POC");
        }`

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题