我想用apache flink批处理两个文件,一个接一个。
举个具体的例子:假设我想给每一行分配一个索引,这样第二个文件中的行就跟在第一个文件后面。以下代码将两个文件中的行交错,而不是这样做:
val env = ExecutionEnvironment.getExecutionEnvironment
val text1 = env.readTextFile("/path/to/file1")
val text2 = env.readTextFile("/path/to/file2")
val union = text1.union(text2).flatMap { ... }
我想确认一下 text1
是通过 flatMap
先是接线员,然后是所有人 text2
. 建议的方法是什么?
事先谢谢你的帮助。
1条答案
按热度按时间busg9geu1#
DataSet.union()
不跨输入提供任何订单保证。来自同一输入分区的记录将保持有序,但将与来自其他输入分区的记录合并。但还有一个更根本的问题。flink是一个并行数据处理器。并行处理数据时,无法保留全局顺序。例如,当flink并行读取文件时,它会尝试拆分这些文件并独立处理每个拆分。分拆没有特别的顺序。因此,单个文件的记录已经被洗牌。您需要将整个作业的并行度设置为1,并实现一个自定义
InputFormat
让这一切顺利。你可以让它工作,但它不会并行,你需要调整许多事情。我不认为Flink是完成这种任务的最佳工具。您是否考虑过使用简单的unix命令行工具来连接您的文件?