MERGE INTO accounts t USING monthly_accounts_update s
ON (t.customer = s.customer)
WHEN MATCHED
THEN UPDATE SET purchases = s.purchases + t.purchases
WHEN NOT MATCHED
THEN INSERT (customer, purchases, address)
VALUES(s.customer, s.purchases, s.address)
IcebergMergeOutputDF = spark.sql("""
MERGE INTO job_catalog.iceberg_demo.iceberg_output t
USING (SELECT op, product_id, category, product_name, quantity_available, to_timestamp(last_update_time) as last_update_time FROM incremental_input_data) s
ON t.product_id = s.product_id
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.quantity_available = s.quantity_available, t.last_update_time = s.last_update_time
WHEN NOT MATCHED THEN INSERT (product_id, category, product_name, quantity_available, last_update_time) VALUES (s.product_id, s.category, s.product_name, s.quantity_available, s.last_update_time)
""")
1条答案
按热度按时间vdzxcuhz1#
感谢Apache Iceberg,经过4年的漫长等待,现在可以实现upsert到数据湖表中。目前,有两种方法可以做到这一点。
使用雅典娜
第一种方法涉及使用Athena,您可以直接从Athena创建Iceberg表,并将其指定为ICEBERG表。创建表后,您可以运行merge into命令,根据需要更新或插入数据。
1.直接从Athena创建一个Iceberg表,并将其指定为ICEBERG表
Amazon Athena - Creating Iceberg tables
1.之后,可以运行merge into命令
Amazon Athena - MERGE INTO
使用胶水
第二种方法是使用Glue,您可以使用Spark SQL查询将数据合并到Iceberg表中。这种方法涉及使用Apache Iceberg和AWS Glue在数据湖中实现基于CDC的UPSERT。您可以参考详细的参考链接以了解此方法的更多信息。
下面是详细的参考:Implement a CDC-based UPSERT in a data lake using Apache Iceberg and AWS Glue