我使用autoloader增量加载数据到银层从青铜表,我使用changeFeed功能。
这就是我如何阅读df
df = spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.table("mdp_prd.bronze.nrq_customerassetproperty_autoloader_nodups")
在写流时,我还使用def
在ForEachBatch
内部传递这个函数。
函数的一部分看起来像这样。
from pyspark.sql.functions import col
def update_changefeed(df, epochId):
filtered_df = df.filter(col("_change_type").isin("insert", "update_postimage", "delete"))
filtered_df.createOrReplaceTempView("test2")
运行代码的编写部分失败:
df.writeStream.foreachBatch(update_changefeed) \
.option("checkpointLocation", checkpoint_directory) \
.trigger(availableNow=True).start().awaitTermination()
错误告诉我The table or view "test2" cannot be found
。
我试着测试这个函数,并从流媒体df
中创建tmp view
,它工作了。我认为问题是当函数传递给ForEachBatch
时,它并没有在那里创建它。对此有什么解决办法或解决办法吗?
1条答案
按热度按时间rjee0c151#
是的@绿色。就像你说的,我们需要用全球视角。
输出上述功能。
结果
这是因为在文档中,TEMPORARY视图仅对创建它们的会话可见,并在会话结束时被删除。正如后续解决方案线程中所提到的,这是从原始会话克隆的会话创建的框架。因此,它对该特定会话有效。
你可以在下面看到它打印视图,使用的是框架会话,而不是原始的Spark。
代码: