使用spark递增聚合hudi表值

eqqqjvef  于 2022-11-16  发布在  Apache
关注(0)|答案(1)|浏览(158)

我有一个spark流作业,它每10秒加载一次apache hudi表中的数据。如果hudi表中的行已经存在,它会更新该行。实际上,它正在执行upsert操作。
但是在hudi表中,有一个金额列也用新值更新。

for example
 1 batch, id=1, amount value=10. --> in table, amount value = 10
 2 batch, id=1, amount value=20. --> in table, amount value = 20

但我需要的金额值应该是30而不是20。我需要增量聚合金额列。
hudi是否支持增量聚合usecase而不使用外部缓存/数据库?

s4n0splo

s4n0splo1#

Apache Hudi默认使用org.apache.hudi.common.model.OverwriteWithLatestAvroPayload类来预合并 Dataframe 记录和更新已存储的旧记录,它只检查 Dataframe 是否包含具有相同键的重复记录,并选择具有max ordering字段的记录,然后用从已插入数据中选择的新记录替换已存储的旧记录。
但是您可以通过实现接口org.apache.hudi.common.model.HoodieRecordPayload并将config hoodie.compaction.payload.class设置为您的类来创建自己的记录有效负载类。

相关问题