pyspark DataBricks如何基于增量数据读取Delta表

sr4lhrrt  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(145)

我们必须从delta表中读取数据,然后根据我们的要求连接所有的表,然后我们必须调用我们的内部APIS来传递每行数据。这是我们的目标。还有另一个要求,我们必须第一次读取批量数据,第二次我们必须从增量表的源读取更改或更新的数据。请帮助我们,如何实现这个senario.
批量加载代码,格式如下。

a = spark.table("tablename1")
b = spark.table("tablename2")
c = spark.table("tablename3")

final_df = spark.sql(" joinig 3 dataframe as per our requirement")

为数据框“final_df”上面的每一行调用APIS
现在,我们不能对源表使用changedata属性。是否可以读取增量数据任何时间戳方式或任何客户实现,请与我们分享。
谢谢

anauzrmj

anauzrmj1#

您可以使用下面的函数来加载最新的数据,这不会自动加载数据,您需要在出现新数据时运行它。
首先,做批量装载。

from delta.tables import *
from pyspark.sql.functions import col

def bulk_load(table1,table2,table3):
    tbl1Dict = {
        "path":table1,
        "data":spark.table(f"delta.`{table1}`"),
        "max_timestamp":DeltaTable.forPath(spark,path=f"{table1}").history().selectExpr("max(timestamp)").collect()[0][0]
        }
        
     tbl2Dict = {
         "path":table2,
         "data":spark.table(f"delta.`{table2}`"),
         "max_timestamp":DeltaTable.forPath(spark,path=f"{table2}").history().selectExpr("max(timestamp)").collect()[0][0]
         }
         
    tbl3Dict = {
        "path":table3,
        "data":spark.table(f"delta.`{table3}`"),
        "max_timestamp":DeltaTable.forPath(spark,path=f"{table3}").history().selectExpr("max(timestamp)").collect()[0][0]
        }
        
    return tbl1Dict,tbl2Dict,tbl3Dict

这将为您提供具有pathdatamax_timestamp的字典。通过从datakey获取值,在连接查询中使用这些返回值。

tbl1,tbl2,tbl3 = bulk_load("/delta_df_1/","/delta_df_2/","/delta_df_3/")
tbl1["data"].show()
tbl2["data"].show()
tbl3["data"].show()

接下来,将此字典传递给increment_load函数,该函数将为您提供具有最新记录的相同结构字典。

def increment_load(table1,table2,table3):
    tblDict1 = {
        "path":table1["path"],
        "data":spark.table(f'delta.`{table1["path"]}`').select("*","_metadata.file_modification_time").filter(col("file_modification_time")>table1["max_timestamp"]),
        "max_timestamp":DeltaTable.forPath(spark,path=f'{table1["path"]}').history().selectExpr("max(timestamp)").collect()[0][0]
        }
        
    tblDict2 = {
        "path":table2["path"],
        "data":spark.table(f'delta.`{table2["path"]}`').select("*","_metadata.file_modification_time").filter(col("file_modification_time")>table2["max_timestamp"]),
        "max_timestamp":DeltaTable.forPath(spark,path=f'{table2["path"]}').history().selectExpr("max(timestamp)").collect()[0][0]
        }
        
    tblDict3 = {
        "path":table3["path"],
        "data":spark.table(f'delta.`{table3["path"]}`').select("*","_metadata.file_modification_time").filter(col("file_modification_time")>table3["max_timestamp"]),
        "max_timestamp":DeltaTable.forPath(spark,path=f'{table3["path"]}').history().selectExpr("max(timestamp)").collect()[0][0]
        }
        
    return tblDict1,tblDict2,tblDict3

此函数获取file_modification_time大于上一个时间戳的记录。这是最新的记录。
之后,批量加载tbl1tbl2tbl3变量。如果你做增量加载,你会得到空的结果,因为我们没有写任何数据。

现在让我们向这三个表中写入一些数据。下面是写的数据。

现在,执行增量加载。

inc_tbl1,inc_tbl2,inc_tbl3 = increment_load(tbl1,tbl2,tbl3)

输出量:


在这里,你可以看到数据的最新记录被提取。
然后,您可以在join操作和API调用中使用它。请记住,当调用增量加载函数时,dict表具有以前加载的数据。

相关问题