我需要在spark中执行一些文本文件的批处理。基本上有人给了我成吨的csv文件是畸形的。它们包含多行任意文本格式的标题数据,然后是多行格式正确的csv数据。我需要把这些数据分成两个文件,或者至少去掉头文件。
不管怎样,我读到你可以得到一个rdd格式:
[(文件名,内容)]
通过使用
spark.sparkcontext.wholetextfiles(输入文件\u csv)
然后我想在这个rdd上执行一个Map操作,这个操作会产生另一种与原始格式完全相同的格式
[(新文件名,内容)]
然后我希望集群将这些内容保存在这些文件名下。
我找不到可以为我执行此操作的write命令。我可以将rdd原始保存,但不能将其保存为普通文件,以后可以将其读取为Dataframe。
我想我可以删除标题,然后保存为一个单一的巨大csv文件名作为一个新的列,但我觉得这不会是有效的。
有人能解决我的问题吗?
1条答案
按热度按时间k3fezbri1#
这是scala,但在python中应该不会太远。在“foreach”中,我没有使用任何特定于spark的东西来编写文件,只是使用常规的hadoop api。