使用spark rdd保存和加载wholetextfiles

2lpgd968  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(443)

我需要在spark中执行一些文本文件的批处理。基本上有人给了我成吨的csv文件是畸形的。它们包含多行任意文本格式的标题数据,然后是多行格式正确的csv数据。我需要把这些数据分成两个文件,或者至少去掉头文件。
不管怎样,我读到你可以得到一个rdd格式:
[(文件名,内容)]
通过使用
spark.sparkcontext.wholetextfiles(输入文件\u csv)
然后我想在这个rdd上执行一个Map操作,这个操作会产生另一种与原始格式完全相同的格式
[(新文件名,内容)]
然后我希望集群将这些内容保存在这些文件名下。
我找不到可以为我执行此操作的write命令。我可以将rdd原始保存,但不能将其保存为普通文件,以后可以将其读取为Dataframe。
我想我可以删除标题,然后保存为一个单一的巨大csv文件名作为一个新的列,但我觉得这不会是有效的。
有人能解决我的问题吗?

k3fezbri

k3fezbri1#

这是scala,但在python中应该不会太远。在“foreach”中,我没有使用任何特定于spark的东西来编写文件,只是使用常规的hadoop api。

sc.wholeTextFiles("/tmp/test-data/")
  .foreach{ x =>
    val filename = x._1
    val content = x._2
    val fs = FileSystem.get(new Configuration())
    val output = fs.create(new Path(s"${filename}-copy"))
    val writer = new PrintWriter(output)
    writer.write(content)
    writer.close
  }

相关问题