pyspark中的用例流

06odsfpq  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(88)

我在Azure上使用Databricks,我的数据托管在ADLS 2上。
当前运行时版本为10.4 LTS(如果需要,我可以升级)
我有一个表Pimproduct
| ID|名称|行动|dlk_last_modified| pim_timestamp|
| --|--|--|--|--|
| 1 |第一条|一|2022-03-01 2022-03-01| 2022-02-28 22:34:00|

  • id:文章的ID(unic)
  • name:文章名称
  • Action:要在该行上执行的操作(A =添加,D =删除)
  • dlk_last_modified:表中插入日期
  • pim_timestamp:从源系统提取数据

每隔15分钟,我会收到一个新文件,其中包含我需要插入的修改。对于文件中的每一行,我只考虑每个ID的最新pim_timestamp
1.如果这一行是action=A,而ID不存在,我就添加这一行
1.如果行是action=A并且ID存在,我将用新行替换相同ID的现有行。
1.如果该行是action=D,则需要从表中删除ID。
最初,修改是每天进行的。我使用了这个代码:

from pyspark.sql import functions as F, Window as W

df = spark.table("Pimproduct").unionByNames(
    spark.read.format("avro").load("/path/to/daily/data")
)

df = df.withColumn(
    "rn",
    F.row_number().over(W.partitionBy("id").orderBy(F.col("pim_timestamp").desc())),
)

df = df.where("rn = 1").where("action <> 'D'")

df.write.saveAsTable("pimproduct", format="delta", mode="overwrite")

但是现在,我想在流媒体中做同样的事情,我真的不知道我怎么能做到这一点。我试过这个:

import tempfile
from pyspark.sql import functions as F, Window as W

df = spark.readSteam.table("Pimproduct").unionByNames(
    spark.readStream.schema(schema).format("avro").load("/path/to/daily/data")
)

df = df.withColumn(
    "rn",
    F.row_number().over(W.partitionBy("id").orderBy(F.col("pim_timestamp").desc())),
)

df = df.where("rn = 1").where("action <> 'D'")

with tempfile.TemporaryDirectory() as d:
    df.writeStream.toTable("Pimproduct", checkpointLocation=d)

但我得到了错误:
分析异常:流式 Dataframe /数据集不支持非基于时间的窗口;
你知道我该如何进行数据流摄取吗?我愿意接受建议。

9lowa7mx

9lowa7mx1#

您需要使用foreachbatch来更新带有流数据的表。
您可以使用下面的功能来更新您的表格。

from pyspark.sql import functions as F, Window as W
from delta.tables import *


def update_table2(df,id):
    source = DeltaTable.forName(spark,"Pimproduct")
    source.alias("src")\
    .merge(df.alias("upds"),"src.id == upds.id and upds.pim_timestamp > src.pim_timestamp")\
    .whenMatchedUpdateAll().execute()

    source = DeltaTable.forName(spark,"Pimproduct")
    source.alias("src").delete(condition="src.action=='D'")

然后像下面这样调用函数。
df.writeStream.foreachBatch(update_table2).start()
输出量:

相关问题