关系数据库中的数据被加载到spark中——按理说是每天加载,但实际上不是每天加载。此外,它是db-no delta加载的完整副本。
为了方便地将维度表与主事件数据连接起来,我想:
重复数据消除(即提高以后广播连接的可能性)
有valid\u to/valid\u from列,这样即使数据每天都不可用(不一致),也可以很好地使用(从下游)
我使用的是spark 3.0.1,希望以scd2样式转换现有数据,而不会丢失历史记录。
spark-shell
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
case class Foo (key:Int, value:Int, date:String)
val d = Seq(Foo(1, 1, "20200101"), Foo(1, 8, "20200102"), Foo(1, 9, "20200120"),Foo(1, 9, "20200121"),Foo(1, 9, "20200122"), Foo(1, 1, "20200103"), Foo(2, 5, "20200101"), Foo(1, 10, "20200113")).toDF
d.show
val windowDeduplication = Window.partitionBy("key", "value").orderBy("key", "date")
val windowPrimaryKey = Window.partitionBy("key").orderBy("key", "date")
val nextThing = lead("date", 1).over(windowPrimaryKey)
d.withColumn("date", to_date(col("date"), "yyyyMMdd")).withColumn("rank", rank().over(windowDeduplication)).filter(col("rank") === 1).drop("rank").withColumn("valid_to", nextThing).withColumn("valid_to", when(nextThing.isNotNull, date_sub(nextThing, 1)).otherwise(current_date)).withColumnRenamed("date", "valid_from").orderBy("key", "valid_from", "valid_to").show
结果:
+---+-----+----------+----------+
|key|value|valid_from| valid_to|
+---+-----+----------+----------+
| 1| 1|2020-01-01|2020-01-01|
| 1| 8|2020-01-02|2020-01-12|
| 1| 10|2020-01-13|2020-01-19|
| 1| 9|2020-01-20|2020-10-09|
| 2| 5|2020-01-01|2020-10-09|
+---+-----+----------+----------+
已经很不错了。然而:
| 1| 1|2020-01-03| 2|2020-01-12|
他迷路了。i、 e.以后(中间更改后)再次出现的任何值都将丢失。如何保持这一行而不保持较大的列,例如:
d.withColumn("date", to_date(col("date"), "yyyyMMdd")).withColumn("rank", rank().over(windowDeduplication)).withColumn("valid_to", nextThing).withColumn("valid_to",
when(nextThing.isNotNull, date_sub(nextThing, 1)).otherwise(current_date)).withColumnRenamed("date", "valid_from").orderBy("key", "valid_from", "valid_to").show
+---+-----+----------+----+----------+
|key|value|valid_from|rank| valid_to|
+---+-----+----------+----+----------+
| 1| 1|2020-01-01| 1|2020-01-01|
| 1| 8|2020-01-02| 1|2020-01-02|
| 1| 1|2020-01-03| 2|2020-01-12|
| 1| 10|2020-01-13| 1|2020-01-19|
| 1| 9|2020-01-20| 1|2020-01-20|
| 1| 9|2020-01-21| 2|2020-01-21|
| 1| 9|2020-01-22| 3|2020-10-09|
| 2| 5|2020-01-01| 1|2020-10-09|
+---+-----+----------+----+----------+
这绝对不是你想要的
我们的想法是删除复制品
但使用有效的\u to、有效的\u from保留对数据的任何历史更改
如何正确地将其转换为scd2表示,即具有有效的\u from、有效的\u to但不是drop中介状态?
注意:我不需要更新现有数据(合并到,联接)。可以重新创建/覆盖它。
i、 e.在spark中实现scd类型2似乎太复杂了。有没有更好的方法在我的情况下,国家处理是不需要的?i、 e.我有来自数据库每日完整副本的数据,并希望对其进行重复数据消除。
1条答案
按热度按时间a0x5cqrl1#
以前的方法只保留副本的第一个(最早的)版本。我认为没有连接进行状态处理的唯一解决方案是使用一个窗口函数,将每个值与前一行进行比较——如果整行没有变化,则丢弃它。
可能效率较低,但更准确。但这也取决于手头的用例,即改变的值再次被看到的可能性有多大。
退货:
对于输入:
这个函数有一个缺点,就是建立了无限量的状态-对于每个键。。。但是,当我计划将此应用于相当小的维度表时,我认为它无论如何都应该是好的。