Writing a Spark DataFrame from Azure Databricks to S3 fails with java.lang.VerifyError: Bad type on operand stack

oxf4rvwz posted on 2021-05-27 in Hadoop

I am using the following code to save a Spark DataFrame to S3 as a CSV file:

import traceback

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# Attached the spark-submit command used:
# spark-submit --master local[1] --packages org.apache.hadoop:hadoop-aws:2.7.3,
# com.amazonaws:aws-java-sdk-s3:1.11.98 my_file.py

ACCESS_KEY_ID = "xxxxxxxxxx"
SECRET_ACCESS_KEY = "yyyyyyyyyyyyy"
BUCKET_NAME = "zzzz"

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL data source example") \
    .getOrCreate()

df = spark.createDataFrame(["10", "11", "13"], StringType()).toDF("age")
df.show()

try:
    spark.conf.set("fs.s3n.awsAccessKeyId", ACCESS_KEY_ID)
    spark.conf.set("fs.s3n.awsSecretAccessKey", SECRET_ACCESS_KEY)
    spark.conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

    output_directory = 's3n://' + BUCKET_NAME + '/' + str("azure_dbs")
    df.write.save(output_directory + '_csv', format='csv', header=True, mode="overwrite")
    print("Written successful")
except Exception as exp:
    print("Exception occurred")
    print(exp)
    print(traceback.format_exc())

When I run this from my local system (via spark-submit), it writes to S3 successfully. The spark-submit command used is

spark-submit --master local[1] --packages org.apache.hadoop:hadoop-aws:2.7.3,com.amazonaws:aws-java-sdk-s3:1.11.98 my_file.py

However, when I run this as a job from an Azure Databricks notebook, with these packages attached as dependent libraries of the job, I get the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o252.save.
    : java.lang.VerifyError: Bad type on operand stack
    Exception Details:
      Location:
        org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.copy(Ljava/lang/String;Ljava/lang/String;)V @152: invokevirtual
      Reason:
        Type 'org/jets3t/service/model/S3Object' (current frame, stack[4]) is not assignable to 'org/jets3t/service/model/StorageObject'
      Current Frame:
        bci: @152
        flags: { }
        locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object' }
        stack: { 'org/jets3t/service/S3Service', 'java/lang/String', 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object', integer }
      Bytecode:
        0x0000000: b200 41b9 0067 0100 9900 36b2 0041 bb00
        0x0000010: 5959 b700 5a12 68b6 005b 2bb6 005b 1269
        0x0000020: b600 5b2c b600 5b12 6ab6 005b 2ab4 0023
        0x0000030: b600 3cb6 005b b600 5cb9 006b 0200 2ab4
        0x0000040: 0011 9900 302a b400 0c2a b400 232b 0101
        0x0000050: 0101 b600 6c4e 2ab4 001c 0994 9e00 162d
        0x0000060: b600 6d2a b400 1c94 9e00 0a2a 2d2c b600
        0x0000070: 6eb1 bb00 2a59 2cb7 002b 4e2d 2ab4 001f
        0x0000080: b600 302a b400 0c2a b400 23b6 003c 2b2a
        0x0000090: b400 23b6 003c 2d03 b600 6f57 a700 0a4e
        0x00000a0: 2a2d 2bb7 0035 b1                      
      Exception Handler Table:
        bci [0, 113] => handler: 159
        bci [114, 156] => handler: 159
      Stackmap Table:
        same_frame(@62)
        same_frame(@114)
        same_locals_1_stack_item_frame(@159,Object[#216])
        same_frame(@166)

        at org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(NativeS3FileSystem.java:342)
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:332)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
        at com.databricks.sql.transaction.tahoe.DeltaTableUtils$.findDeltaTableRoot(DeltaTable.scala:103)
        at com.databricks.sql.transaction.tahoe.DeltaValidation$.validateNonDeltaWrite(DeltaValidation.scala:94)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:261)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
        at py4j.Gateway.invoke(Gateway.java:295)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:251)
        at java.lang.Thread.run(Thread.java:748)

(When running this notebook job from Azure Databricks, I do not create a new SparkSession as in the local-machine scenario; I use the existing spark session provided by Databricks.)

What is the cause of this error? Are additional packages required when running this from Azure Databricks?
Spark-submit packages included:
org.apache.hadoop:hadoop-aws:2.7.3,
com.amazonaws:aws-java-sdk-s3:1.11.98
Local machine:
Python 3.6
Spark 2.4.4 (using Scala 2.11.12)

Databricks details:
Cluster info:
5.5 LTS (includes Apache Spark 2.4.3, Scala 2.11)
Python 3 (3.5)

3hvapo4f  1#

In Azure Databricks, it appears we need to update the keys used to set this configuration. Please refer to the answer given by Carlos David Peña.
We need to use the key "spark.hadoop.fs.s3n.impl" instead of "fs.s3n.impl".
Note: there is no need to explicitly attach any dependent libraries to the job.
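
As a minimal sketch of the fix, the configuration block from the question would become the following (assuming the same "spark.hadoop." prefix is also applied to the access-key and secret-key settings, which the answer does not spell out explicitly; credentials and bucket name are placeholders as before):

# Per the answer above, prefix the Hadoop S3 keys with "spark.hadoop."
# when setting them via spark.conf on Databricks.
spark.conf.set("spark.hadoop.fs.s3n.awsAccessKeyId", ACCESS_KEY_ID)
spark.conf.set("spark.hadoop.fs.s3n.awsSecretAccessKey", SECRET_ACCESS_KEY)
spark.conf.set("spark.hadoop.fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

output_directory = 's3n://' + BUCKET_NAME + '/' + "azure_dbs"
df.write.save(output_directory + '_csv', format='csv', header=True, mode="overwrite")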
