apache panda将 Dataframe 写入parquet格式(带附加)

qv7cva1a  于 2022-11-16  发布在  Apache
关注(0)|答案(6)|浏览(215)

我尝试在append模式下将pandas dataframe写入parquet文件格式(在最新的panda版本0.21.0中引入)。然而,文件没有被追加到现有文件,而是被新数据覆盖。我错过了什么?
写入语法为

df.to_parquet(path, mode='append')

读取语法为

pd.read_parquet(path)
dy2hfwbg

dy2hfwbg1#

要附加,请执行以下操作:

import pandas as pd 
import pyarrow.parquet as pq
import pyarrow as pa

dataframe = pd.read_csv('content.csv')
output = "/Users/myTable.parquet"

# Create a parquet table from your dataframe
table = pa.Table.from_pandas(dataframe)

# Write direct to your parquet file
pq.write_to_dataset(table , root_path=output)

这将自动追加到表中。

xyhw6mcr

xyhw6mcr2#

我用了aws wrangler库。它工作起来很有魅力

以下是参考文档

https://aws-data-wrangler.readthedocs.io/en/latest/stubs/awswrangler.s3.to_parquet.html
我已经从kinesis流读取并使用kinesis-python库来消费消息并写入s3。json的处理逻辑我没有包括在内,因为这篇文章处理的是无法将数据附加到s3的问题。

下面是我使用的示例代码:

!pip install awswrangler
import awswrangler as wr
import pandas as pd
evet_data=pd.DataFrame({'a': [a], 'b':[b],'c':[c],'d':[d],'e': [e],'f':[f],'g': [g]},columns=['a','b','c','d','e','f','g'])
#print(evet_data)
s3_path="s3://<your bucker>/table/temp/<your folder name>/e="+e+"/f="+str(f)
try:
    wr.s3.to_parquet(
    df=evet_data,
    path=s3_path,
    dataset=True,
    partition_cols=['e','f'],
    mode="append",
    database="wat_q4_stg",
    table="raw_data_v3",
    catalog_versioning=True  # Optional
    )
    print("write successful")       
except Exception as e:
    print(str(e))

任何澄清都是有帮助的。在几篇文章中,我读过数据并再次覆盖。但随着数据变得越来越大,这会减慢过程。这是低效的

qacovj5a

qacovj5a3#

pandas.to_parquet()中没有附加模式。你可以做的是读取现有的文件,修改它,然后写回覆盖它。

mkshixfv

mkshixfv4#

看 起来 可以 使用 fastparquet 将 行 组 追加 到 已经 存在 的 parquet 文件 中 。 这 是 一 个 相当 独特 的 特性 , 因为 大多 数 库 都 没有 这个 实现 。
下 图 来自 pandas doc

DataFrame.to_parquet(path, engine='auto', compression='snappy', index=None, partition_cols=None, **kwargs)

中 的 每 一 个
我们 必须 把 发动 机 和 燃料 箱 都 交 上去 。

  • 引擎
      • kwargs - 传递 给 镶 木 地板 库 的 附加 参数 。
    • 夸 格 - - 这里 我们 需要 经过 的 是 :* * append = True * * ( 来自 快速 镶 木 地板 )
import pandas as pd
import os.path

file_path = "D:\\dev\\output.parquet"
df = pd.DataFrame(data={'col1': [1, 2,], 'col2': [3, 4]})
if not os.path.isfile(file_path):
  df.to_parquet(file_path, engine='fastparquet')
else:
  df.to_parquet(file_path, engine='fastparquet', append=True)

格式
如果 append 设置 为 True 并且 文件 不 存在 , 则 会 出现 以下 错误

AttributeError: 'ParquetFile' object has no attribute 'fmd'

格式
运行 上述 脚本 3 次 , 我 在 parquet 文件 中 有 以下 数据 。

如果 我 检查 元 数据 , 我 可以 看到 这 产生 了 3 个 行 组 。

    • 备注 : * *

如果 您 写入 太 多 的 小 数据 列 群组 , 附加 的 效率 可能 会 很 低 。 通常 建议 的 数据 列 群组 大小 接近 100,000 或 1,000,000 个 数据 列 。 这 比 非常 小 的 数据 列 群组 有 一些 优点 。 压缩 的 效果 会 更 好 , 因为 压缩 只 会 在 数据 列 群组 内 运作 。 而且 , 因为 每个 数据 列 群组 都会 储存 自己 的 统计 数据 , 所以 储存 统计 数据 的 负担 也 会 更 少 。

91zkwejq

91zkwejq5#

使用 fastparquet 写入 功能

from fastparquet import write

write(file_name, df, append=True)

中 的 每 一 个
据 我 所 知 , 该 文件 肯定 已经 存在 。
API is available here (for now at least): https://fastparquet.readthedocs.io/en/latest/api.html#fastparquet.write

omhiaaxx

omhiaaxx6#

Pandas to_parquet()既可以处理单个文件,也可以处理包含多个文件的目录。如果文件已经存在,Pandas会自动覆盖该文件。要附加到一个 parquet 对象,只需在同一个 parquet 目录中添加一个新文件。

os.makedirs(path, exist_ok=True)

# write append (replace the naming logic with what works for you)
filename = f'{datetime.datetime.utcnow().timestamp()}.parquet'
df.to_parquet(os.path.join(path, filename))

# read
pd.read_parquet(path)

相关问题