Reading Delta-format Parquet with PySpark on an EMR-on-EC2 cluster

Asked by b1payxdu on 2023-11-21 in Apache

I've been able to run PySpark on a regular EC2 instance by manually pip-installing pyspark and passing the delta-core jar dependency through the Spark config, as described in other answers (e.g. Unable to run PySpark (Kafka to Delta) in local and getting SparkException: Cannot find catalog plugin class for catalog 'spark_catalog'), but when I run the same code on an EMR cluster I get the spark_catalog error. Has anyone run into this and figured out how to use PySpark with Delta on an Amazon EMR cluster?

WORKING: local PySpark on an EC2 notebook instance in SageMaker (image: DS 3.0, kernel: Python 3)

# https://docs.delta.io/latest/releases.html
%pip install pyspark==3.2.0

Then instantiate the configured Spark session with delta-core release 1.1.0 (plus a couple of other packages for reading from S3):

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

pkg_list = [
    "io.delta:delta-core_2.12:1.1.0",
    "org.apache.hadoop:hadoop-aws:3.2.0",
    "com.amazonaws:aws-java-sdk-bundle:1.12.180" 
]
packages = ",".join(pkg_list)

spark = (SparkSession.builder.appName("EDA")
    .config("spark.jars.packages", packages)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

# This is mandatory config on spark session to use AWS S3
spark.sparkContext._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") 
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key', <key>)
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key', <secret>)

print(f"Spark version: {spark.sparkContext.version}")


Spark version: 3.2.0

df = spark.read.format("delta").load("s3a://[my-bucket]/tmp/tmp/lake1/").filter(col('language')=='English')
df.show()


NOT WORKING: PySpark on an EMR cluster (run from a SageMaker notebook)

I chose an EMR release that should install Spark 3.2 (emr-6.6) and used exactly the same code, but I get the spark_catalog error, which suggests a package-version incompatibility. Could the cause be that Amazon's Spark build is not compatible with delta-core, since it isn't "vanilla" Spark 3.2 but reports Spark version: 3.2.0-amzn-0?


from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

pkg_list = [
    "io.delta:delta-core_2.12:1.1.0",
    "org.apache.hadoop:hadoop-aws:3.2.0",
    "com.amazonaws:aws-java-sdk-bundle:1.12.180" 
]
packages = ",".join(pkg_list)

spark = (SparkSession.builder.appName("EDA")
    .config("spark.jars.packages", packages)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

# This is mandatory config on spark session to use AWS S3
spark.sparkContext._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") 
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key', <key>)
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key', <secret>)

print(f"Spark version: {spark.sparkContext.version}")


Spark version: 3.2.0-amzn-0

df = spark.read.format("delta").load("s3a://[my-bucket]/tmp/tmp/lake1/").filter(col('language')=='English')
df.show()
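
A quick way to narrow this down (a diagnostic sketch added here, not part of the original post) is to read the Delta-related settings back from the active session; if they come back unset, the builder .config() calls were silently ignored, which is exactly what the accepted answer below concludes:

# Diagnostic sketch: check whether the session actually picked up the Delta settings.
# If these print "NOT SET", the builder configs never reached the EMR session.
print(spark.conf.get("spark.sql.extensions", "NOT SET"))
print(spark.conf.get("spark.sql.catalog.spark_catalog", "NOT SET"))
print(spark.sparkContext.getConf().get("spark.jars.packages", "NOT SET"))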


Answer #1, by 83qze16e:

It turned out that setting Spark .config()s directly on the Spark context from inside the notebook session doesn't actually set anything.
When I grabbed the context from the EMR cluster and tried to configure it from the notebook session, I was "setting" every config, yet it still threw the "Cannot find catalog plugin" error every time.
Once I started setting those configurations directly during EMR cluster setup according to these instructions, the "spark_catalog" error went away and the Spark context picked up the configuration successfully.
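
The linked instructions aren't reproduced in the post, so the following is only a sketch (an assumption, not taken from the original answer) of what the cluster-level settings can look like. The spark-defaults classification is standard EMR; the property values simply mirror the .config() calls from the question. It can be supplied as JSON in the console's "Software settings" box or passed to boto3's run_job_flow(Configurations=...):

# Sketch of an EMR "Configurations" block, mirroring the question's configs.
# Adjust the delta-core version to whatever matches the Spark build on your EMR release.
emr_configurations = [
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.jars.packages": (
                "io.delta:delta-core_2.12:1.1.0,"
                "org.apache.hadoop:hadoop-aws:3.2.0,"
                "com.amazonaws:aws-java-sdk-bundle:1.12.180"
            ),
            "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
            "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        },
    }
]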

The call in the notebook then looks like this:

from pyspark.sql import SparkSession

# sc is the SparkContext that the EMR-backed notebook session already provides
sc_master = sc.master
sc_app = sc.appName

spark = SparkSession.builder.master(sc_master).appName(sc_app).getOrCreate()

# This is mandatory config on spark session to use AWS S3
spark.sparkContext._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") 
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key', <key>)
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key', <secret>)

print(f"Spark version: {spark.sparkContext.version}")

