因此,我必须检索存储在hdfs中的文件的内容,并对其进行一定的分析。
问题是,我甚至无法读取文件并将其内容写入本地文件系统中的另一个文本文件(我是新来的Flink,这只是一个测试,以确保我正确地阅读文件)
hdfs中的文件是纯文本文件。这是我的密码:
public class readFromHdfs {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> lines = env.readTextFile("hdfs://localhost:9000//test/testfile0.txt");
lines.writeAsText("/tmp/hdfs_file.txt");
env.execute("File read from HDFS");
}
}
在我运行/tmp之后,它没有输出。
这是一个非常简单的代码,我不确定是否有问题,或者我只是做了一些其他的错误。就像我说的,我对Flink是个新手
此外,作业在web Jmeter 板中显示为失败。这是Flink日志的内容:https://pastebin.com/rvkxpghu
提前谢谢
编辑:我通过增加任务槽的数量解决了这个问题。web Jmeter 板显示了一个可用的任务槽,它并没有抱怨没有足够的槽,所以我不认为可能是这样。
不管怎样,writestext并不像我想象的那样有效。我可以从testfile0.txt读取内容,但它不会将它们写入hdfs\u file.txt。相反,它用这个名字创建了一个目录,里面有8个文本文件,其中6个完全是空的。另外两个包含testfile0.txt(大部分在1.txt中,最后一个在2.txt中)。
虽然这并不重要,因为文件的内容正确地存储在数据集中,所以我可以继续分析数据。
1条答案
按热度按时间x9ybnkn61#
它按预期工作-您已将完整作业的并行性(因此也将输出格式)设置为8,因此每个插槽都会创建自己的文件(正如您知道的那样,并发写入单个文件是不安全的)。如果你只需要一个输出文件,你应该
writeAsText(...).setParalellis(1)
重写全局并行性属性。如果您想在本地文件系统而不是hdfs中获取输出,那么应该在path中显式设置“file://”协议,因为对于hadoop,flink在默认情况下是“hdfs://”。