spark sql在不删除历史状态的情况下生成scd2

t5zmwmid  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(382)

关系数据库中的数据被加载到spark中——按理说是每天加载,但实际上不是每天加载。此外,它是db-no delta加载的完整副本。
为了方便地将维度表与主事件数据连接起来,我想:
重复数据消除(即提高以后广播连接的可能性)
有valid\u to/valid\u from列,这样即使数据每天都不可用(不一致),也可以很好地使用(从下游)
我使用的是spark 3.0.1,希望以scd2样式转换现有数据,而不会丢失历史记录。

spark-shell

import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window

case class Foo (key:Int, value:Int, date:String)
val d = Seq(Foo(1, 1, "20200101"), Foo(1, 8, "20200102"), Foo(1, 9, "20200120"),Foo(1, 9, "20200121"),Foo(1, 9, "20200122"), Foo(1, 1, "20200103"), Foo(2, 5, "20200101"), Foo(1, 10, "20200113")).toDF
d.show

val windowDeduplication =  Window.partitionBy("key", "value").orderBy("key", "date")
val windowPrimaryKey = Window.partitionBy("key").orderBy("key", "date")

val nextThing = lead("date", 1).over(windowPrimaryKey)
d.withColumn("date", to_date(col("date"), "yyyyMMdd")).withColumn("rank", rank().over(windowDeduplication)).filter(col("rank") === 1).drop("rank").withColumn("valid_to", nextThing).withColumn("valid_to", when(nextThing.isNotNull, date_sub(nextThing, 1)).otherwise(current_date)).withColumnRenamed("date", "valid_from").orderBy("key", "valid_from", "valid_to").show

结果:

+---+-----+----------+----------+
|key|value|valid_from|  valid_to|
+---+-----+----------+----------+
|  1|    1|2020-01-01|2020-01-01|
|  1|    8|2020-01-02|2020-01-12|
|  1|   10|2020-01-13|2020-01-19|
|  1|    9|2020-01-20|2020-10-09|
|  2|    5|2020-01-01|2020-10-09|
+---+-----+----------+----------+

已经很不错了。然而:

|  1|    1|2020-01-03|   2|2020-01-12|

他迷路了。i、 e.以后(中间更改后)再次出现的任何值都将丢失。如何保持这一行而不保持较大的列,例如:

d.withColumn("date", to_date(col("date"), "yyyyMMdd")).withColumn("rank", rank().over(windowDeduplication)).withColumn("valid_to", nextThing).withColumn("valid_to", 

when(nextThing.isNotNull, date_sub(nextThing, 1)).otherwise(current_date)).withColumnRenamed("date", "valid_from").orderBy("key", "valid_from", "valid_to").show

+---+-----+----------+----+----------+
|key|value|valid_from|rank|  valid_to|
+---+-----+----------+----+----------+
|  1|    1|2020-01-01|   1|2020-01-01|
|  1|    8|2020-01-02|   1|2020-01-02|
|  1|    1|2020-01-03|   2|2020-01-12|
|  1|   10|2020-01-13|   1|2020-01-19|
|  1|    9|2020-01-20|   1|2020-01-20|
|  1|    9|2020-01-21|   2|2020-01-21|
|  1|    9|2020-01-22|   3|2020-10-09|
|  2|    5|2020-01-01|   1|2020-10-09|
+---+-----+----------+----+----------+

这绝对不是你想要的
我们的想法是删除复制品
但使用有效的\u to、有效的\u from保留对数据的任何历史更改
如何正确地将其转换为scd2表示,即具有有效的\u from、有效的\u to但不是drop中介状态?
注意:我不需要更新现有数据(合并到,联接)。可以重新创建/覆盖它。
i、 e.在spark中实现scd类型2似乎太复杂了。有没有更好的方法在我的情况下,国家处理是不需要的?i、 e.我有来自数据库每日完整副本的数据,并希望对其进行重复数据消除。

a0x5cqrl

a0x5cqrl1#

以前的方法只保留副本的第一个(最早的)版本。我认为没有连接进行状态处理的唯一解决方案是使用一个窗口函数,将每个值与前一行进行比较——如果整行没有变化,则丢弃它。
可能效率较低,但更准确。但这也取决于手头的用例,即改变的值再次被看到的可能性有多大。

import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window

case class Foo (key:Int, value:Int, value2:Int, date:String)
val d = Seq(Foo(1, 1,1, "20200101"), Foo(1, 8,1, "20200102"), Foo(1, 9,1, "20200120"),Foo(1, 6,1, "20200121"),Foo(1, 9,1, "20200122"), Foo(1, 1,1, "20200103"), Foo(2, 5,1, "20200101"), Foo(1, 10,1, "20200113"), Foo(1, 9,1, "20210120"),Foo(1, 9,1, "20220121"),Foo(1, 9,3, "20230122")).toDF

def compare2Rows(key:Seq[String], sortChangingIgnored:Seq[String], timeColumn:String)(df:DataFrame):DataFrame = {
    val windowPrimaryKey = Window.partitionBy(key.map(col):_*).orderBy(sortChangingIgnored.map(col):_*)
    val columnsToCompare = df.drop(key ++ sortChangingIgnored:_*).columns

    val nextDataChange = lead(timeColumn, 1).over(windowPrimaryKey)

    val deduplicated = df.withColumn("data_changes", columnsToCompare.map(e=> col(e) =!= lead(col(e), 1).over(windowPrimaryKey)).reduce(_ or _)).filter(col("data_changes").isNull or col("data_changes"))
    deduplicated.withColumn("valid_to", when(nextDataChange.isNotNull, date_sub(nextDataChange, 1)).otherwise(current_date)).withColumnRenamed("date", "valid_from").drop("data_changes")
}
d.orderBy("key", "date").show
d.withColumn("date", to_date(col("date"), "yyyyMMdd")).transform(compare2Rows(Seq("key"), Seq("date"), "date")).orderBy("key", "valid_from", "valid_to").show

退货:

+---+-----+------+----------+----------+
|key|value|value2|valid_from|  valid_to|
+---+-----+------+----------+----------+
|  1|    1|     1|2020-01-01|2020-01-01|
|  1|    8|     1|2020-01-02|2020-01-02|
|  1|    1|     1|2020-01-03|2020-01-12|
|  1|   10|     1|2020-01-13|2020-01-19|
|  1|    9|     1|2020-01-20|2020-01-20|
|  1|    6|     1|2020-01-21|2022-01-20|
|  1|    9|     1|2022-01-21|2023-01-21|
|  1|    9|     3|2023-01-22|2020-10-09|
|  2|    5|     1|2020-01-01|2020-10-09|
+---+-----+------+----------+----------+

对于输入:

+---+-----+------+--------+
|key|value|value2|    date|
+---+-----+------+--------+
|  1|    1|     1|20200101|
|  1|    8|     1|20200102|
|  1|    1|     1|20200103|
|  1|   10|     1|20200113|
|  1|    9|     1|20200120|
|  1|    6|     1|20200121|
|  1|    9|     1|20200122|
|  1|    9|     1|20210120|
|  1|    9|     1|20220121|
|  1|    9|     3|20230122|
|  2|    5|     1|20200101|
+---+-----+------+--------+

这个函数有一个缺点,就是建立了无限量的状态-对于每个键。。。但是,当我计划将此应用于相当小的维度表时,我认为它无论如何都应该是好的。

相关问题