我们必须从delta表中读取数据,然后根据我们的要求连接所有的表,然后我们必须调用我们的内部APIS来传递每行数据。这是我们的目标。还有另一个要求,我们必须第一次读取批量数据,第二次我们必须从增量表的源读取更改或更新的数据。请帮助我们,如何实现这个senario.
批量加载代码,格式如下。
a = spark.table("tablename1")
b = spark.table("tablename2")
c = spark.table("tablename3")
final_df = spark.sql(" joinig 3 dataframe as per our requirement")
为数据框“final_df”上面的每一行调用APIS
现在,我们不能对源表使用changedata属性。是否可以读取增量数据任何时间戳方式或任何客户实现,请与我们分享。
谢谢
1条答案
按热度按时间anauzrmj1#
您可以使用下面的函数来加载最新的数据,这不会自动加载数据,您需要在出现新数据时运行它。
首先,做批量装载。
这将为您提供具有path、data和max_timestamp的字典。通过从datakey获取值,在连接查询中使用这些返回值。
接下来,将此字典传递给increment_load函数,该函数将为您提供具有最新记录的相同结构字典。
此函数获取file_modification_time大于上一个时间戳的记录。这是最新的记录。
之后,批量加载
tbl1
、tbl2
和tbl3
变量。如果你做增量加载,你会得到空的结果,因为我们没有写任何数据。现在让我们向这三个表中写入一些数据。下面是写的数据。
现在,执行增量加载。
输出量:
在这里,你可以看到数据的最新记录被提取。
然后,您可以在join操作和API调用中使用它。请记住,当调用增量加载函数时,dict表具有以前加载的数据。