hadoop文件系统打开文件并跳过第一行

uajslkp6  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(343)

我正在用python语言读取hdfs中的文件。
每个文件都有一个头,我正在尝试合并这些文件。但是,每个文件中的头也会合并。
有没有办法跳过第二个文件的头文件?

hadoop = sc._jvm.org.apache.hadoop
conf = hadoop.conf.Configuration()
fs = hadoop.fs.FileSystem.get(conf)

src_dir = "/mnt/test/"
out_stream = fs.create(hadoop.fs.Path(dst_file), overwrite)

files = []
for f in fs.listStatus(hadoop.fs.Path(src_dir)):
  if f.isFile():
    files.append(f.getPath())

for file in files:
  in_stream = fs.open(file)
  hadoop.io.IOUtils.copyBytes(in_stream, out_stream, conf, False)

目前我已经用下面的逻辑解决了这个问题,不过想知道有没有更好更有效的解决方案?谢谢你的帮助

for idx,file in enumerate(files):
            if debug: 
                print("Appending file {} into {}".format(file, dst_file))

            # remove header from the second file
            if idx>0:
              file_str = ""
              with open('/'+str(file).replace(':',''),'r+') as f:
                for idx,line in enumerate(f):
                  if idx>0:
                    file_str = file_str + line

              with open('/'+str(file).replace(':',''), "w+") as f:
                f.write(file_str)
            in_stream = fs.open(file)   # InputStream object and copy the stream
            try:
                hadoop.io.IOUtils.copyBytes(in_stream, out_stream, conf, False)     # False means don't close out_stream
            finally:
                in_stream.close()
6yt4nkrj

6yt4nkrj1#

您现在所做的是重复附加到字符串。这是一个相当缓慢的过程。为什么不在读取时直接写入输出文件?

for file_idx, file in enumerate(files):
  with open(...) as out_f, open(...) as in_f:
    for line_num, line in enumerate(in_f):
      if file_idx == 0 or line_num > 0:
        f_out.write(line)

如果可以一次加载所有文件,还可以使用跳过第一行 readline 然后 readlines :

for file_idx, file in enumerate(files):
  with open(...) as out_f, open(...) as in_f:
    if file_idx != 0:
      f_in.readline()
    f_out.writelines(f_in.readlines())

相关问题