import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
dataframe = pd.read_csv('content.csv')
output = "/Users/myTable.parquet"
# Create a parquet table from your dataframe
table = pa.Table.from_pandas(dataframe)
# Write direct to your parquet file
pq.write_to_dataset(table , root_path=output)
os.makedirs(path, exist_ok=True)
# write append (replace the naming logic with what works for you)
filename = f'{datetime.datetime.utcnow().timestamp()}.parquet'
df.to_parquet(os.path.join(path, filename))
# read
pd.read_parquet(path)
6条答案
按热度按时间dy2hfwbg1#
要附加,请执行以下操作:
这将自动追加到表中。
xyhw6mcr2#
我用了aws wrangler库。它工作起来很有魅力
以下是参考文档
https://aws-data-wrangler.readthedocs.io/en/latest/stubs/awswrangler.s3.to_parquet.html
我已经从kinesis流读取并使用kinesis-python库来消费消息并写入s3。json的处理逻辑我没有包括在内,因为这篇文章处理的是无法将数据附加到s3的问题。
下面是我使用的示例代码:
任何澄清都是有帮助的。在几篇文章中,我读过数据并再次覆盖。但随着数据变得越来越大,这会减慢过程。这是低效的
qacovj5a3#
在
pandas.to_parquet()
中没有附加模式。你可以做的是读取现有的文件,修改它,然后写回覆盖它。mkshixfv4#
看 起来 可以 使用 fastparquet 将 行 组 追加 到 已经 存在 的 parquet 文件 中 。 这 是 一 个 相当 独特 的 特性 , 因为 大多 数 库 都 没有 这个 实现 。
下 图 来自 pandas doc :
中 的 每 一 个
我们 必须 把 发动 机 和 燃料 箱 都 交 上去 。
格式
如果 append 设置 为 True 并且 文件 不 存在 , 则 会 出现 以下 错误
格式
运行 上述 脚本 3 次 , 我 在 parquet 文件 中 有 以下 数据 。
如果 我 检查 元 数据 , 我 可以 看到 这 产生 了 3 个 行 组 。
如果 您 写入 太 多 的 小 数据 列 群组 , 附加 的 效率 可能 会 很 低 。 通常 建议 的 数据 列 群组 大小 接近 100,000 或 1,000,000 个 数据 列 。 这 比 非常 小 的 数据 列 群组 有 一些 优点 。 压缩 的 效果 会 更 好 , 因为 压缩 只 会 在 数据 列 群组 内 运作 。 而且 , 因为 每个 数据 列 群组 都会 储存 自己 的 统计 数据 , 所以 储存 统计 数据 的 负担 也 会 更 少 。
91zkwejq5#
使用 fastparquet 写入 功能
中 的 每 一 个
据 我 所 知 , 该 文件 肯定 已经 存在 。
API is available here (for now at least): https://fastparquet.readthedocs.io/en/latest/api.html#fastparquet.write
omhiaaxx6#
Pandas
to_parquet()
既可以处理单个文件,也可以处理包含多个文件的目录。如果文件已经存在,Pandas会自动覆盖该文件。要附加到一个 parquet 对象,只需在同一个 parquet 目录中添加一个新文件。