在我的组织中,我目前正在探索如何使用ceph来代替hdfs来运行ai/ml工作负载。作为该计划的一部分,我们建立了一个ceph集群,并使用rook将其导入kubernetes。
在使用ceph进行测试期间,我能够使用s3cmd cli访问ceph存储,还能够使用kubernetes上的spark从ceph读取数据。但是,我在将数据写回ceph存储时遇到了一个错误。
下面是我的代码和错误,我在写回数据时得到的。希望有人能帮上忙。
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("prateek-pyspark-ceph") \
.config("spark.kubernetes.driver.master", "k8s://https://xxx:6443") \
.config("spark.kubernetes.namespace", "jupyter") \
.config("spark.kubernetes.container.image", "spark-executor-3.0.1") \
.config("spark.kubernetes.container.image.pullPolicy" ,"Always") \
.config("spark.kubernetes.container.image.pullSecrets" ,"gcr") \
.config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark") \
.config("spark.kubernetes.authenticate.executor.serviceAccountName", "spark") \
.config("spark.kubernetes.authenticate.submission.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") \
.config("spark.kubernetes.authenticate.submission.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token") \
.config("spark.hadoop.fs.s3a.access.key", "xxxx") \
.config("spark.hadoop.fs.s3a.secret.key", "xxxx") \
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
.config("spark.hadoop.fs.s3a.endpoint", "{}:{}".format("http://xxxx", "8080")) \
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
.config("spark.hadoop.fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A") \
.config("spark.hadoop.fs.s3a.multiobjectdelete.enable", "false") \
.config("spark.hadoop.fs.s3a.fast.upload","true") \
.config("spark.eventLog.dir", "s3a://bucket/spark-event-log/") \
.config("spark.executor.instances", "1") \
.config("spark.executor.cores", "3") \
.config("spark.executor.memory", "55g") \
.config("spark.eventLog.enabled", "false") \
.getOrCreate()
# Read Source Datasets
musical_data= spark.read.json("s3a://bucket/input-data/Musical_Instruments_data.json")
musical_metadata= spark.read.json("s3a://bucket/input-data/Musical_Instruments_metadata.json")
# Register dataframes as temp tables
musical_metadata.registerTempTable("musical_metadata")
musical_data.registerTempTable("musical_data")
# Top products based on unique user reviews
top_rated = spark.sql("""
select musical_data.asin as product_id,
count(distinct musical_data.reviewerID) as unique_reviewer_id_count,
musical_metadata.price as product_price
from musical_data left outer join musical_metadata
on musical_data.asin == musical_metadata.asin
group by product_id, product_price
order by unique_reviewer_id_count desc
limit 10
""")
# Display top 10 products
top_rated.show(truncate=False)
# Save output as csv
top_rated.write.format("csv") \
.option("header","true") \
.mode("overwrite") \
.save("s3a://bucket/output-data/")
# Stop Spark Context to release resources
spark.stop()
写入Dataframe时出错。
Py4JJavaError: An error occurred while calling o740.save.
: org.apache.hadoop.fs.s3a.AWSBadRequestException: PUT 0-byte object on output-data/_temporary/0/: com.amazonaws.services.s3.model.AmazonS3Exception: null (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: tx0000000000000000002b3-00604055fa-1ea62-sg; S3 Extended Request ID: 1ea62-sg-sg; Proxy: null), S3 Extended Request ID: 1ea62-sg-sg:InvalidRequest: null (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: tx0000000000000000002b3-00604055fa-1ea62-sg; S3 Extended Request ID: 1ea62-sg-sg; Proxy: null)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:224)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
at org.apache.hadoop.fs.s3a.S3AFileSystem.createEmptyObject(S3AFileSystem.java:2786)
at org.apache.hadoop.fs.s3a.S3AFileSystem.createFakeDirectory(S3AFileSystem.java:2761)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:2088)
at org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:2021)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2275)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:354)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:163)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:168)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: null (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: tx0000000000000000002b3-00604055fa-1ea62-sg; S3 Extended Request ID: 1ea62-sg-sg; Proxy: null), S3 Extended Request ID: 1ea62-sg-sg
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1828)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1412)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1374)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5212)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5158)
at com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:398)
at com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:6113)
at com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1817)
at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1777)
at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:1545)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$13(S3AFileSystem.java:2788)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
... 44 more
版本-
spark 3.0.1 hadoop 3.2
2条答案
按热度按时间hlswsv351#
我先看一下你可以通过谷歌找到的“s3a疑难解答”文档。我不能把一个链接放在这里,因为版主认为链接到规范的hadoops3a文档是错误的。对不起的
fzwojiic2#
最后,作业在客户机和集群模式下工作
通过spark提交的群集模式
jupyter笔记本中的客户端模式-
Spark Config with AWS Credential Provider - org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
```from pyspark.sql import SparkSession
spark = SparkSession.builder
.appName("prateek-pyspark-ceph")
.config("spark.kubernetes.driver.master", "k8s://https://xxxx:6443")
.config("spark.kubernetes.namespace", "jupyter")
.config("spark.kubernetes.container.image", "spark-executor-3.0.1")
.config("spark.kubernetes.container.image.pullPolicy" ,"Always")
.config("spark.kubernetes.container.image.pullSecrets" ,"gcr")
.config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
.config("spark.kubernetes.authenticate.executor.serviceAccountName", "spark")
.config("spark.kubernetes.authenticate.submission.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
.config("spark.kubernetes.authenticate.submission.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token")
.config("spark.hadoop.fs.s3a.access.key", "xxxxx")
.config("spark.hadoop.fs.s3a.secret.key", "xxxxx")
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
.config("spark.hadoop.fs.s3a.endpoint", "{}:{}".format("http://xxxx", "8080"))
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.config("spark.executor.instances", "2")
.config("spark.executor.cores", "6")
.config("spark.executor.memory", "55g")
.getOrCreate()
`Spark Config with AWS Credential Provider - com.amazonaws.auth.EnvironmentVariableCredentialsProvider`
from pyspark.sql import SparkSession
spark = SparkSession.builder
.appName("prateek-pyspark-ceph")
.config("spark.kubernetes.driver.master", "k8s://https://xxxx:6443")
.config("spark.kubernetes.namespace", "jupyter")
.config("spark.kubernetes.container.image", "spark-executor-3.0.1")
.config("spark.kubernetes.container.image.pullPolicy" ,"Always")
.config("spark.kubernetes.container.image.pullSecrets" ,"gcr")
.config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
.config("spark.kubernetes.authenticate.executor.serviceAccountName", "spark")
.config("spark.kubernetes.authenticate.submission.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
.config("spark.kubernetes.authenticate.submission.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token")
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.EnvironmentVariableCredentialsProvider")
.config("spark.hadoop.fs.s3a.endpoint", "{}:{}".format("http://xxxx", "8080"))
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.config("spark.executor.instances", "2")
.config("spark.executor.cores", "6")
.config("spark.executor.memory", "55g")
.getOrCreate()