我在Azure上使用Databricks,我的数据托管在ADLS 2上。
当前运行时版本为10.4 LTS(如果需要,我可以升级)
我有一个表Pimproduct:
| ID|名称|行动|dlk_last_modified| pim_timestamp|
| --|--|--|--|--|
| 1 |第一条|一|2022-03-01 2022-03-01| 2022-02-28 22:34:00|
- id:文章的ID(unic)
- name:文章名称
- Action:要在该行上执行的操作(A =添加,D =删除)
- dlk_last_modified:表中插入日期
- pim_timestamp:从源系统提取数据
每隔15分钟,我会收到一个新文件,其中包含我需要插入的修改。对于文件中的每一行,我只考虑每个ID的最新pim_timestamp
:
1.如果这一行是action=A,而ID不存在,我就添加这一行
1.如果行是action=A并且ID存在,我将用新行替换相同ID的现有行。
1.如果该行是action=D,则需要从表中删除ID。
最初,修改是每天进行的。我使用了这个代码:
from pyspark.sql import functions as F, Window as W
df = spark.table("Pimproduct").unionByNames(
spark.read.format("avro").load("/path/to/daily/data")
)
df = df.withColumn(
"rn",
F.row_number().over(W.partitionBy("id").orderBy(F.col("pim_timestamp").desc())),
)
df = df.where("rn = 1").where("action <> 'D'")
df.write.saveAsTable("pimproduct", format="delta", mode="overwrite")
但是现在,我想在流媒体中做同样的事情,我真的不知道我怎么能做到这一点。我试过这个:
import tempfile
from pyspark.sql import functions as F, Window as W
df = spark.readSteam.table("Pimproduct").unionByNames(
spark.readStream.schema(schema).format("avro").load("/path/to/daily/data")
)
df = df.withColumn(
"rn",
F.row_number().over(W.partitionBy("id").orderBy(F.col("pim_timestamp").desc())),
)
df = df.where("rn = 1").where("action <> 'D'")
with tempfile.TemporaryDirectory() as d:
df.writeStream.toTable("Pimproduct", checkpointLocation=d)
但我得到了错误:
分析异常:流式 Dataframe /数据集不支持非基于时间的窗口;
你知道我该如何进行数据流摄取吗?我愿意接受建议。
1条答案
按热度按时间9lowa7mx1#
您需要使用
foreachbatch
来更新带有流数据的表。您可以使用下面的功能来更新您的表格。
然后像下面这样调用函数。
df.writeStream.foreachBatch(update_table2).start()
输出量: