pyspark Autoloader -创建tmp视图

dphi5xsq  于 12个月前  发布在  Spark
关注(0)|答案(1)|浏览(100)

我使用autoloader增量加载数据到银层从青铜表,我使用changeFeed功能。
这就是我如何阅读df

df = spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.table("mdp_prd.bronze.nrq_customerassetproperty_autoloader_nodups")

在写流时,我还使用defForEachBatch内部传递这个函数。
函数的一部分看起来像这样。

from pyspark.sql.functions import col
def update_changefeed(df, epochId):
    filtered_df = df.filter(col("_change_type").isin("insert", "update_postimage", "delete"))
    filtered_df.createOrReplaceTempView("test2")

运行代码的编写部分失败:

df.writeStream.foreachBatch(update_changefeed) \
 .option("checkpointLocation", checkpoint_directory) \
 .trigger(availableNow=True).start().awaitTermination()

错误告诉我The table or view "test2" cannot be found
我试着测试这个函数,并从流媒体df中创建tmp view,它工作了。我认为问题是当函数传递给ForEachBatch时,它并没有在那里创建它。对此有什么解决办法或解决办法吗?

rjee0c15

rjee0c151#

是的@绿色。就像你说的,我们需要用全球视角。

from pyspark.sql.functions import col

def update_changefeed(df,id):
    ndf1 = df.filter(col("fare_amount")>10)
    ndf2 = df.filter(col("fare_amount")<10)
    ndf1.createGlobalTempView("test1")
    ndf2.createGlobalTempView("test2")
    spark.sql("show  views  from global_temp").show()

输出上述功能。

结果

这是因为在文档中,TEMPORARY视图仅对创建它们的会话可见,并在会话结束时被删除。正如后续解决方案线程中所提到的,这是从原始会话克隆的会话创建的框架。因此,它对该特定会话有效。
你可以在下面看到它打印视图,使用的是框架会话,而不是原始的Spark。
代码:

from pyspark.sql.functions import col

def update_changefeed(df,id):
    ndf1 = df.filter(col("fare_amount")>10)
    ndf1.createOrReplaceTempView("test1")
    temp_views = df.sparkSession.catalog.listTables()
    print(f"Views in this batch:{temp_views}")
    spark.sql("show  views  from  default").show()

相关问题