pyspark hdfs数据流读写

dkqlctbz  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(651)

我有一个hdfs目录和几个文件,我想合并成一个。我不想用spark-dfs来实现这一点,而是使用数据流来实现hdfs交互。以下是我目前的代码:

...
sc = SparkContext()
hadoop = sc._jvm.org.apache.hadoop
hdfs = hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())

out_stream = hdfs.create(hadoop.fs.Path('/tmp/combinedFile.parquet'))  # FSDataOutputStream

for f in hdfs.listStatus(hadoop.fs.Path('/tmp/dirWithSeparatedFiles/')):

    buffer = bytes(256)
    in_stream = hdfs.open(f.getPath())  # FSDataInputStream

    bytesRead = in_stream.read(buffer)
    while (bytesRead > 0):
        out_stream.writeBytes(bytesRead)
        out_stream.flush()
    in_stream.close()

out_stream.close()

这段代码的第一个问题是,我不确定如何通过缓冲区从输入流读取数据。第一个问题是,输出文件是在hdfs中创建的,但没有写入任何内容(即使我向其写入固定值)。

iyfjxgzm

iyfjxgzm1#

经过调查,我找到了解决问题的办法。解决方案包括,通过spark上下文创建一些jvm对象,并将它们用于缓冲i/o操作:

...
sc = SparkContext()
hadoop = sc._jvm.org.apache.hadoop
hdfs = hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())

raw_out = hdfs.create(hadoop.fs.Path('/tmp/combinedFile.parquet'))  # FSDataOutputStream
out_stream = sc._jvm.java.io.BufferedOutputStream(raw_out)

for f in hdfs.listStatus(hadoop.fs.Path('/tmp/dirWithSeparatedFiles/')):

    raw_in = hdfs.open(f.getPath())  # FSDataInputStream
    in_stream = sc._jvm.java.io.BufferedInputStream(raw_in)

    while in_stream.available() > 0:
        out_stream.write(in_stream.read()) 
        out_stream.flush()
    in_stream.close()
out_stream.close()

相关问题