我们希望在Spark中使用SQL Join实现SCD2。我在Github上找到了参考资料
https://gist.github.com/rampage644/cc4659edd11d9a288c1b
但不是很清楚
谁能提供一些例子或参考来实现SCD2的Spark
问候,曼尼什
我们希望在Spark中使用SQL Join实现SCD2。我在Github上找到了参考资料
https://gist.github.com/rampage644/cc4659edd11d9a288c1b
但不是很清楚
谁能提供一些例子或参考来实现SCD2的Spark
问候,曼尼什
5条答案
按热度按时间hs1ihplo1#
在较新的Spark SQL方面有点过时,但这里有一个我使用Spark SQL试用的Ralph Kimball的例子,它工作可靠。你可以运行它,它工作-但文件逻辑和类似的需要添加-这是基于1.6语法的ETL SCD 2逻辑的主体,但运行在2.x -它不是那么难,但你需要跟踪并生成测试数据和跟踪每一步:
实际编码:
示例数据,以便您知道为示例生成什么:
hlswsv352#
下面是Spark中缓慢变化的维度类型2( Dataframe 和SQL)的详细实现,使用独占连接方法。
假设源正在发送完整的数据文件,即旧记录、更新记录和新记录。
步骤:
将最近的文件数据加载到STG表从HIST表中选择所有过期记录
使用内部连接从STG和HIST中选择所有未更改的记录,并在HIST.column = STG.column上进行过滤,如下所示
选择使用HIST_TAB的独占左联接从STG_TAB更改的所有新记录和更新记录,并设置到期日和生效日期,如下所示
使用与STG表的独占左联接从HIST表中选择所有更新的旧记录,并设置其到期日期,如下所示:
unionall查询1-4并将覆盖结果插入HIST表
Scala和Pyspark中SCD type 2的更详细实现可以在这里找到-
https://github.com/sahilbhange/spark-slowly-changing-dimension
希望这有帮助!
hgc7kmma3#
scala spark:https://georgheiler.com/2020/11/19/sparkling-scd2/
注意:这不是一个完整的SCD 2-它假定一个事件表,并从其中确定/消除重复的valid_from/valid_to,即未实现合并/更新插入
jk9hmnmh4#
这里有一个更新的答案与合并。
请注意,它不适用于Spark Structured Streaming,但可以用于Spark Kafka Batch Integration。
egmofgnx5#
customer_source
:id
(唯一PK)、name
、address
、phone
、lmd
(上次修改日期)customer_dim
,带有标准附加列:is_current
、effective_start
、effective_end
假设您正在运行一个维护检查点(
lmd
)的循环作业,以了解已经处理了哪些更改。然后:1.查找更改并准备更新:
1.上塞:
实际上,源代码可能是一些流或批处理。