How to implement a Type 2 Slowly Changing Dimension (SCD2) in Spark using SQL joins

mu0hgdu0 posted on 2023-05-18 in Apache

We would like to implement SCD2 in Spark using SQL joins. I found a reference on GitHub:
https://gist.github.com/rampage644/cc4659edd11d9a288c1b
but it is not very clear.
Can anyone provide some examples or references for implementing SCD2 in Spark?
Regards, Manish


hs1ihplo (Answer 1)

This is a bit dated with respect to newer Spark SQL, but here is a Ralph Kimball-style example I trialled with Spark SQL, and it works reliably. You can run it as-is, though file handling and similar logic still need to be added. It is the body of an ETL SCD2 routine written against Spark 1.6 SQL syntax but running on 2.x. It is not that hard, but you need to follow along, generate test data, and trace every step:

Some pre-processing is required before the script initiates: save a copy of the existing dimension and copy it to DIM_CUSTOMER_EXISTING.
   Write the new output to DIM_CUSTOMER_NEW and then copy this to the target, DIM_CUSTOMER_1 or DIM_CUSTOMER_2.
   The feed can also be re-created or LOAD OVERWRITE.
   ^^^ NEED SOME BETTER SCRIPTING AROUND THIS. ^^^ The Type 2 dimension holds only Type 2 values, not a mix of Type 1 & Type 2.
   DUMPs that are accumulative can in fact be pre-processed to get the delta.
   The use case assumes we can have N inputs for a person, each with a supplied date validity / extract date.
   Originally SPARK 1.6 SQL based, not yet updated to SPARK 2.x SQL with nested correlated subquery support.
   CUST_CODE cannot change; it must be a stable primary key.
   This approach handles no input, delta input, same input and all input, can catch up, and need not be run-date based.
   ^^^ Works best with deltas: if all data is passed and nothing has changed, a dummy entry with all the same values must still be made, otherwise there will be gaps in the key range,
   which means facts cannot be linked to dimensions in all cases. I.e. the discard logic works only in terms of a pure delta feed. All data can be passed, but only
   the current delta is processed. The problem becomes difficult to solve if we must then look for changes over different rows and expand the date range, a little too complicated imho.
   The dummy entries in the dimension are not a huge issue. The problem is a little more difficult in such a less mutable environment; in KUDU it is easier to solve.
   Ideally there would be some sort of preprocessor that checks which fields have changed and only then passes the record on, but that may be a bridge too far.
   HENCE IT IS NECESSARILY A COMPROMISE ALGORITHM. ^^^
   No deletions are processed.
   Multi-step SQL processing is required in some cases. Gaps in key ranges are difficult to avoid with set processing.
   No out-of-order processing; that would mean re-processing everything. Works on a whole date/day basis; if run more than once per day in batch, a timestamp would be needed instead.

   0.1 Any difference analysis on existing dumps is only possible if the dumps are accumulative. If they are transactional deltas only, then this is not required.
       Care to be taken here.
   0.2 If we want only the last update for a given date, do that here by partitioning, ranking and filtering out (a minimal sketch of this step follows these notes).
       These are all pre-processing steps, as is deciding which table to get the dimension data from.

   0.3 An issue is that of small files, but that is not an issue here at xxx. RAW usage only, as written to KUDU in a final step.
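For note 0.2, here is a minimal sketch of that pre-processing in Spark SQL. RAW_FEED_CUSTOMER and its load-timestamp column LOAD_TS are assumed names for illustration only; the original script starts from an already prepared FEED_CUSTOMER.

// Sketch only: keep the latest update per CUST_CODE and VALID_DT from an assumed raw feed
// (RAW_FEED_CUSTOMER / LOAD_TS are hypothetical), then use the result to populate FEED_CUSTOMER.
val dedupedFeed = spark.sql("""select CUST_CODE, CUST_NAME, ADDRESS_CITY, SALARY, VALID_DT
                                 from (select *,
                                              row_number() over (partition by CUST_CODE, VALID_DT
                                                                 order by LOAD_TS desc) as rn
                                         from RAW_FEED_CUSTOMER) t
                                where rn = 1""")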

The actual coding:

import org.apache.spark.sql.SparkSession

// Named "spark" so that the spark.sql(...) calls below resolve to this session.
val spark = SparkSession
   .builder
   .master("local") // Not a good idea outside of testing
   .appName("Type 2 dimension update")
   .config("spark.sql.crossJoin.enabled", "true") // Needed to add this
   .getOrCreate()

spark.sql("drop table if exists DIM_CUSTOMER_EXISTING")
spark.sql("drop table if exists DIM_CUSTOMER_NEW")
spark.sql("drop table if exists FEED_CUSTOMER")
spark.sql("drop table if exists DIM_CUSTOMER_TEMP")
spark.sql("drop table if exists DIM_CUSTOMER_WORK")
spark.sql("drop table if exists DIM_CUSTOMER_WORK_2")
spark.sql("drop table if exists DIM_CUSTOMER_WORK_3")
spark.sql("drop table if exists DIM_CUSTOMER_WORK_4")

spark.sql("create table DIM_CUSTOMER_EXISTING (DWH_KEY int, CUST_CODE String, CUST_NAME String, ADDRESS_CITY String, SALARY int, VALID_FROM_DT String, VALID_TO_DT String) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE LOCATION  '/FileStore/tables/alhwkf661500326287094' ")

spark.sql("create table DIM_CUSTOMER_NEW (DWH_KEY int, CUST_CODE String, CUST_NAME String, ADDRESS_CITY String, SALARY int, VALID_FROM_DT String, VALID_TO_DT String) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE LOCATION  '/FileStore/tables/DIM_CUSTOMER_NEW_3' ")

spark.sql("CREATE TABLE FEED_CUSTOMER (CUST_CODE String, CUST_NAME String, ADDRESS_CITY String, SALARY int, VALID_DT String) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE LOCATION  '/FileStore/tables/mhiscfsv1500226290781' ")

// 1. Get the maximum surrogate key value in the dimension. This differs from the RDD approach; there may be other ways to do it. We get a DataFrame here, which is where SQL and DataFrames are interchangeable.
val max_val = spark.sql("select max(dwh_key) from DIM_CUSTOMER_EXISTING")

//max_val.show()
val null_count = max_val.filter("max(DWH_KEY) is null").count()
var max_Dim_Key = 0;
if ( null_count == 1 ) {
     max_Dim_Key = 0
} else {
     max_Dim_Key = max_val.head().getInt(0)
}

//2. Cannot do simple difference processing. The values of certain fields could be flip-flopping over time. A too simple MINUS will not work well. Need to process relative to
//   youngest existing record etc. and roll the transactions forward. Hence we will not do any sort of difference analysis between new dimension data and existing dimension
//   data in any way.
//   DO NOTHING.

//3. Capture new stuff to be inserted. 
//   Some records for a given business key can be linea recta inserted as there have been no mutations to consider at all as there is nothing in current Staging. Does not mean
//   delete.
//   Also, the older mutations need not be re-processed, only the youngest! The younger one may need closing off or not, need to decide if it is now 
//   copied across or subject to updating in this cycle, depends on the requirements.
//  Older mutations copied across immediately.
//  DELTA not always strictly speaking needed, but common definitions. Some ranking required.

spark.sql("""insert into  DIM_CUSTOMER_NEW     select * 
                                                 from DIM_CUSTOMER_EXISTING 
                                                where CUST_CODE not in (select distinct CUST_CODE FROM FEED_CUSTOMER) """) // This does not need RANKing, DWH Key retained.

spark.sql("""create table DIM_CUSTOMER_TEMP as select *, dense_rank() over (partition by CUST_CODE order by VALID_FROM_DT desc) as RANK 
                                             from DIM_CUSTOMER_EXISTING """)

spark.sql("""insert into DIM_CUSTOMER_NEW  select DWH_KEY, CUST_CODE, CUST_NAME, ADDRESS_CITY, SALARY, VALID_FROM_DT, VALID_TO_DT 
                                          from DIM_CUSTOMER_TEMP 
                                         where CUST_CODE in (select distinct CUST_CODE from FEED_CUSTOMER)
                                           and RANK <> 1 """) 
// For updating the youngest record in SCD terms, we use AND RANK <> 1 to filter the youngest out here, since we want to close off the period in that record; the older
// records can be passed through immediately with their retained DWH key.

//4. Combine Staging and those existing facts required. The result of this eventually will be stored in DIM_CUSTOMER_NEW which can be used for updating a final target. 
//   Issue here is that DWH Key not yet set and different columns. DWH key can be set last.
//4.1 Get records to process; they will have the status NEW.

spark.sql("""create table DIM_CUSTOMER_WORK (DWH_KEY int, CUST_CODE String, CUST_NAME String, ADDRESS_CITY String, SALARY int, VALID_FROM_DT String, VALID_TO_DT String, RECSTAT String)  """)
spark.sql("""insert  into DIM_CUSTOMER_WORK    select  0, CUST_CODE, CUST_NAME, ADDRESS_CITY, SALARY, VALID_DT, '2099-12-31', "NEW"
                                             from  FEED_CUSTOMER """)
//4.2 Get youngest already existing dimension record to process in conjunction with newer values.
spark.sql("""insert  into DIM_CUSTOMER_WORK    select  DWH_KEY, CUST_CODE, CUST_NAME, ADDRESS_CITY, SALARY, VALID_FROM_DT, VALID_TO_DT, "OLD"
                                             from  DIM_CUSTOMER_TEMP 
                                            where  CUST_CODE in (select distinct CUST_CODE from FEED_CUSTOMER)
                                              and RANK = 1 """) 

// 5. ISSUE with first record in a set. It is not a delta or is used for making a delta, need to know what to do or bypass, depends on case.
//    Here we are doing deltas, so first rec is a complete delta
//    RECSTAT to be filtered out at end
//    NEW, 1 = INSERT  --> checked, is correct way, can do in others. No delta computation required
//    OLD, 1 = DO NOTHING
//    else do delta and INSERT

//5.1 RANK and JOIN to get before and after images in CDC format so that we can decide what needs to be closed off.
//    Get the new DWH key values + offset, there may exist gaps eventually.

spark.sql(""" create table DIM_CUSTOMER_WORK_2  as select *, rank() over (partition by CUST_CODE order by VALID_FROM_DT asc) as rank FROM DIM_CUSTOMER_WORK  """)

//DWH_KEY, CUST_CODE, CUST_NAME, ADDRESS_CITY, SALARY, VALID_FROM_DT, VALID_TO_DT, "OLD"

spark.sql(""" create table DIM_CUSTOMER_WORK_3 as 
                                     select  T1.DWH_KEY as T1_DWH_KEY, T1.CUST_CODE as T1_CUST_CODE, T1.rank as CURR_RANK, T2.rank as NEXT_RANK, 
                                             T1.VALID_FROM_DT as CURR_VALID_FROM_DT, T2.VALID_FROM_DT as NEXT_VALID_FROM_DT, 
                                             T1.VALID_TO_DT as CURR_VALID_TO_DT, T2.VALID_TO_DT as NEXT_VALID_TO_DT, 
                                             T1.CUST_NAME as CURR_CUST_NAME, T2.CUST_NAME as NEXT_CUST_NAME,
                                             T1.SALARY as CURR_SALARY, T2.SALARY as NEXT_SALARY, 
                                             T1.ADDRESS_CITY as CURR_ADDRESS_CITY, T2.ADDRESS_CITY as NEXT_ADDRESS_CITY, 
                                             T1.RECSTAT as CURR_RECSTAT, T2.RECSTAT as NEXT_RECSTAT   
                                       from DIM_CUSTOMER_WORK_2 T1 LEFT OUTER JOIN DIM_CUSTOMER_WORK_2 T2 
                                         on T1.CUST_CODE = T2.CUST_CODE AND T2.rank = T1.rank + 1  """)  

//5.2 Get the data for computing new dimension surrogate DWH keys. Must execute a new query; could use DataFrames or RDDs, but Spark SQL was chosen as it is easier to follow.
spark.sql(s""" create table DIM_CUSTOMER_WORK_4 as 
                                     select  *, row_number() OVER( ORDER BY T1_CUST_CODE) as ROW_NUMBER, '$max_Dim_Key' as DIM_OFFSET 
                                       from DIM_CUSTOMER_WORK_3   """)  

 //spark.sql("""SELECT * FROM DIM_CUSTOMER_WORK_4     """).show()
 //Execute the above to see results, could not format here.

//5.3 Process accordingly and check if there is no change at all; if there is no change we can get holes in the sequence numbers, which is not an issue. NB: NOT DOING THIS DUE TO COMPLICATIONS !!!
//    See the sample data above for decision-making on what to do. NOTE THAT WE WOULD NEED A PRE-PROCESSOR TO CHECK IF A FIELD OF INTEREST ACTUALLY CHANGED
//    to get the best result.
//    We could elaborate and record via an extra step: if there were only two records per business key and all the current and next record fields were the same,
//    we could disregard both the first and the second record. Will attempt that later as an extra optimization. As soon as there are more than two records, this scheme packs up.
//    Some effort still needed.

//5.3.1 Records that just need to be closed off. The previous version gets an appropriate DATE - 1. Dates must not overlap.  
//      No check on whether data changed or not due to issues above.

spark.sql("""insert into  DIM_CUSTOMER_NEW  select T1_DWH_KEY, T1_CUST_CODE, CURR_CUST_NAME, CURR_ADDRESS_CITY, CURR_SALARY, 
                                               CURR_VALID_FROM_DT, cast(date_sub(cast(NEXT_VALID_FROM_DT as DATE), 1) as STRING)
                                          from DIM_CUSTOMER_WORK_4 
                                         where CURR_RECSTAT = 'OLD'  """)  

//5.3.2 Records that are the last in the sequence must have high end 2099-12-31 set, which has already been done. 
//      No check on whether data changed or not due to issues above.
spark.sql("""insert into  DIM_CUSTOMER_NEW  select ROW_NUMBER + DIM_OFFSET, T1_CUST_CODE, CURR_CUST_NAME, CURR_ADDRESS_CITY, CURR_SALARY, 
                                               CURR_VALID_FROM_DT, CURR_VALID_TO_DT
                                          from DIM_CUSTOMER_WORK_4 
                                         where NEXT_RANK is null  """)  

//5.3.3 New records that are superseded by a younger record in the same run get their end date set to the day before the next record's VALID_FROM_DT.
spark.sql("""insert into  DIM_CUSTOMER_NEW  select ROW_NUMBER + DIM_OFFSET, T1_CUST_CODE, CURR_CUST_NAME, CURR_ADDRESS_CITY, CURR_SALARY, 
                                               CURR_VALID_FROM_DT, cast(date_sub(cast(NEXT_VALID_FROM_DT as DATE), 1) as STRING)
                                          from DIM_CUSTOMER_WORK_4 
                                         where CURR_RECSTAT = 'NEW' 
                                           and NEXT_RANK is not null""")  
spark.sql("""SELECT * FROM DIM_CUSTOMER_NEW   """).show()

// So, the question is whether we could have done without JOINing and just sorted, given the gap processing. This was derived from the delta processing but turned out a little
// different.
// We did need the JOIN for the next date at least, so even with some optimization added it still holds.
// My logic is applied here in separate steps; it could well be fewer steps, left as is.

//6. The copy / insert to get a new big target table version and re-compile views. Outside of this actual processing. Logic performed elsewhere.

// NOTE: now that 2.x supports nested correlated sub-queries, this should be revisited at a later point, but it can be left as is.
// KUDU means no more restating.
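For completeness, a minimal sketch of the copy step described in 6 above, using the DIM_CUSTOMER_1 target name mentioned in the notes; the real target switching and view re-compilation happen outside this script.

// Sketch only: replace the target with the freshly built dimension (DIM_CUSTOMER_1 per the notes above).
spark.sql("drop table if exists DIM_CUSTOMER_1")
spark.sql("create table DIM_CUSTOMER_1 as select * from DIM_CUSTOMER_NEW")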

Sample data, so that you know what to generate for the example:

+-------+---------+----------------+------------+------+-------------+-----------+
|DWH_KEY|CUST_CODE|       CUST_NAME|ADDRESS_CITY|SALARY|VALID_FROM_DT|VALID_TO_DT|
+-------+---------+----------------+------------+------+-------------+-----------+
|    230|  E222222|   Pete Saunders|       Leeds| 75000|   2013-03-09| 2099-12-31|
|    400|  A048901|  John Alexander|     Calgary| 22000|   2015-03-24| 2017-10-22|
|    402|  A048901|  John Alexander|  Wellington| 47000|   2017-10-23| 2099-12-31|
|    403|  B787555|     Mark de Wit|Johannesburg| 49500|   2017-10-02| 2099-12-31|
|    406|  C999666|      Daya Dumar|      Mumbai| 50000|   2016-12-16| 2099-12-31|
|    404|  C999666|      Daya Dumar|      Mumbai| 49000|   2016-11-11| 2016-12-14|
|    405|  C999666|      Daya Dumar|      Mumbai| 50000|   2016-12-15| 2016-12-15|
|    300|  A048901|  John Alexander|     Calgary| 15000|   2014-03-24| 2015-03-23|
+-------+---------+----------------+------------+------+-------------+-----------+

hlswsv35 (Answer 2)

Below is a detailed implementation of a Slowly Changing Dimension Type 2 in Spark (DataFrame and SQL) using the exclusive-join approach.
Assume the source sends a complete data file, i.e. old, updated, and new records.
Steps:
Load the most recent file data into the STG table. Then select all expired records from the HIST table:

1. select * from HIST_TAB where exp_dt != '2099-12-31'

Select all unchanged records from STG and HIST using an inner join and a filter on HIST.column = STG.column, as below:

2. select hist.* from HIST_TAB hist inner join STG_TAB stg on hist.key = stg.key where hist.exp_dt = '2099-12-31' and hist.column = stg.column

Select all new records and updated records from STG_TAB using an exclusive left join with HIST_TAB, and set the effective and expiry dates, as below:

3. select stg.*, current_date() as eff_dt, '2099-12-31' as exp_dt from STG_TAB stg left join (select * from HIST_TAB where exp_dt = '2099-12-31') hist 
on hist.key = stg.key where hist.key is null or hist.column != stg.column

Select all old records that were updated from the HIST table using an exclusive left join with the STG table, and set their expiry date, as below:

4. select hist.key, hist.column, hist.eff_dt, current_date() as exp_dt from (select * from HIST_TAB where exp_dt = '2099-12-31') hist left join STG_TAB stg 
on hist.key = stg.key where stg.key is not null and hist.column != stg.column

UNION ALL queries 1-4 and insert the result into the HIST table with INSERT OVERWRITE.
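A minimal sketch of this final step, reusing the schematic HIST_TAB / STG_TAB names and the key / column / eff_dt / exp_dt placeholders from queries 1-4; HIST_TAB_NEW is an assumed work table, since Spark will not overwrite a table it is reading from in the same statement:

5. create table HIST_TAB_NEW as
   -- 1. history that is already expired
   select hist.key, hist.column, hist.eff_dt, hist.exp_dt
     from HIST_TAB hist where hist.exp_dt != '2099-12-31'
   union all
   -- 2. current records with no change
   select hist.key, hist.column, hist.eff_dt, hist.exp_dt
     from HIST_TAB hist inner join STG_TAB stg on hist.key = stg.key
    where hist.exp_dt = '2099-12-31' and hist.column = stg.column
   union all
   -- 3. new and changed records from staging, opened as current
   select stg.key, stg.column, current_date() as eff_dt, '2099-12-31' as exp_dt
     from STG_TAB stg left join (select * from HIST_TAB where exp_dt = '2099-12-31') hist on hist.key = stg.key
    where hist.key is null or hist.column != stg.column
   union all
   -- 4. changed current records, closed off
   select hist.key, hist.column, hist.eff_dt, current_date() as exp_dt
     from (select * from HIST_TAB where exp_dt = '2099-12-31') hist left join STG_TAB stg on hist.key = stg.key
    where stg.key is not null and hist.column != stg.column;

   insert overwrite table HIST_TAB select * from HIST_TAB_NEW;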
A more detailed implementation of SCD Type 2 in Scala and PySpark can be found here:
https://github.com/sahilbhange/spark-slowly-changing-dimension
Hope this helps!


hgc7kmma (Answer 3)

Scala Spark: https://georgheiler.com/2020/11/19/sparkling-scd2/
Note: this is not a complete SCD2 implementation; it assumes an event table and only derives/deduplicates valid_from/valid_to from it, i.e. no merge/upsert is implemented.

import org.apache.spark.sql.functions.{col, to_date}
import spark.implicits._ // for .toDF on a local Seq
val df = Seq(("k1","foo", "2020-01-01"), ("k1","foo", "2020-02-01"), ("k1","baz", "2020-02-01"),
("k2","foo", "2019-01-01"), ("k2","foo", "2019-02-01"), ("k2","baz", "2019-02-01")).toDF("key", "value_1", "date").withColumn("date", to_date(col("date")))
df.show
+---+-------+----------+
|key|value_1|      date|
+---+-------+----------+
| k1|    foo|2020-01-01|
| k1|    foo|2020-02-01|
| k1|    baz|2020-02-01|
| k2|    foo|2019-01-01|
| k2|    foo|2019-02-01|
| k2|    baz|2019-02-01|
+---+-------+----------+
df.printSchema
root
|-- key: string (nullable = true)
|-- value_1: string (nullable = true)
|-- date: date (nullable = true)
df.transform(deduplicateScd2(Seq("key"), Seq("date"), "date", Seq())).show
+---+-------+----------+----------+
|key|value_1|valid_from|  valid_to|
+---+-------+----------+----------+
| k1|    foo|2020-01-01|2020-02-01|
| k1|    baz|2020-02-01|2020-11-18|
| k2|    foo|2019-01-01|2019-02-01|
| k2|    baz|2019-02-01|2020-11-18|
+---+-------+----------+----------+

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.lag
import org.apache.spark.sql.functions.lead
import org.apache.spark.sql.functions.when
import org.apache.spark.sql.functions.current_date

def deduplicateScd2(
      key: Seq[String],
      sortChangingIgnored: Seq[String],
      timeColumn: String,
      columnsToIgnore: Seq[String]
  )(df: DataFrame): DataFrame = {
    val windowPrimaryKey = Window
      .partitionBy(key.map(col): _*)
      .orderBy(sortChangingIgnored.map(col): _*)
    val columnsToCompare =
      df.drop(key ++ sortChangingIgnored: _*).drop(columnsToIgnore: _*).columns
    val nextDataChange = lead(timeColumn, 1).over(windowPrimaryKey)
    val deduplicated = df
      .withColumn(
        "data_changes_start",
        columnsToCompare
          .map(e => {
            val previous = lag(col(e), 1).over(windowPrimaryKey)
            val self = col(e)
            // 3 cases: 1.: start (previous is NULL), 2: in between, try to collapse 3: end (= next is null)
            // first, filter to only start & end events (= updates/invalidations of records)
            //self =!= previous or self =!= next or previous.isNull or next.isNull
            self =!= previous or previous.isNull
          })
          .reduce(_ or _)
      )
      .withColumn(
        "data_changes_end",
        columnsToCompare
          .map(e => {
            val next = lead(col(e), 1).over(windowPrimaryKey)
            val self = col(e)
            // 3 cases: 1.: start (previous is NULL), 2: in between, try to collapse 3: end (= next is null)
            // first, filter to only start & end events (= updates/invalidations of records)
            self =!= next or next.isNull
          })
          .reduce(_ or _)
      )
      .filter(col("data_changes_start") or col("data_changes_end"))
      .drop("data_changes")
    deduplicated //.withColumn("valid_to", nextDataChange)
      .withColumn(
        "valid_to",
        when(col("data_changes_end") === true, col(timeColumn))
          .otherwise(nextDataChange)
      )
      .filter(col("data_changes_start") === true)
      .withColumn(
        "valid_to",
        when(nextDataChange.isNull, current_date()).otherwise(col("valid_to"))
      )
      .withColumnRenamed(timeColumn, "valid_from")
      .drop("data_changes_end", "data_changes_start")
  }

jk9hmnmh (Answer 4)

Here is an updated answer using MERGE.
Note that it does not work with Spark Structured Streaming, but it can be used with Spark Kafka batch integration.

// 0. Standard, start of program. 

// Handles multiple business keys in a single run. DELTA tables.
// Schema evolution also handled.
    
import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Named "spark" so that the spark.sql / spark.read calls below resolve to this session.
val spark = SparkSession.builder
                        .master("local") // Not realistic
                        .appName("REF Zone History stuff and processing")
                        .enableHiveSupport() // Standard in Databricks.
                        .getOrCreate()

import spark.implicits._ // Needed for the $"col" syntax used further down.

// 1. Read newer data to process in some way. Create tempView.
// In general we should have few rows to process, i.e. not at scale. 
val dfA = spark.read.option("multiLine",false).json("/FileStore/tables/new_customers_json_multiple_alt3.txt")  // New feed.
dfA.createOrReplaceTempView("newFeed")

// 2. First create the target for data at rest if it does not exist. Add an ASC col_key. Should only occur once.  
val save_path = "/some_loc_fix/ref/atRest/data"  // Make dynamic.
val table_name = "CUSTOMERS_AT_REST" 
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name + " LOCATION '" + save_path + "'" + " AS SELECT * from newFeed WHERE 1 = 0 " )  // Can also use limit 0 instead of WHERE 1 = 0.
// Add an ASC col_key column if it does not exist.  
// I have in input valid_from_dt, but it could be different so we would need to add in reality as well. Mark to decide.
 try {
      spark.sql("ALTER TABLE " + table_name + " ADD COLUMNS (col_key BIGINT FIRST, valid_to_dt STRING) ")
    } catch {
        case unknown: Exception => {
             None
      }
    }

// 3. Get maximum value for target. This is a necessity.
val max_val = spark.sql("select max(col_key) from " + table_name)
//max_val.show()
val null_count = max_val.filter("max(col_key) is null").count()
var max_Col_Key: BigInt = 0;
if ( null_count == 1 ) {
     max_Col_Key = 0
} else {
        max_Col_Key = max_val.head().getLong(0)  // Long and BIGINT interoperable.
}

// 4.1 Create a temporary table for getting the youngest records from the existing data. table_name as variable, newFeed tempView as string. Then apply processing.
val dfB = spark.sql(" select O.* from (select A.cust_code, max(A.col_key) as max_col_key from " + table_name + " A where A.cust_code in (select B.cust_code from newFeed B ) group by A.cust_code ) Z, " + table_name + " O where O.col_key =  Z.max_col_key ")  // Most recent records. 
// No tempView required.

// 4.2 Get the set of data to actually process. New feed + youngest records in feed.
val dfC =dfA.unionByName(dfB, true)
dfC.createOrReplaceTempView("cusToProcess")
 
// 4.3 RANK
val df1 = spark.sql("""select *, dense_rank() over (partition by CUST_CODE order by VALID_FROM_DT desc) as RANK from CusToProcess """)  
df1.createOrReplaceTempView("CusToProcess2")

// 4.4 JOIN adjacent records & process closing off dates etc.
val df2 = spark.sql("""select A.*, B.rank as B_rank, cast(date_sub(cast(B.valid_from_dt as DATE), 1) as STRING) as untilMinus1  
                         from CusToProcess2 A LEFT OUTER JOIN CusToProcess2 B 
                           on A.cust_code = B.cust_code and A.RANK = B.RANK + 1 """) 
val df3 = df2.drop("valid_to_dt").withColumn("valid_to_dt", $"untilMinus1").drop("untilMinus1").drop("B_rank")
val df4 = df3.withColumn("valid_to_dt", when($"valid_to_dt".isNull, lit("2099-12-31")).otherwise($"valid_to_dt")).drop("RANK")
df4.createOrReplaceTempView("CusToProcess3")

val df5 = spark.sql(s""" select  *, row_number() OVER( ORDER BY cust_code ASC, valid_from_dt ASC) as ROW_NUMBER, '$max_Col_Key' as col_OFFSET 
                                       from CusToProcess3   """)  

// Add the new ascending col_key; gaps can result, which is not an issue as long as the key is always ascending.
val df6 = df5.withColumn("col_key", when($"col_key".isNull, ($"ROW_NUMBER" + $"col_OFFSET")).otherwise($"col_key")) 
val df7 = df6.withColumn("col_key", col("col_key").cast(LongType)).drop("ROW_NUMBER").drop("col_OFFSET")

// 5. The actual MERGE is very simple.
// More than one merge key would be possible in general; here the single col_key is used so that only one is needed.
df7.createOrReplaceTempView("CUST_DELTA")
spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true")
spark.sql(""" MERGE INTO CUSTOMERS_AT_REST
              USING CUST_DELTA
              ON CUSTOMERS_AT_REST.col_key = CUST_DELTA.col_key 
                WHEN MATCHED THEN
                  UPDATE SET *
                WHEN NOT MATCHED THEN
                  INSERT *
          """)

egmofgnx (Answer 5)

  • If you have a source table customer_source with columns id (unique PK), name, address, phone, lmd (last modified date)
  • and an SCD Type 2 dimension table named customer_dim with the standard additional columns: is_current, effective_start, effective_end

Assume you are running a recurring job that maintains a checkpoint (lmd) so it knows which changes have already been processed. Then:
1. Find the changes and prepare the updates:

-- incoming changes as updated_rows, that you want to merge into customer_dim
CREATE TABLE updated_rows AS
  SELECT DISTINCT -- Make sure there are no duplicates
    id,
    name,
    address,
    phone,
    1 as is_current,
    lmd as effective_start,
    CAST(null as TIMESTAMP) effective_end
  FROM customer_source
  -- 2023-11-22 00:00:00 is the last checkpoint
  WHERE lmd > CAST('2023-11-22 00:00:00' AS TIMESTAMP)
;

2. Upsert:

MERGE INTO customer_dim AS c USING (
  SELECT    -- UPDATES
    ur.id as merge_key,
    ur.*
  FROM
    updated_rows ur
  UNION ALL
  SELECT    -- INSERTS
    NULL as merge_key,
    ur.*
  FROM
    updated_rows ur
    JOIN customer_dim c ON c.id = ur.id
    AND c.is_current = 1
  WHERE -- ignore records with no changes
      c.name <> ur.name
      OR c.phone <> ur.phone
      OR c.address <> ur.address
    
) u ON c.id = u.merge_key -- Match record condition
WHEN MATCHED AND c.is_current = 1 AND ( -- only close off the current record, and ignore records with no changes
      c.name <> u.name
      OR c.phone <> u.phone
      OR c.address <> u.address
  )
  THEN
  UPDATE SET -- Close off the 'old' current record
      is_current = 0,
      effective_end = u.lmd
WHEN NOT MATCHED THEN
  INSERT
    *

In practice, the source could be some stream or batch process.
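As a side note, a minimal sketch of how the lmd checkpoint mentioned above could be maintained; the etl_checkpoint table name and layout are assumptions for illustration, not part of the original answer:

-- Sketch only: a tiny checkpoint table keyed by job name (assumed; adjust to your own bookkeeping).
CREATE TABLE IF NOT EXISTS etl_checkpoint (job_name STRING, last_lmd TIMESTAMP);

-- Read the checkpoint before building updated_rows (fall back to a very old date on the first run).
SELECT COALESCE(MAX(last_lmd), CAST('1900-01-01 00:00:00' AS TIMESTAMP)) AS last_lmd
FROM etl_checkpoint
WHERE job_name = 'customer_scd2';

-- After a successful MERGE, advance the checkpoint to the newest lmd that was processed.
INSERT INTO etl_checkpoint
SELECT 'customer_scd2' AS job_name, MAX(lmd) AS last_lmd FROM updated_rows;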
