使用databricks/spark从包含一个id的多个更新的sourcefile创建scd2表

mklgxw1f  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(492)

我想在databricks中创建一个缓慢变化的维度。我的源Dataframe包含以下信息。

+-------------------+-------------------------+----------+-----------+-------------+
| actionimmediately |          date           | deviceid | patchguid |   status    |
+-------------------+-------------------------+----------+-----------+-------------+
| False             | 2018-08-15 04:01:00.000 |      123 | 00-001    | Install     |
| True              | 2018-08-16 00:00:00.000 |      123 | 00-001    | Install     |
| False             | 2018-08-10 01:00:00.000 |      123 | 00-001    | Not Approved|
| False             | 2020-01-01 00:00:00.000 |      333 | 11-111    | Declined    |
+-------------------+-------------------------+----------+-----------+-------------+

我希望作为输出的Dataframe如下所示:

+-----------+----------+-----------+--------------+-------------------+-------------------------+-------------------------+---------+
| mergekey  | deviceid | patchguid |    status    | actionimmediately |        starttime        |         endtime         | current |
+-----------+----------+-----------+--------------+-------------------+-------------------------+-------------------------+---------+
| 12300-001 |      123 | 00-001    | Not Approved | False             | 2018-08-10 01:00:00.000 | 2018-08-15 04:01:00.000 | False   |
| 12300-001 |      123 | 00-001    | Install      | False             | 2018-08-15 04:01:00.000 | 2018-08-16 00:00:00.000 | False   |
| 12300-001 |      123 | 00-001    | Install      | True              | 2018-08-16 00:00:00.000 | null                    | True    |
| 33311-111 |      333 | 11-111    | Declined     | False             | 2020-01-01 00:00:00.000 | null                    | True    |
+-----------+----------+-----------+--------------+-------------------+-------------------------+-------------------------+---------+

实际上,sourcefile包含275475行
我已经试过两种解决方案,但都很慢。比如+-10小时。
解决方案1:使用三角洲湖合并
首先,我创建一个seqid,稍后用于迭代。这是因为合并不能多次更新同一行。我正在用一个窗口创建seqid。

source_df = source_df.withColumn('mergekey',concat(col('deviceid'),col('patchguid')))
w1 = Window.partitionBy('mergekey').orderBy('date')
source_df = source_df.withColumn('seqid', row_number().over(w1))

然后创建一个for循环,该循环在每个seqid上运行并合并行。实际上,max\ seq\ id是1900

def createTable (df, SeqId):
  df\
  .withColumn('mergekey',concat(col('deviceid'),col('patchguid')))\
  .select(\
          'mergekey',\
          'deviceid',\
          'patchguid',\
          'status',\
          'actionimmediately',\
          col('date').alias('starttime'))\
  .where(col('seqid') == SeqId)\
  .withColumn('endtime',lit(None).cast('timestamp'))\
  .withColumn('current',lit(True))\
  .write.format('delta')\
  .partitionBy("current")\
  .options(header='true',path='/mnt/destinationncentral/patch_approval')\
  .saveAsTable('patch_approval')

def MergePatchApproval (df,deltatable,seqNummer):
  dataframe = df.where(col('seqid') == seqNummer)
  newToInsert = dataframe.alias('updates')\
  .join(deltatable.toDF().alias('table'),['deviceid','patchguid'])\
  .select(\
          'updates.actionimmediately',\
          'updates.date',\
          'updates.deviceid',\
          'updates.patchguid',\
          'updates.status',\
          'updates.seqid')\
  .where('table.current = true and \
  (table.status <> updates.status or table.actionimmediately <> updates.actionimmediately)')

  stagedUpdates = (newToInsert.selectExpr('NULL as mergekey','*')\
                   .union(dataframe\
                          .withColumn('mergekey',concat(col('deviceid'),col('patchguid')))\
                          .select(\
                                  'mergekey',\
                                  'actionimmediately',\
                                  'date',\
                                  'deviceid',\
                                  'patchguid',\
                                  'status',\
                                  'seqid')))

  deltatable.alias('t')\
  .merge(stagedUpdates.alias('s'),'t.current = true and t.mergekey = s.mergekey')\
  .whenMatchedUpdate(condition = 't.current = true and \
  (t.status <> s.status or t.actionimmediately <> s.actionimmediately)', \
  set = {
    'endtime':'s.date',
    'current':'false'
  }).whenNotMatchedInsert(values = {
    'mergekey':'s.mergekey',
    'deviceid':'s.deviceid',
    'patchguid':'s.patchguid',
    'status':'s.status',
    'actionimmediately':'s.actionimmediately',
    'starttime':'s.date',
    'endtime':'NULL',
    'current':'true'
  }).execute()

for i in range(max_seq_id):
  i = i + 1
  print(i)
  df = source_df.where(col('seqid') == i)
  if(i == 1):
    tablecount = spark.sql("show tables like 'patch_approval'").count()
    if(tablecount == 0):
      createTable(df,i)
      approval_table = DeltaTable.forPath(spark,'/mnt/destinationncentral/patch_approval')
    else:
      approval_table = DeltaTable.forPath(spark,'/mnt/destinationncentral/patch_approval')
      MergePatchApproval(df,approval_table,i)
  else:
    MergePatchApproval(df,approval_table,i)

这个解决方案的问题是,在azuredatalake上写数据需要一些时间,我认为这是正常的,但是每次迭代的执行时间都在增加。
解决方案2:向上插入Dataframe,最后写一次
在这个解决方案中,我也使用for循环和seqid,但是除了将每个循环都写入azuredatalake之外,我只在最后才这样做。此解决方案解决了写入延迟问题,但每个循环结束的时间仍在增加。

def createDestDF(sourceDF):
  dest_df = sourceDF\
    .select(\
            'mergekey',\
            'deviceid',\
            'patchguid',\
            'status',\
            'actionimmediately',\
            col('date').alias('starttime'))\
    .withColumn('endtime',lit(None).cast('timestamp'))\
    .withColumn('current',lit(True))
  return dest_df

def getChangedRecords(sourceDF,destDF):
  changedRecords = sourceDF.alias('u')\
  .join(destDF.alias('t'),['deviceid','patchguid'])\
  .select(\
         'u.actionimmediately',\
         'u.date',\
         'u.deviceid',\
         'u.patchguid',\
         'u.status',\
         'u.seqid',\
         'u.mergekey')\
  .where('t.current = true and \
  (t.status <> u.status or t.actionimmediately <> u.actionimmediately)')

  return changedRecords

def getNewRecords(sourceDF,destDF):
  newRecords = sourceDF.alias('n')\
  .join(destDF.alias('t'),['deviceid','patchguid'],'left')\
  .select(\
          't.mergekey',\
          'n.actionimmediately',\
          'n.date',\
          'deviceid',\
          'patchguid',\
          'n.status',\
          'n.seqid')\
  .where('t.current is null')
  return newRecords

def upsertChangedRecords(sourceDF,destDF):
  endTimeColumn = expr("""IF(endtimeOld IS NULL, date, endtimeOld)""")
  currentColumn = expr("""IF(date IS NULL, currentOld, False)""")

  updateDF = sourceDF.alias('s').join(destDF.alias('t'),'mergekey','right').select(\
                                                                                'mergekey',\
                                                                                't.deviceid',\
                                                                                't.patchguid',\
                                                                                't.status',\
                                                                                't.actionimmediately',\
                                                                                't.starttime',\
                                                                                's.date',\
                                                                                col('t.current').alias('currentOld'),\
                                                                                col('t.endTime').alias('endtimeOld'))\
  .withColumn('endtime',endTimeColumn)\
  .withColumn('current',currentColumn)\
  .drop('currentOld','date','endTimeOld')

  updateInsertDF = sourceDF\
  .select(\
          'mergekey',\
          'deviceid',\
          'patchguid',\
          'status',\
          'actionimmediately',\
          col('date').alias('starttime'))\
  .withColumn('endtime',lit(None).cast('timestamp'))\
  .withColumn('current',lit(True))

  resultDF = updateDF.union(updateInsertDF)
  return resultDF

def insertNewRecords(sourceDF, destDF):
  insertDF = sourceDF\
  .select(\
          'mergekey',\
          'deviceid',\
          'patchguid',\
          'status',\
          'actionimmediately',\
          col('date').alias('starttime'))\
  .withColumn('endtime',lit(None).cast('timestamp'))\
  .withColumn('current',lit(True))

  resultDF = destDF.union(insertDF)

  return resultDF

for i in range(max_seq_id):
  i = i + 1
  print(i)
  seq_df = source_df.where(col('seqid') == i)
  if i == 1:
    tablecount = spark.sql("show tables like 'patch_approval'").count()
    if(tablecount == 0):
      dest_df = createDestDF(seq_df)
    else:
      changed_df = getChangedRecords(seq_df,dest_df)
      new_df = getNewRecords(seq_df,dest_df)
      dest_df = upsertChangedRecords(changed_df,dest_df)
      dest_df = insertNewRecords(new_df,dest_df)
  else:
    changed_df = getChangedRecords(seq_df,dest_df)
    new_df = getNewRecords(seq_df,dest_df)
    dest_df = upsertChangedRecords(changed_df,dest_df)
    dest_df = insertNewRecords(new_df,dest_df)

dest_df\
.write\
.format('delta')\
.partitionBy('current')\
.mode('overwrite')\
.options(header='true',path='/mnt/destinationncentral/patch_approval')\
.saveAsTable('patch_approval')

知道如何解决for循环中执行时间增加的问题吗?
谨致问候,

9avjhtql

9avjhtql1#

据我所知,行不会随着时间的推移而从源表中消失—如果是这样,可以通过将sparkDataframe放入临时视图并对其编写查询来解决问题:

df.createOrReplaceTempView("source")

df_scd = spark.sql("""
WITH stage AS (
  SELECT *,
  LEAD(date,1) OVER (PARTITION BY deviceid, patchguid ORDER BY date) AS next_date
  FROM source
)
SELECT 
  concat(deviceid, patchguid) as mergekey
  ,deviceid
  ,patchguid
  ,status
  ,actionimmediately
  ,date AS starttime
  ,next_date AS endtime
  ,CASE WHEN next_date IS NULL THEN True ELSE False END AS current
FROM stage
""")

它应该是非常快的结果,在准确的输出你想要的。我在你的样本数据上查过了,之后的数据显示:

+---------+--------+---------+------------+-----------------+--------------------+--------------------+-------+
| mergekey|deviceid|patchguid|      status|actionimmediately|           starttime|             endtime|current|
+---------+--------+---------+------------+-----------------+--------------------+--------------------+-------+
|12300-001|     123|   00-001|Not Approved|            False|2018-08-10 01:00:...|2018-08-15 04:01:...|  false|
|12300-001|     123|   00-001|     Install|            False|2018-08-15 04:01:...|2018-08-16 00:00:...|  false|
|12300-001|     123|   00-001|     Install|             True|2018-08-16 00:00:...|                null|   true|
|33311-111|     333|   11-111|    Declined|            False|2020-01-01 00:00:...|                null|   true|
+---------+--------+---------+------------+-----------------+--------------------+--------------------+-------+

相关问题