python—将数据增量写入parquet文件

zengzsys  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(1726)

要将Dataframe写入Parquet,我将执行以下操作:

df = pd.DataFrame(DATA)
table = pa.Table.from_pandas(df)
pq.write_table(table, 'DATA.parquet')

但是,如果我有1b行的话,这就不太好用了,而且它不能放在内存中。在这种情况下,我将如何增量地写入数据。例如,类似于:

DATA = []
BACTCH_SIZE = 10000
with open('largefile.csv') as f:
    for num, line in enumerate(f):
        if (len(DATA) == BATCH_SIZE):
            pq.write_table(pa.Table.from_pandas(pd.DataFrame(DATA)), 'DATA.parquet')
            DATA = []
        DATA.append(line.split(','))

if DATA: pq.write_table(pa.Table.from_pandas(pd.DataFrame(DATA)), 'DATA.parquet')

但是,我相信上面的内容会一直覆盖Parquet文件。我怎么能做相当于附加?

fcg9iug3

fcg9iug31#

hadoop并不是用来附加的。只需将每个批的新文件写入一个目录,几乎所有hadoopapi都应该能够读取所有的parquet文件

BACTCH_SIZE = 10000
c = 0
with open('largefile.csv') as f:
    for num, line in enumerate(f):
        if len(DATA) == BATCH_SIZE:
            pq.write_table(pa.Table.from_pandas(pd.DataFrame(DATA)), 'DATA.{}.parquet'.format(c))
            DATA = []
            c += 1
        DATA.append(line.split(','))

spark也是这样写数据的;每个执行者一个文件
但如果你有一个大的csv,只要把它放在hdfs中,然后在上面创建一个配置单元表,然后把它转换成Parquet地板。根本不需要Pandas

相关问题