python 如何在pyspark中搜索和删除 parquet 文件中的特定行?(数据清除)

9rnv2umw  于 2023-02-28  发布在  Python
关注(0)|答案(2)|浏览(205)

我正在启动一个项目,调整数据湖的具体清除数据,以遵守数据隐私立法。
基本上,数据的所有者打开一个请求删除特定用户记录的呼叫,我需要通过检查所有 parquet 文件来清理所有AWS S3 bucktes,并从我的数据湖中的所有 parquet 文件中删除此特定记录。
有人用python或pyspark开发过类似的项目吗?
您能建议一下针对此案例的良好市场惯例吗?
今天我要做的是阅读所有的parquet文件,将其放入 Dataframe ,过滤 Dataframe ,排除当前记录,并重写记录所在的分区。这个解决方案甚至可以工作,但要清除需要查看5年历史的分区,处理工作非常繁重。
谁能给我一个更实际的解决办法?
记住我的 parquet 文件在AWS S3中,有Athena表,我的脚本将在EMR(Pyspark)中运行
谢谢

6jjcrrmo

6jjcrrmo1#

parquet 的设计方式这是唯一的方法来做你正在尝试做的事情。唯一的优化你可以做的是通过分区表只需要重写个别分区与分区下推。
如果您愿意更改文件格式,则ACID兼容表格格式(如Delta Lake)支持就地删除,只需重写单个文件。
How to Delete Rows from a Delta Lake Table

vwhgwdsa

vwhgwdsa2#

尝试使用Athena冰山表,因为Iceberg tables是ACID兼容的。我相信这对于Athena和删除/更新S3中的特定行是很好的实践。

    • 一月一日**

Step1:创建冰山表。

CREATE TABLE iceberg_table (
  id int,
  data string,
  category string) 
PARTITIONED BY (category, bucket(16,id)) 
LOCATION 's3://DOC-EXAMPLE-BUCKET/iceberg-folder' 
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='parquet',
  'write_target_data_file_size_bytes'='536870912'
);
    • 第二步:**

1.将现有表中的数据加载到Iceberg表中。
1.然后尝试您的要求在这里和转换您所有的 parquet 表冰山表。

  1. Optimize表以获得最佳性能。

相关问题