通过spark流式传输从ADLS Gen 2阅读文本文件

utugiqy6  于 2023-10-23  发布在  Apache
关注(0)|答案(1)|浏览(108)

我正在使用数据块从adls gen2中阅读一个文本文件。这是我的代码。
我可以成功读取,但当我定义查询和写入流时,我收到一个错误:我找不到ADLS gen2令牌。能否为我提供一个解决方案,在文本文件上执行spark流?
我已经尝试通过SAS令牌,即使它不能流文件,而不是它能够流目录。

file_path = "adl://<account-name>.dfs.core.windows.net/<container>/<path>/*.txt"

streaming_df = spark.readStream \
.schema(schema) \
.text(file_path) 

query = streaming_df_transformed.writeStream \
.outputMode("append") \
.format("console") \
.start()

query.awaitTermination()
du7egjpx

du7egjpx1#

对于你的第一个问题,你可以使用abfss来代替adl文件系统。
abfss://<container_name>@<storage_acc_name>.dfs.core.windows.net/
您可以使用以下代码配置SAS令牌。

spark.conf.set("fs.azure.account.auth.type.<storage_acc_name>.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.<storage_acc_name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.<storage_acc_name>.dfs.core.windows.net", "Your_SAS_token")

现在对于文本文件,您需要给予readstream的目录,并筛选出只有.txt,您可以给予pathGlobFilter选项,而阅读。
代码:

file_path = f"abfss://[email protected]/databricks/text/"

streaming_df = spark.readStream.schema(schema).option("basePath", file_path)\
    .option("pathGlobFilter", "*.txt").text(file_path + "*")

display(streaming_df.select("*","_metadata.file_path"))

输出量:

相关问题