我有一个hdfs目录和几个文件,我想合并成一个。我不想用spark-dfs来实现这一点,而是使用数据流来实现hdfs交互。以下是我目前的代码:
...
sc = SparkContext()
hadoop = sc._jvm.org.apache.hadoop
hdfs = hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
out_stream = hdfs.create(hadoop.fs.Path('/tmp/combinedFile.parquet')) # FSDataOutputStream
for f in hdfs.listStatus(hadoop.fs.Path('/tmp/dirWithSeparatedFiles/')):
buffer = bytes(256)
in_stream = hdfs.open(f.getPath()) # FSDataInputStream
bytesRead = in_stream.read(buffer)
while (bytesRead > 0):
out_stream.writeBytes(bytesRead)
out_stream.flush()
in_stream.close()
out_stream.close()
这段代码的第一个问题是,我不确定如何通过缓冲区从输入流读取数据。第一个问题是,输出文件是在hdfs中创建的,但没有写入任何内容(即使我向其写入固定值)。
1条答案
按热度按时间iyfjxgzm1#
经过调查,我找到了解决问题的办法。解决方案包括,通过spark上下文创建一些jvm对象,并将它们用于缓冲i/o操作: