pyspark: Connecting Apache Spark to Azure Data Lake (Gen2)

bvhaajcl  posted 12 months ago in Spark

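I am working in a virtual machine, where I set up the entire Spark workspace and connected it to a MySQL Notebook. **This question is not about how to connect to a Data Lake from Databricks. I am only working in a virtual machine.** Now I want to connect to Azure Data Lake Gen 2 to read my files. I have the following versions installed: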

  • JDK 11.0.20.1
  • Python 2.7.18
  • Spark 3.5.0

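As far as I know, these versions are compatible with each other, so the problem does not lie there.
My question is: why does the following not work?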

from pyspark.sql import SparkSession
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

# Get Sas Token
key_vault_url = "https://<<keyvault>>.vault.azure.net/"
credential = DefaultAzureCredential() 
client = SecretClient(vault_url=key_vault_url, credential=credential)
sastoken = client.get_secret(<<SAStoken>>)

# Paths to your JAR files
path_to_hadoop_azure_jar = "/opt/spark/jars/hadoop-azure-3.3.4.jar"
path_to_azure_storage_jar = "/opt/spark/jars/azure-storage-8.6.6.jar"
path_to_jetty_util_ajax_jar = "/opt/spark/jars/jetty-util-ajax-11.0.18.jar"
path_to_jetty_util_jar = "/opt/spark/jars/jetty-util-11.0.18.jar"
path_to_azure_datalake_jar = "/opt/spark/jars/hadoop-azure-datalake-3.3.6.jar"

spark = SparkSession.builder.appName("AzureDataRead") \
    .config("spark.driver.extraClassPath", path_to_hadoop_azure_jar) \
    .config("spark.executor.extraClassPath", path_to_hadoop_azure_jar) \
    .config("spark.jars", f"{path_to_hadoop_azure_jar},{path_to_azure_storage_jar},{path_to_jetty_util_ajax_jar},{path_to_jetty_util_jar},{path_to_azure_datalake_jar}") \
    .config("fs.azure.sas.<<container>>.<<datalake>>.dfs.core.windows.net", sastoken) \
    .getOrCreate()

file_path = "/data/<<file>>" 

# Read example file
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(f"wasbs://<<container>>.<<datalake>>.dfs.core.windows.net/{file_path}")

# Show the DataFrame
df.show()

I get the following error. As far as I can tell, the main problem lies in the AzureNativeFileSystemStore class:

Py4JJavaError: An error occurred while calling o158.load.
: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.fs.azure.AzureNativeFileSystemStore
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem.createDefaultStore(NativeAzureFileSystem.java:1485)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1410)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
    at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:53)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)


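I have checked the paths and permissions of the jar files, and they should all be correct. It evidently still does not work, though, because the code throws this error regardless of my jar files.
Can anyone help?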

Answer #1, by blmhpbnm

Replace your jars with the following ones:

jetty-util-ajax-9.4.48.v20220622.jar
jetty-util-9.4.48.v20220622.jar 
jetty-server-9.4.48.v20220622.jar

instead of:

jetty-util-ajax-11.0.18.jar
jetty-util-11.0.18.jar
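
One way to spot leftover Jetty 11 jars before starting Spark is to scan the jars directory for Jetty files whose version is outside the 9.4 line. This is only a minimal sketch: the helper name `find_non_94_jetty_jars` is hypothetical, and the `/opt/spark/jars` directory is the one used in the question.

```python
import os
import re

# Matches names such as jetty-util-9.4.48.v20220622.jar or jetty-util-ajax-11.0.18.jar
JETTY_JAR = re.compile(r"^(jetty-[a-z-]+?)-(\d+\.\d+\.\d+.*)\.jar$")


def find_non_94_jetty_jars(jar_names, wanted_prefix="9.4."):
    """Return (artifact, version) pairs for Jetty jars outside the wanted version line."""
    mismatched = []
    for name in jar_names:
        match = JETTY_JAR.match(name)
        if match and not match.group(2).startswith(wanted_prefix):
            mismatched.append((match.group(1), match.group(2)))
    return mismatched


if __name__ == "__main__":
    jars_dir = "/opt/spark/jars"  # directory from the question; adjust for your install
    if os.path.isdir(jars_dir):
        for artifact, version in find_non_94_jetty_jars(sorted(os.listdir(jars_dir))):
            print(f"{artifact} is at {version}, expected 9.4.x")
```

Any jar it reports would be a candidate for replacement with the 9.4.48.v20220622 build listed above.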


Use:

blob.core.windows.net


for configuring Blob storage, and not:

dfs.core.windows.net
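
Applied to the SAS setting from the question, this means the configuration key should end in `blob.core.windows.net`. A minimal sketch, assuming a hypothetical helper `sas_config_key` and placeholder container and account names:

```python
def sas_config_key(container, account):
    """Build the WASB SAS property name for a container on the Blob endpoint."""
    return f"fs.azure.sas.{container}.{account}.blob.core.windows.net"


# Placeholder names, not real resources
print(sas_config_key("mycontainer", "myaccount"))
```

The returned key would take the place of the `fs.azure.sas.<<container>>.<<datalake>>.dfs.core.windows.net` key in the session builder. Note also that `SecretClient.get_secret` returns a secret object, so the token string to pass as the value is its `.value` attribute.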


When loading, use the following path:

wasbs://<container_name>@<account_name>.blob.core.windows.net/path_to_file/
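
The path format above can be sketched as a small helper, which also avoids the doubled slash that results when the file path already starts with `/`, as it does in the question. The function names `wasbs_url` and `read_csv` are hypothetical, and `read_csv` assumes a SparkSession that has already been configured with the SAS token:

```python
def wasbs_url(container, account, file_path):
    """Build a wasbs:// URL of the form container@account.blob.core.windows.net/path."""
    return f"wasbs://{container}@{account}.blob.core.windows.net/{file_path.lstrip('/')}"


def read_csv(spark, container, account, file_path):
    """Read a CSV with the same options as the question, given an existing SparkSession."""
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(wasbs_url(container, account, file_path))
    )
```

Compared with the URL in the question, the container and the account are separated by `@` rather than `.`.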

