pyspark AWS Glue进程记录逐行插入更新删除

jhkqcmku  于 2023-03-28  发布在  Spark
关注(0)|答案(1)|浏览(123)

我有一个要求,做增量加载到一个表的基础上一些逻辑

  • 如果原始表中存在record_id,则用新记录更新整行
  • 如果record_id不存在,则插入一个新行,并将新的custom_id作为主键
  • 对于某些逻辑需要删除整个记录

在Glue中可以做这些行操作吗?谢谢!

vdzxcuhz

vdzxcuhz1#

感谢Apache Iceberg,经过4年的漫长等待,现在可以实现upsert到数据湖表中。目前,有两种方法可以做到这一点。

使用雅典娜

第一种方法涉及使用Athena,您可以直接从Athena创建Iceberg表,并将其指定为ICEBERG表。创建表后,您可以运行merge into命令,根据需要更新或插入数据。
1.直接从Athena创建一个Iceberg表,并将其指定为ICEBERG表
Amazon Athena - Creating Iceberg tables

CREATE TABLE iceberg_table (id bigint, data string, category string)
PARTITIONED BY (category, bucket(16, id))
LOCATION 's3://YOUR-BUCKET/your-folder/'
TBLPROPERTIES ( 'table_type' = 'ICEBERG' )

1.之后,可以运行merge into命令
Amazon Athena - MERGE INTO

MERGE INTO accounts t USING monthly_accounts_update s
    ON (t.customer = s.customer)
    WHEN MATCHED
        THEN UPDATE SET purchases = s.purchases + t.purchases
    WHEN NOT MATCHED
        THEN INSERT (customer, purchases, address)
              VALUES(s.customer, s.purchases, s.address)

使用胶水

第二种方法是使用Glue,您可以使用Spark SQL查询将数据合并到Iceberg表中。这种方法涉及使用Apache Iceberg和AWS Glue在数据湖中实现基于CDC的UPSERT。您可以参考详细的参考链接以了解此方法的更多信息。

IcebergMergeOutputDF = spark.sql("""
    MERGE INTO job_catalog.iceberg_demo.iceberg_output t
    USING (SELECT op, product_id, category, product_name, quantity_available, to_timestamp(last_update_time) as last_update_time FROM incremental_input_data) s
    ON t.product_id = s.product_id
    WHEN MATCHED AND s.op = 'D' THEN DELETE
    WHEN MATCHED THEN UPDATE SET t.quantity_available = s.quantity_available, t.last_update_time = s.last_update_time 
    WHEN NOT MATCHED THEN INSERT (product_id, category, product_name, quantity_available, last_update_time) VALUES (s.product_id, s.category, s.product_name, s.quantity_available, s.last_update_time)
""")

下面是详细的参考:Implement a CDC-based UPSERT in a data lake using Apache Iceberg and AWS Glue

相关问题