我有一个spark流作业,它每10秒加载一次apache hudi
表中的数据。如果hudi表中的行已经存在,它会更新该行。实际上,它正在执行upsert操作。
但是在hudi表中,有一个金额列也用新值更新。
for example
1 batch, id=1, amount value=10. --> in table, amount value = 10
2 batch, id=1, amount value=20. --> in table, amount value = 20
但我需要的金额值应该是30
而不是20
。我需要增量聚合金额列。
hudi是否支持增量聚合usecase而不使用外部缓存/数据库?
1条答案
按热度按时间s4n0splo1#
Apache Hudi默认使用
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
类来预合并 Dataframe 记录和更新已存储的旧记录,它只检查 Dataframe 是否包含具有相同键的重复记录,并选择具有maxordering
字段的记录,然后用从已插入数据中选择的新记录替换已存储的旧记录。但是您可以通过实现接口
org.apache.hudi.common.model.HoodieRecordPayload
并将confighoodie.compaction.payload.class
设置为您的类来创建自己的记录有效负载类。