I ran into a situation where I could run pyspark on a regular EC2 instance by manually pip-installing pyspark and pulling in the delta-core jar dependency through the Spark config, as described in other answers (e.g. Unable to run PySpark (Kafka to Delta) in local and getting SparkException: Cannot find catalog plugin class for catalog 'spark_catalog'), but when I run the same code on an EMR cluster I get the spark_catalog error. Has anyone hit this and figured out how to get it working with Pyspark on an Amazon EMR cluster?
WORKING: local pyspark on a SageMaker EC2 notebook instance (image: DS 3.0, kernel: Python 3)
# https://docs.delta.io/latest/releases.html
%pip install pyspark==3.2.0
Then instantiate a Spark session configured with delta-core release 1.1.0 (plus a couple of other packages needed to read from S3):
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
pkg_list = [
"io.delta:delta-core_2.12:1.1.0",
"org.apache.hadoop:hadoop-aws:3.2.0",
"com.amazonaws:aws-java-sdk-bundle:1.12.180"
]
packages = ",".join(pkg_list)
spark = (SparkSession.builder.appName("EDA")
.config("spark.jars.packages", packages)
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.getOrCreate()
)
# This is mandatory config on spark session to use AWS S3
spark.sparkContext._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key', <key>)
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key', <secret>)
print(f"Spark version: {spark.sparkContext.version}")
Spark version: 3.2.0
df = spark.read.format("delta").load("s3a://[my-bucket]/tmp/tmp/lake1/").filter(col('language')=='English')
df.show()
NOT WORKING: Pyspark on an EMR cluster (pyspark run from a SageMaker notebook)
I picked an EMR release that should come with Spark 3.2 (emr-6.6) and ran exactly the same code, but got the spark_catalog error, which suggests incompatible package versions. Could it be that Amazon's Spark build is incompatible with delta-core because it isn't "vanilla" Spark 3.2 but rather Spark version: 3.2.0-amzn-0?
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
pkg_list = [
"io.delta:delta-core_2.12:1.1.0",
"org.apache.hadoop:hadoop-aws:3.2.0",
"com.amazonaws:aws-java-sdk-bundle:1.12.180"
]
packages = ",".join(pkg_list)
spark = (SparkSession.builder.appName("EDA")
.config("spark.jars.packages", packages)
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.getOrCreate()
)
# This is mandatory config on spark session to use AWS S3
spark.sparkContext._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key', <key>)
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key', <secret>)
print(f"Spark version: {spark.sparkContext.version}")
Spark version: 3.2.0-amzn-0
df = spark.read.format("delta").load("s3a://[my-bucket]/tmp/tmp/lake1/").filter(col('language')=='English')
df.show()
1 Answer
It turned out that setting the Spark .config()s on the Spark context from within the notebook session wasn't actually setting anything. Because the notebook picks up its context from the EMR cluster, I thought I was setting every config, yet the "Cannot find catalog plugin" error was thrown every time.
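A quick diagnostic sketch (my addition, assuming the notebook session exposes the usual spark object): configs passed through builder.config() to an already-running context may not take effect, and spark.jars.packages / spark.sql.extensions in particular cannot be added after the JVM has started, so checking whether the values actually landed makes the problem visible:

# Both calls return the fallback value if the builder.config() settings never reached the running context
print(spark.conf.get("spark.sql.catalog.spark_catalog", "NOT SET"))
print(spark.sparkContext.getConf().get("spark.jars.packages", "NOT SET"))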
Once I started setting those configs directly during EMR cluster setup according to these instructions, the "spark_catalog" error went away and the Spark context picked up the configuration successfully.
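For reference, a minimal sketch of what that cluster-level configuration could look like, using the EMR "spark-defaults" classification and the same package coordinates as the notebook attempt above (the linked instructions may differ in detail; this list is supplied as the cluster's "Software settings" / Configurations at creation time, not run in the notebook):

# Sketch only: pass this list as the cluster's Configurations when the cluster is created
emr_configurations = [
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.jars.packages": "io.delta:delta-core_2.12:1.1.0,org.apache.hadoop:hadoop-aws:3.2.0,com.amazonaws:aws-java-sdk-bundle:1.12.180",
            "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog"
        }
    }
]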
The call from the notebook then just looks like this:
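A sketch of what that call might look like (not necessarily the author's exact code): with the configs baked into the cluster, the notebook no longer needs any .config() calls, and in a SparkMagic/Livy session the spark object is usually already provided, making the builder call optional. The path is reused from the question:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("EDA").getOrCreate()  # Delta/S3 configs now come from the cluster
df = spark.read.format("delta").load("s3a://[my-bucket]/tmp/tmp/lake1/").filter(col("language") == "English")
df.show()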