我有一个很大的CSV文件(大约70 GB),我需要在Databricks中使用PySpark读取它,以创建一个Pyspark Dataframe。文件保存在装载到Databricks的存储帐户中。现在,我不能这样做,因为它给了我一个错误:
作为替代方案,我考虑将文件拆分为多个CSV。有没有一种方法可以做到这么简单?
p8ekf7hl1#
我想说用Python拆分一个大的CSV是相当容易的。我还建议大多数使用CSV文件的任务(如99.999%)使用CSV感知工具。像“使用这个POSIX工具”这样的建议,总是会因为标题和跨多行的行(因为引用了换行符)而受到困扰。因此,无论您使用下面的工具,还是像GoCSV's split command这样的工具,都要使用符合CSV规范的工具。但是,如果您100%知道CSV没有标题,也没有多行行,那么您可能可以使用常规的文本处理工具。要在Python中运行自己的拆分器,需要一些机制来创建一个新的文件和csv.writer,因为很多行已经写入了前一个文件/writer。我不知道下面的代码有多Python化,但是:我认为它相当清晰;而且很有效!
import csv import io MAX_ROWS = 10 # "Initialize" f_out and writer as their respective types f_out = io.StringIO("") writer = csv.writer(f_out) out_num = 0 def next_writer(header: list[str]): """Gets the next-numbered CSV writer; closes the previous file and flushes its writer.""" global out_num global f_out global writer f_out.close() out_num += 1 f_out = open(f"output_{out_num:03}.csv", "w", newline="") writer = csv.writer(f_out) writer.writerow(header) with open("input.csv", newline="") as f_in: reader = csv.reader(f_in) header = next(reader) for i, row in enumerate(reader): if i % MAX_ROWS == 0: next_writer(header) writer.writerow(row) f_out.close() # close underlying file; flush writer
“初始化”输出文件和写入器似乎有点笨拙,但是,对于我的示例,这确保了我们拥有全局范围的写入器(next_writer(...)和主读取循环需要)及其底层的编号输出文件(用于next_writer(...))。我生成了一个包含100行的CSV:
H____1,H____2 r001c1,r001c2 r002c1,r002c2 r003c1,r003c2 ......,...... r098c1,r098c2 r099c1,r099c2 r100c1,r100c2
运行上面的代码(MAX_ROWS = 10),得到10个文件,分别是output_001.csv到output_010.csv(格式字符串f"{out_num:03}"用前导零填充数字,最多可以填充三个位置,以允许999个文件)。每个文件看起来像:
f"{out_num:03}"
output_001.csv -------------- H____1,H____2 r001c1,r001c2 r002c1,r002c2 r003c1,r003c2 ......,...... output_010.csv -------------- H____1,H____2 ......,...... r098c1,r098c2 r099c1,r099c2 r100c1,r100c2
1条答案
按热度按时间p8ekf7hl1#
我想说用Python拆分一个大的CSV是相当容易的。
我还建议大多数使用CSV文件的任务(如99.999%)使用CSV感知工具。像“使用这个POSIX工具”这样的建议,总是会因为标题和跨多行的行(因为引用了换行符)而受到困扰。因此,无论您使用下面的工具,还是像GoCSV's split command这样的工具,都要使用符合CSV规范的工具。但是,如果您100%知道CSV没有标题,也没有多行行,那么您可能可以使用常规的文本处理工具。
要在Python中运行自己的拆分器,需要一些机制来创建一个新的文件和csv.writer,因为很多行已经写入了前一个文件/writer。
我不知道下面的代码有多Python化,但是:我认为它相当清晰;而且很有效!
“初始化”输出文件和写入器似乎有点笨拙,但是,对于我的示例,这确保了我们拥有全局范围的写入器(next_writer(...)和主读取循环需要)及其底层的编号输出文件(用于next_writer(...))。
我生成了一个包含100行的CSV:
运行上面的代码(MAX_ROWS = 10),得到10个文件,分别是output_001.csv到output_010.csv(格式字符串
f"{out_num:03}"
用前导零填充数字,最多可以填充三个位置,以允许999个文件)。每个文件看起来像: