pyspark 更改群集配置时出现数据块错误,提示缺少文件

yrdbyhpb  于 2023-05-16  发布在  Spark
关注(0)|答案(1)|浏览(124)

我得到下面的脚本错误
“阅读文件/mnt/topics/audit-logs/minio.sys.tmp/multipart/v1/c782. 025a337657/azure.json时出错。已收到文件的文件通知:/mnt/topics/audit-logs/minio.sys.tmp/multipart/v1/c782.025a337657/azure.json,但它已不存在。请确保文件在处理之前未被删除。要继续您的流,您可以将Spark SQL配置spark.sql.files.ignoreMissingFiles设置为true。

table_name= "main.deltalake_db.Table_test"
checkpoint_path = "/mnt/_checkpoint"
file_path ="/mnt/topics/audit-logs"
schema = "authenticationMechanism STRING,authenticationMechanismId STRING"
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .schema(schema)  
  .load(file_path)
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

这是工作正常之前,但我改变了集群配置,并通过提供keyvault范围挂载的位置。现在一切都破碎了。我试图移动到原始配置,但同样的错误。有人能帮忙吗。我应该把这个配置移到脚本中的什么地方?

mkshixfv

mkshixfv1#

你得到的错误是缺少文件。为了避免此错误并继续在Spark SQL配置代码下面运行流。

%sql

SET spark.sql.files.ignoreMissingFiles = true;

或者在readstream之前添加spark.conf.set,如下所示。

table_name= "main.deltalake_db.Table_test"
checkpoint_path = "/mnt/_checkpoint"
file_path ="/mnt/topics/audit-logs"
schema = "authenticationMechanism STRING,authenticationMechanismId STRING"

spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")

(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation",
  checkpoint_path)
   .schema(schema)  
   .load(file_path)
   .writeStream
   .option("checkpointLocation", checkpoint_path)
   .trigger(availableNow=True)
   .toTable(table_name))

并尝试运行您的代码。

相关问题