Error writing a DataFrame to Ceph storage

x6h2sr28 posted on 2021-07-12 in Spark

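My organization is currently exploring Ceph as a replacement for HDFS for running AI/ML workloads. As part of this initiative, we set up a Ceph cluster and brought it into Kubernetes using Rook.
While testing Ceph, I was able to access Ceph storage with the s3cmd CLI, and I was also able to read data from Ceph using Spark on Kubernetes. However, I get an error when writing data back to Ceph storage.
Below are my code and the error I get when writing the data back. I hope someone can help.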

from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("prateek-pyspark-ceph") \
        .config("spark.kubernetes.driver.master", "k8s://https://xxx:6443") \
        .config("spark.kubernetes.namespace", "jupyter") \
        .config("spark.kubernetes.container.image", "spark-executor-3.0.1") \
        .config("spark.kubernetes.container.image.pullPolicy" ,"Always") \
        .config("spark.kubernetes.container.image.pullSecrets" ,"gcr") \
        .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark") \
        .config("spark.kubernetes.authenticate.executor.serviceAccountName", "spark") \
        .config("spark.kubernetes.authenticate.submission.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") \
        .config("spark.kubernetes.authenticate.submission.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token") \
        .config("spark.hadoop.fs.s3a.access.key", "xxxx") \
        .config("spark.hadoop.fs.s3a.secret.key", "xxxx") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        .config("spark.hadoop.fs.s3a.endpoint", "{}:{}".format("http://xxxx", "8080")) \
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
        .config("spark.hadoop.fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A") \
        .config("spark.hadoop.fs.s3a.multiobjectdelete.enable", "false") \
        .config("spark.hadoop.fs.s3a.fast.upload","true") \
        .config("spark.eventLog.dir", "s3a://bucket/spark-event-log/") \
        .config("spark.executor.instances", "1") \
        .config("spark.executor.cores", "3") \
        .config("spark.executor.memory", "55g") \
        .config("spark.eventLog.enabled", "false") \
        .getOrCreate()

# Read Source Datasets

musical_data= spark.read.json("s3a://bucket/input-data/Musical_Instruments_data.json")
musical_metadata= spark.read.json("s3a://bucket/input-data/Musical_Instruments_metadata.json")

# Register dataframes as temp tables

musical_metadata.registerTempTable("musical_metadata")
musical_data.registerTempTable("musical_data")

# Top products based on unique user reviews

top_rated = spark.sql("""
select musical_data.asin as product_id, 
        count(distinct musical_data.reviewerID) as unique_reviewer_id_count, 
        musical_metadata.price as product_price
from musical_data left outer join musical_metadata
on musical_data.asin == musical_metadata.asin
group by product_id, product_price
order by unique_reviewer_id_count desc
limit 10
""")

# Display top 10 products

top_rated.show(truncate=False)

# Save output as csv

top_rated.write.format("csv") \
        .option("header","true") \
        .mode("overwrite") \
        .save("s3a://bucket/output-data/")

# Stop Spark Context to release resources

spark.stop()

Error when writing the DataFrame:

Py4JJavaError: An error occurred while calling o740.save.
: org.apache.hadoop.fs.s3a.AWSBadRequestException: PUT 0-byte object  on output-data/_temporary/0/: com.amazonaws.services.s3.model.AmazonS3Exception: null (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: tx0000000000000000002b3-00604055fa-1ea62-sg; S3 Extended Request ID: 1ea62-sg-sg; Proxy: null), S3 Extended Request ID: 1ea62-sg-sg:InvalidRequest: null (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: tx0000000000000000002b3-00604055fa-1ea62-sg; S3 Extended Request ID: 1ea62-sg-sg; Proxy: null)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:224)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.createEmptyObject(S3AFileSystem.java:2786)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.createFakeDirectory(S3AFileSystem.java:2761)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:2088)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:2021)
    at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2275)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:354)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:163)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:168)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: null (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: tx0000000000000000002b3-00604055fa-1ea62-sg; S3 Extended Request ID: 1ea62-sg-sg; Proxy: null), S3 Extended Request ID: 1ea62-sg-sg
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1828)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1412)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1374)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5212)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5158)
    at com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:398)
    at com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:6113)
    at com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1817)
    at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1777)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:1545)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$13(S3AFileSystem.java:2788)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    ... 44 more

Versions:
Spark 3.0.1, Hadoop 3.2

hlswsv35 #1

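I'd start with the "Troubleshooting S3A" documentation, which you can find through Google. I can't put a link here because the moderators consider linking to the canonical Hadoop S3A documentation to be wrong. Sorry.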

fzwojiic #2

In the end, the job worked in both client mode and cluster mode.
Cluster mode, submitted with spark-submit:

./spark-submit \
--master k8s://https://xxxx:6443 \
--deploy-mode cluster \
--name prateek-ceph-pyspark \
--conf spark.kubernetes.namespace=jupyter \
--conf spark.executor.instances=1 \
--conf spark.executor.cores=3 \
--conf spark.executor.memory=55g \
--conf spark.kubernetes.container.image=spark-executor-3.0.1 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image.pullSecrets=gcr \
--conf spark.kubernetes.authenticate.submission.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
--conf spark.kubernetes.authenticate.submission.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
--conf spark.hadoop.fs.s3a.access.key=<ACCESS_KEY> \
--conf spark.hadoop.fs.s3a.secret.key=<SECRET_KEY> \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
--conf spark.hadoop.fs.s3a.endpoint=http://xxxx:8080 \
--conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.eventLog.enabled=false \
s3a://ceph-bucket/scripts/Ceph_PySpark.py
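
The S3A settings in the command above use the same property names as the `.config(...)` calls in the question, so one dictionary can feed both modes. The following is a minimal sketch of that idea, not part of the original answer: `spark_submit_args` is a hypothetical helper, and the host names remain the elided `xxxx` placeholders from the post.

```python
import shlex


def spark_submit_args(master, deploy_mode, name, conf, app):
    """Return the spark-submit argument list for the given settings."""
    args = ["./spark-submit", "--master", master,
            "--deploy-mode", deploy_mode, "--name", name]
    # Each Spark property becomes one "--conf key=value" pair.
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    # The application file is always the last argument.
    args.append(app)
    return args


# S3A settings copied from the command above (endpoint host elided in the post).
s3a_conf = {
    "spark.hadoop.fs.s3a.endpoint": "http://xxxx:8080",
    "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
    "spark.hadoop.fs.s3a.path.style.access": "true",
}

args = spark_submit_args(
    master="k8s://https://xxxx:6443",
    deploy_mode="cluster",
    name="prateek-ceph-pyspark",
    conf=s3a_conf,
    app="s3a://ceph-bucket/scripts/Ceph_PySpark.py",
)
# shlex.join quotes arguments where needed, giving a shell-safe command line.
print(shlex.join(args))
```

In client mode the same `s3a_conf` dictionary could be applied by looping over its items and calling `.config(key, value)` on the session builder.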

Client mode in a Jupyter notebook

Spark config with AWS credential provider org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("prateek-pyspark-ceph") \
        .config("spark.kubernetes.driver.master", "k8s://https://xxxx:6443") \
        .config("spark.kubernetes.namespace", "jupyter") \
        .config("spark.kubernetes.container.image", "spark-executor-3.0.1") \
        .config("spark.kubernetes.container.image.pullPolicy" ,"Always") \
        .config("spark.kubernetes.container.image.pullSecrets" ,"gcr") \
        .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark") \
        .config("spark.kubernetes.authenticate.executor.serviceAccountName", "spark") \
        .config("spark.kubernetes.authenticate.submission.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") \
        .config("spark.kubernetes.authenticate.submission.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token") \
        .config("spark.hadoop.fs.s3a.access.key", "xxxxx") \
        .config("spark.hadoop.fs.s3a.secret.key", "xxxxx") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        .config("spark.hadoop.fs.s3a.endpoint", "{}:{}".format("http://xxxx", "8080")) \
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.executor.instances", "2") \
        .config("spark.executor.cores", "6") \
        .config("spark.executor.memory", "55g") \
        .getOrCreate()

Spark config with AWS credential provider com.amazonaws.auth.EnvironmentVariableCredentialsProvider:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("prateek-pyspark-ceph") \
        .config("spark.kubernetes.driver.master", "k8s://https://xxxx:6443") \
        .config("spark.kubernetes.namespace", "jupyter") \
        .config("spark.kubernetes.container.image", "spark-executor-3.0.1") \
        .config("spark.kubernetes.container.image.pullPolicy" ,"Always") \
        .config("spark.kubernetes.container.image.pullSecrets" ,"gcr") \
        .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark") \
        .config("spark.kubernetes.authenticate.executor.serviceAccountName", "spark") \
        .config("spark.kubernetes.authenticate.submission.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") \
        .config("spark.kubernetes.authenticate.submission.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.EnvironmentVariableCredentialsProvider") \
        .config("spark.hadoop.fs.s3a.endpoint", "{}:{}".format("http://xxxx", "8080")) \
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.executor.instances", "2") \
        .config("spark.executor.cores", "6") \
        .config("spark.executor.memory", "55g") \
        .getOrCreate()
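
The second configuration names `EnvironmentVariableCredentialsProvider` but sets no keys, so the credentials have to be present as environment variables in every JVM that talks to Ceph. The post does not say how that was arranged. One possible approach, sketched here as an assumption, is to forward the driver's variables to the executors through Spark's `spark.executorEnv.<NAME>` properties; `env_credential_conf` is a hypothetical helper name.

```python
import os


def env_credential_conf(environ=os.environ):
    """Map AWS credential environment variables to Spark properties
    that set the same variables in each executor process."""
    conf = {}
    # Variable names read by the AWS SDK's EnvironmentVariableCredentialsProvider.
    for name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
        value = environ.get(name)
        if value is None:
            raise KeyError(f"{name} is not set in the driver environment")
        conf[f"spark.executorEnv.{name}"] = value
    return conf


# Example with placeholder values instead of the real environment.
example = env_credential_conf(
    {"AWS_ACCESS_KEY_ID": "xxxxx", "AWS_SECRET_ACCESS_KEY": "xxxxx"}
)
for key in example:
    print(key)
```

Each returned pair could then be passed to `.config(key, value)` on the builder before `.getOrCreate()`. Mounting the keys from a Kubernetes secret is another option that keeps them out of the Spark configuration.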

To fix the write to Ceph in client mode, add the `fs.s3a.committer.staging.*` properties described at https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html to the `core-site.xml` file in your driver and executor images.
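
The answer does not list the exact properties or values it used. As a rough sketch only, the snippet below generates a `core-site.xml` fragment from a dictionary. The property names come from the Hadoop committers page, but every value is an illustrative assumption, and `fs.s3a.committer.name` is included on the assumption that the staging "directory" committer is the one being selected. `core_site_xml` is a hypothetical helper name.

```python
import xml.etree.ElementTree as ET
from xml.dom import minidom

# Assumed values for illustration; check each one against the committers page.
STAGING_COMMITTER_PROPS = {
    "fs.s3a.committer.name": "directory",
    "fs.s3a.committer.staging.tmp.path": "tmp/staging",
    "fs.s3a.committer.staging.unique-filenames": "true",
    "fs.s3a.committer.staging.conflict-mode": "replace",
}


def core_site_xml(props):
    """Render Hadoop properties as the text of a core-site.xml file."""
    root = ET.Element("configuration")
    for name, value in props.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    # Pretty-print so the file is readable inside the image.
    raw = ET.tostring(root, encoding="unicode")
    return minidom.parseString(raw).toprettyxml(indent="  ")


xml_text = core_site_xml(STAGING_COMMITTER_PROPS)
print(xml_text)
```

The printed text would be saved as `core-site.xml` in the Hadoop configuration directory of both the driver and the executor image, so that both sides resolve the same committer settings.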
