我必须从多个数据文件中获取数据,并在一个sink文件中创建一条记录,基本上是静态的 java.util.List
.
如果我指定 env.execute
在每个数据集之后,java对象在每个文件处理之后被刷新
如果我指定 env.execute
最后,文件处理并行进行,我最终得到错误的数据处理。
有什么建议吗?
try {
scanDirectory();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
for(int i =0;i<fileArray.length;i++) {
DataStream<String> text = env.readTextFile(fileArray[i].getPath());
System.out.println(fileArray[i].getPath());
if(fileArray[i].getName().contains("post")) {
DataStream<String> DataStream = text.flatMap(new WirlsChargesPostBillMapper());
// DataStream.writeAsText("C:\\Users\\workspace\\POC\\resources\\results.txt");
// env.execute("First FLink POC");
}
if(fileArray[i].getName().contains("pre")) {
DataStream<String> DataStream2 = text.flatMap(new WirlsChargesPreBillMapper());
// DataStream2.writeAsText("C:\\Users\\workspace\\POC\\resources\\results1.txt");
// env.execute("First FLink POC");
}
// DataStream<String> keysStream = DataStream.forward();
// keysStream.writeUsingOutputFormat(new JedisOutputFormat(null));
// logger.info("final class"+LineDumpList.get(0).getBanNumber());
env.execute("First FLink POC");
}`
暂无答案!
目前还没有任何答案,快来回答吧!