我已经建立了我的第一个玩具flink,我想做一件非常简单的事情:连续读取本地文件并打印内容。
问题是,每次我更新本地文件,flink打印所有行时,我希望它只打印新添加的行。
代码段:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String path = "/home/foobar/input";
TextInputFormat inputFormat = new TextInputFormat(new Path(path));
inputFormat.setCharsetName("UTF-8");
DataStreamSource<String> ds = env.readFile(inputFormat, path,
FileProcessingMode.PROCESS_CONTINUOUSLY, 60000l, BasicTypeInfo.STRING_TYPE_INFO);
ds.print();
env.execute("jobname02");
有人知道我做错了什么吗?谢谢你的帮助。
1条答案
按热度按时间6tdlim6h1#
您没有做错任何事情,这是流程模式的记录行为:
如果watchtype连续设置为fileprocessingmode.process\u,则在修改文件时,将完全重新处理其内容。这可能会打破“恰好一次”的语义,因为在文件末尾附加数据将导致其所有内容被重新处理。
当应用于目录时,这种模式更有用,一旦文件被完全写入目录,您就可以自动地将其移动到目录中。