SQL Server historical data population using data change history from Salesforce

Posted by bq3bfh9z on 2023-10-15 in Other

I have the two tables below, which are already being ingested into my SQL Server from Salesforce. I have no control over how the data is stored in Salesforce.

1. LiveTable contains the latest data. All changes are updated in the row itself.

------------------------------------------------
ID  Date        User    Task            Status 
------------------------------------------------
1   15/09/2023  Mark    Visualization   Stage 4
2   06/08/2023  Jane    Analytics       Stage 3
3   10/09/2023  John    Data Entry      Stage 3

2. ChangeTable contains all the changes made to LiveTable over time, with one row per changed field.

---------------------------------------------------
ID  Date        Field   OldValue    NewValue
---------------------------------------------------
1   02/09/2023  User    Antony      Mary
1   02/09/2023  Status  Stage 1     Stage 2
1   05/09/2023  Status  Stage 2     Stage 3
1   10/09/2023  User    Mary        Mark
1   15/09/2023  Task    Data Entry  Visualization
1   15/09/2023  Status  Stage 3     Stage 4
2   02/08/2023  Status  Stage 1     Stage 2
2   06/08/2023  Status  Stage 2     Stage 3
2   06/08/2023  User    Jacob       Jane

Now I want to create a history table with the same layout as LiveTable that tracks all changes made to LiveTable, grouped by date.

Expected History table:

ID  Date        User    Task            Status
-----------------------------------------------
1   02/09/2023  Mary    Data Entry      Stage 2
1   05/09/2023  Mary    Data Entry      Stage 3
1   10/09/2023  Mark    Data Entry      Stage 3
1   15/09/2023  Mark    Visualization   Stage 4
2   02/08/2023  Jacob   Visualization   Stage 2
2   06/08/2023  Jane    Visualization   Stage 3

As you can see, I need to have a record for each change, but keep other columns as per the latest data available at that point. Multiple field changes could happen on the same day, which can be consolidated together grouped by date.

I can write a SQL query with CASE expressions to produce the table below, which shows only the changes made on each day. But I don't want to do it this way, because the table has 500+ columns.

ID  Date        User    Task            Status
-----------------------------------------------
1   02/09/2023  Mary                    Stage 2
1   05/09/2023                          Stage 3
1   10/09/2023  Mark        
1   15/09/2023          Visualization   Stage 4
2   02/08/2023                          Stage 2
2   06/08/2023  Jane                    Stage 3

What would be the best way to achieve this? I am using Azure Data Factory and Databricks in the ETL process, so I can move the logic into the ETL phase if that is recommended.

w8rqjzmb

You can use the PySpark code below to build the history table.

First, load LiveTable and ChangeTable from SQL Server into Databricks.

I assume the data is loaded into two DataFrames, liveTableDF and changeTableDF.

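from pyspark.sql import functions as F

# For each (Date, ID), collect the changed fields as a list of single-entry maps {Field: NewValue}.
changeTableGrouped = changeTableDF.groupBy("Date", "ID").agg(
    F.collect_list(F.expr("map(Field, NewValue)")).alias("changes")
)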

This gives the changes grouped by Date and ID.
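
To make the shape of that grouped result concrete, here is a minimal plain-Python sketch of the same grouping, without Spark. The function name group_changes and the tuple layout are assumptions made for illustration; the rows are copied from the ChangeTable in the question.

```python
from collections import defaultdict

# A few rows from ChangeTable: (ID, Date, Field, OldValue, NewValue)
change_rows = [
    (1, "02/09/2023", "User", "Antony", "Mary"),
    (1, "02/09/2023", "Status", "Stage 1", "Stage 2"),
    (1, "05/09/2023", "Status", "Stage 2", "Stage 3"),
]

def group_changes(rows):
    """Collapse change rows into {(id, date): {field: new_value}}."""
    grouped = defaultdict(dict)
    for row_id, date, field, _old_value, new_value in rows:
        grouped[(row_id, date)][field] = new_value
    return dict(grouped)

grouped = group_changes(change_rows)
```

Each key is one (ID, Date) pair, and its value holds only the fields that changed on that date, which is the same information the "changes" column carries in changeTableGrouped.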

The code below builds a list of dictionaries, which is then converted to a DataFrame and written to the SQL database.

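# Note: sort() and the "<" filter below assume Date is a date/timestamp column.
# If it is a dd/MM/yyyy string, convert it first, e.g. with F.to_date("Date", "dd/MM/yyyy").
cols = liveTableDF.columns
fdata = []
for row in changeTableGrouped.sort("ID", "Date").collect():
    tmp = {}
    # Merge the list of single-entry maps into one {Field: NewValue} dict.
    changes = {key: value for item in row["changes"] for key, value in item.items()}

    tmp["Date"] = row["Date"]
    tmp["ID"] = row["ID"]

    # Assumes the first two columns of LiveTable are ID and Date.
    for col in cols[2:]:
        # Most recent earlier change to this column for the same ID, if any.
        first_row = (
            changeTableDF
            .filter((F.col("ID") == row["ID"]) & (F.col("Date") < row["Date"]) & (F.col("Field") == col))
            .sort("Date", ascending=False)
            .select("NewValue")
            .first()
        )
        un = first_row[0] if first_row is not None else ""
        # A change made on this date wins; otherwise carry the earlier value forward.
        tmp[col] = changes.get(col, un)

    fdata.append(tmp)

print(fdata)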

For a column that did not change on a given date, the code above looks up the NewValue of the most recent earlier change to that column.
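
The lookup above runs one filter per column per row, which could get slow with 500+ columns. As a rough alternative, the same carry-forward can be done in a single ordered pass. This is a plain-Python sketch rather than Spark code; forward_fill and its argument layout are hypothetical names, and it assumes the dates are dd/MM/yyyy strings.

```python
from datetime import datetime

def forward_fill(grouped, columns):
    """Carry the latest known value of each column forward, per ID.

    grouped: {(id, "dd/MM/yyyy"): {field: new_value}}
    columns: tracked column names, e.g. ["User", "Task", "Status"]
    """
    def sort_key(key):
        row_id, date = key
        # Parse the date so ordering is chronological, not alphabetical.
        return (row_id, datetime.strptime(date, "%d/%m/%Y"))

    history = []
    state_by_id = {}  # id -> latest known value per column
    for row_id, date in sorted(grouped, key=sort_key):
        state = state_by_id.setdefault(row_id, {})
        state.update(grouped[(row_id, date)])
        row = {"ID": row_id, "Date": date}
        # Columns with no change seen so far stay empty, as in the loop above.
        row.update({c: state.get(c, "") for c in columns})
        history.append(row)
    return history

sample = {
    (1, "10/09/2023"): {"User": "Mark"},
    (1, "02/09/2023"): {"User": "Mary", "Status": "Stage 2"},
    (1, "05/09/2023"): {"Status": "Stage 3"},
}
history = forward_fill(sample, ["User", "Task", "Status"])
```

Because the running state is kept per ID, each change row is visited once instead of being re-queried for every column.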

Next, create a DataFrame from this list.

display(spark.createDataFrame(fdata,schema=liveTableDF.schema))

With the sample data, some of the older values in the output are empty. This is because the sample has no earlier change record for those columns. I expect your real ChangeTable contains more records.

So I added the records below to the sample data and rebuilt the table.

(2, "02/08/2023", "User", "Mike", "Jacob"),
(2, "01/08/2023", "Task", "Data Entry", "Visualization"),
(1, "01/08/2023", "Task", "Development", "Data Entry"),

After adding them, the output was complete for the original sample records, but not for the newly added ones.

So as long as ChangeTable has an earlier change record for a column, you will get a value for that column.
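
One possible way to fill the remaining gaps, assuming OldValue in ChangeTable is reliable, is to start from the LiveTable row and walk the changes backwards, undoing each one. The sketch below is plain Python for a single ID; rebuild_history is a hypothetical helper, and it assumes dd/MM/yyyy date strings and that same-day changes are listed in chronological order.

```python
from datetime import datetime

def rebuild_history(live_row, changes):
    """Rebuild history rows for one ID, oldest first.

    live_row: dict of the current LiveTable row (including "ID" and "Date").
    changes:  list of (date, field, old_value, new_value) for that ID.
    """
    def parse(date):
        return datetime.strptime(date, "%d/%m/%Y")

    by_date = {}
    for date, field, old_value, _new_value in changes:
        by_date.setdefault(date, []).append((field, old_value))

    state = dict(live_row)  # start from the latest known state
    history = []
    for date in sorted(by_date, key=parse, reverse=True):
        # The state right after this date's changes is one history row.
        history.append({**state, "Date": date})
        # Undo this date's changes; reversed() leaves the earliest OldValue in place.
        for field, old_value in reversed(by_date[date]):
            state[field] = old_value
    history.reverse()
    return history

live = {"ID": 1, "Date": "15/09/2023", "User": "Mark",
        "Task": "Visualization", "Status": "Stage 4"}
changes = [
    ("02/09/2023", "User", "Antony", "Mary"),
    ("02/09/2023", "Status", "Stage 1", "Stage 2"),
    ("05/09/2023", "Status", "Stage 2", "Stage 3"),
    ("10/09/2023", "User", "Mary", "Mark"),
    ("15/09/2023", "Task", "Data Entry", "Visualization"),
    ("15/09/2023", "Status", "Stage 3", "Stage 4"),
]
history = rebuild_history(live, changes)
```

For ID 1 this reproduces the four rows of the expected history table in the question, including Task = Data Entry on the earlier dates, because that value comes from the OldValue of the later Task change.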

Finally, save the result as a table in your SQL database. Because the main loop filters the DataFrame many times, I recommend partitioning it on the ID and Date columns.
