I am using the following code to save a Spark DataFrame to S3 as CSV files.
import traceback
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
# The spark-submit command used:
# spark-submit --master local[1] --packages org.apache.hadoop:hadoop-aws:2.7.3,
# com.amazonaws:aws-java-sdk-s3:1.11.98 my_file.py
ACCESS_KEY_ID = "xxxxxxxxxx"
SECRET_ACCESS_KEY = "yyyyyyyyyyyyy"
BUCKET_NAME = "zzzz"
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL data source example") \
    .getOrCreate()
df = spark.createDataFrame(["10", "11", "13"], StringType()).toDF("age")
df.show()
try:
    spark.conf.set("fs.s3n.awsAccessKeyId", ACCESS_KEY_ID)
    spark.conf.set("fs.s3n.awsSecretAccessKey", SECRET_ACCESS_KEY)
    spark.conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    output_directory = 's3n://' + BUCKET_NAME + '/' + str("azure_dbs")
    df.write.save(output_directory + '_csv', format='csv', header=True, mode="overwrite")
    print("Written successfully")
except Exception as exp:
    print("Exception occurred")
    print(exp)
    print(traceback.format_exc())
When I run it from my local system (using spark-submit), it writes to S3 successfully. The spark-submit command used is:
spark-submit --master local[1] --packages org.apache.hadoop:hadoop-aws:2.7.3,com.amazonaws:aws-java-sdk-s3:1.11.98 my_file.py
However, when I run this job from an Azure Databricks notebook, with these packages attached as additional dependencies of the job, I get the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o252.save.
: java.lang.VerifyError: Bad type on operand stack
Exception Details:
Location:
org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.copy(Ljava/lang/String;Ljava/lang/String;)V @152: invokevirtual
Reason:
Type 'org/jets3t/service/model/S3Object' (current frame, stack[4]) is not assignable to 'org/jets3t/service/model/StorageObject'
Current Frame:
bci: @152
flags: { }
locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object' }
stack: { 'org/jets3t/service/S3Service', 'java/lang/String', 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object', integer }
Bytecode:
0x0000000: b200 41b9 0067 0100 9900 36b2 0041 bb00
0x0000010: 5959 b700 5a12 68b6 005b 2bb6 005b 1269
0x0000020: b600 5b2c b600 5b12 6ab6 005b 2ab4 0023
0x0000030: b600 3cb6 005b b600 5cb9 006b 0200 2ab4
0x0000040: 0011 9900 302a b400 0c2a b400 232b 0101
0x0000050: 0101 b600 6c4e 2ab4 001c 0994 9e00 162d
0x0000060: b600 6d2a b400 1c94 9e00 0a2a 2d2c b600
0x0000070: 6eb1 bb00 2a59 2cb7 002b 4e2d 2ab4 001f
0x0000080: b600 302a b400 0c2a b400 23b6 003c 2b2a
0x0000090: b400 23b6 003c 2d03 b600 6f57 a700 0a4e
0x00000a0: 2a2d 2bb7 0035 b1
Exception Handler Table:
bci [0, 113] => handler: 159
bci [114, 156] => handler: 159
Stackmap Table:
same_frame(@62)
same_frame(@114)
same_locals_1_stack_item_frame(@159,Object[#216])
same_frame(@166)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(NativeS3FileSystem.java:342)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:332)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at com.databricks.sql.transaction.tahoe.DeltaTableUtils$.findDeltaTableRoot(DeltaTable.scala:103)
at com.databricks.sql.transaction.tahoe.DeltaValidation$.validateNonDeltaWrite(DeltaValidation.scala:94)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:261)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
(When running this notebook job from Azure Databricks, I do not create a new Spark session as in the local-machine scenario; instead, I use the existing spark session provided by Databricks.)
What is the cause of the error? Are additional packages needed when running this from Azure Databricks?
spark-submit packages included:
org.apache.hadoop:hadoop-aws:2.7.3,
com.amazonaws:aws-java-sdk-s3:1.11.98
Local machine:
Python 3.6
Spark 2.4.4 with Scala 2.11.12
Databricks details:
Cluster info:
5.5 LTS (includes Apache Spark 2.4.3, Scala 2.11)
Python 3 (3.5)
1 Answer
In Azure Databricks, it appears we need to change the keys used to set the configuration. Please refer to the answer given by Carlos David Peña.
We need to use the key "spark.hadoop.fs.s3n.impl" instead of "fs.s3n.impl".
Note: there is no need to explicitly add any dependent libraries to the job.
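A minimal sketch of the adjusted configuration, assuming the same df, BUCKET_NAME, and credential variables as in the question, and assuming the keys are set before the first access to the s3n path (the "spark.hadoop." prefix forwards the values to the underlying Hadoop configuration used by the S3 filesystem):

# Same keys as in the question, but prefixed with "spark.hadoop." so that
# Databricks passes them through to the Hadoop configuration.
spark.conf.set("spark.hadoop.fs.s3n.awsAccessKeyId", ACCESS_KEY_ID)
spark.conf.set("spark.hadoop.fs.s3n.awsSecretAccessKey", SECRET_ACCESS_KEY)
spark.conf.set("spark.hadoop.fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

# Write the DataFrame as CSV to the same S3 location as before.
output_directory = 's3n://' + BUCKET_NAME + '/azure_dbs'
df.write.save(output_directory + '_csv', format='csv', header=True, mode="overwrite")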