如何将数据从pyspark持久化到hive-避免重复

hiz5n14c  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(372)

我和你一起工作 graphframes , pyspark ,和 hive 使用图形数据。在处理数据时,我将构建一个图,并最终将这些数据持久化到一个配置单元表中,在那里我将不再更新它。
后续运行可能与以前运行的节点有关系,因此我希望确保不复制数据。
例如,运行#1可能会找到节点: A , B , C . 运行#2可能会重新找到节点 A ,并查找新节点 X , Y , Z . 我不想 A 在我的table上出现两次。
我正在寻找处理此问题的最佳方法,并希望解决以下问题:
在处理与节点关联的元数据时,我需要跟踪节点的状态。我只想在完成此处理后将节点的数据持久化到配置单元。
我希望确保在遇到同一节点时(例如,当我重新查找时),不会创建重复的数据 A 节点,我不想在配置单元中插入另一行)
我目前正在修补最好的方法来做到这一点。我知道 hive 现在支持acid事务,但看起来不像 pyspark 当前支持积垢类型的操作。我的计划是:
每次跑步时,创建一个 dataframe 存储我找到的节点。
找到新节点时:检查节点是否已存在于配置单元中(例如。 sqlContext.sql("SELECT * FROM existingTable WHERE name="<NAME>") . 如果不存在,则更新 dataframex = vertices.withColumn("name", F.when(F.col("id")=="a", "<THE-NEW-NAME>").otherwise(F.col("name"))) 将其添加到我们的Dataframe中。
完成所有节点的处理后,创建临时视图: x.createOrReplaceTempView("myTmpView") 最后,使用 sqlContext.sql("INSERT INTO TABLE existingTable SELECT * FROM myTmpView") 我想这会管用的,但它看起来非常粗糙。我不确定这是不是因为我对Hive/星火缺乏了解,或者这只是技术的本质。有没有更好的办法?以这种方式处理它有性能成本吗?

mbzjlibv

mbzjlibv1#

在deltalakeapi中,使用scala和python支持upserts(merge)。这正是你想要实现的。
https://docs.delta.io/latest/delta-update.html#merge-示例
这里有另一个解决方案
在表中更新列的时间戳
联合上一个运行结果和当前运行结果
按“节点”分组,选择最新时间戳
保存结果

相关问题