Delta Lake on Spark: "Failure to initialize configuration" when reading a table from Azure storage

guicsvcw · asked on 2023-05-18 · Apache

Background

I am trying to read a Delta table stored on Azure from a local Spark cluster, going through Azure Data Lake Storage Gen2 (abfss://) rather than the legacy Blob Storage.

Spark shell exploration

The end goal is a PySpark application, but to understand what is going on I am first trying to read the table from a spark-shell. Here is how I launch it:

spark-shell \
--packages io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-azure:3.3.1  \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
--conf "fs.azure.account.key.<storage_account>.dfs.core.windows.net=<storage_key>"  \

And here is how I try to read the table:

val dt = spark.read.format("delta").load(f"abfss://hub@<storage_account>.dfs.core.windows.net/fec/fec.delta")

This is the error I get:

org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException: Failure to initialize configuration
  at org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider.getStorageAccountKey(SimpleKeyProvider.java:51)
  at org.apache.hadoop.fs.azurebfs.AbfsConfiguration.getStorageAccountKey(AbfsConfiguration.java:548)
  at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.initializeClient(AzureBlobFileSystemStore.java:1449)
  at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.<init>(AzureBlobFileSystemStore.java:215)
  at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:128)
  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
  at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
  at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
  at org.apache.spark.sql.delta.DeltaTableUtils$.findDeltaTableRoot(DeltaTable.scala:184)
  at org.apache.spark.sql.delta.sources.DeltaDataSource$.parsePathIdentifier(DeltaDataSource.scala:314)
  at org.apache.spark.sql.delta.catalog.DeltaTableV2.x$1$lzycompute(DeltaTableV2.scala:70)
  at org.apache.spark.sql.delta.catalog.DeltaTableV2.x$1(DeltaTableV2.scala:65)
  at org.apache.spark.sql.delta.catalog.DeltaTableV2.timeTravelByPath$lzycompute(DeltaTableV2.scala:65)
  at org.apache.spark.sql.delta.catalog.DeltaTableV2.timeTravelByPath(DeltaTableV2.scala:65)
  at org.apache.spark.sql.delta.catalog.DeltaTableV2.$anonfun$timeTravelSpec$1(DeltaTableV2.scala:99)

My impression is that I am following the documentation, and I am able to read the Delta table from Python with delta-rs using the very same credentials, so I believe the credentials themselves are fine.
I probably forgot to set something, but the more documentation I read, the more confused I get. I also tried setting up OAuth2 authentication, but I end up with the same exception. The more I think about it, the more I suspect that --conf "fs.azure.account.key.<storage_account>.dfs.core.windows.net=<storage_key>" is simply not being taken into account (but I do not know why).
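
As far as I understand (my own reading, not something confirmed above), spark-shell ignores --conf properties whose name does not start with spark. (it warns about ignoring non-Spark config properties at startup), so a bare fs.azure.account.key.* option may never reach the session at all; prefixing it with spark.hadoop. is the usual way to feed such options through to the Hadoop layer. A quick way to check what the Hadoop configuration actually contains, using the same <storage_account> placeholder:

// Sanity check from the spark-shell: does the Hadoop configuration the ABFS
// driver will use actually contain the account key? With the bare --conf
// above this prints null; with --conf "spark.hadoop.fs.azure.account.key..."
// it prints the key.
println(
  spark.sparkContext.hadoopConfiguration
    .get("fs.azure.account.key.<storage_account>.dfs.core.windows.net"))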

Environment

  • java 8.0.275.hs-adpt
  • scala 2.13.10
  • Spark 3.3.2

flvlnr44 1#

It turns out that the authentication has to be set from the Spark session:

spark-shell \
--packages io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-azure:3.3.2  \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \

Then:

spark.conf.set("fs.azure.account.key.<storage_account>.dfs.core.windows.net","<storage_key>")
val dt = spark.read.format("delta").load(f"abfss://hub@satestoct.dfs.core.windows.net/fec/fec.delta")
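
For completeness, since OAuth2 was also tried in the question: the same session-level pattern should work there as well. Below is a sketch, not taken from the answer, using the standard hadoop-azure ABFS options; <client_id>, <client_secret> and <tenant_id> are placeholders for a service principal:

// Session-level OAuth2 (client credentials) configuration for ABFS, as an
// alternative to the account key. All placeholders are illustrative.
val account = "<storage_account>.dfs.core.windows.net"
spark.conf.set(s"fs.azure.account.auth.type.$account", "OAuth")
spark.conf.set(s"fs.azure.account.oauth.provider.type.$account",
  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(s"fs.azure.account.oauth2.client.id.$account", "<client_id>")
spark.conf.set(s"fs.azure.account.oauth2.client.secret.$account", "<client_secret>")
spark.conf.set(s"fs.azure.account.oauth2.client.endpoint.$account",
  "https://login.microsoftonline.com/<tenant_id>/oauth2/token")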
