Could not initialize class org.apache.hadoop.fs.s3a.S3AFileSystem

Posted by 4sup72z8 on 2021-06-25 in Flink

I'm trying to run a Flink job on YARN:

$ export HADOOP_CONF_DIR=/etc/hadoop/conf
$ ./${FLINK_HOME}/bin/yarn-session.sh ...
$ ./${FLINK_HOME}/bin/flink run \
    /home/clsadmin/messagehub-to-s3-1.0-SNAPSHOT.jar \
    --kafka-brokers ${KAFKA_BROKERS} \
    ...
    --output-folder s3://${S3_BUCKET}/${S3_FOLDER}

The startup output shows the Hadoop classpath being added to Flink:

Using the result of 'hadoop classpath' to augment the Hadoop classpath: /usr/hdp/2.6.2.0-205/hadoop/conf:/usr/hdp/2.6.2.0-205/hadoop/lib/*:/usr/hdp/2.6.2.0-205/hadoop/.//*:/usr/hdp/2.6.2.0-205/hadoop-hdfs/./:/usr/hdp/2.6.2.0-205/hadoop-hdfs/lib/*:/usr/hdp/2.6.2.0-205/hadoop-hdfs/.//*:/usr/hdp/2.6.2.0-205/hadoop-yarn/lib/*:/usr/hdp/2.6.2.0-205/hadoop-yarn/.//*:/usr/hdp/2.6.2.0-205/hadoop-mapreduce/lib/*:/usr/hdp/2.6.2.0-205/hadoop-mapreduce/.//*::mysql-connector-java.jar:/home/common/lib/dataconnectorStocator/*:/usr/hdp/2.6.2.0-205/tez/*:/usr/hdp/2.6.2.0-205/tez/lib/*:/usr/hdp/2.6.2.0-205/tez/conf

However, the job fails to start:

01/29/2018 16:11:04 Job execution switched to status RUNNING.
01/29/2018 16:11:04 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
01/29/2018 16:11:04 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
01/29/2018 16:11:05 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to RUNNING
01/29/2018 16:11:05 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to FAILED
java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.fs.s3a.S3AFileSystem
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1196)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
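
For context on the error itself: in the JVM, "NoClassDefFoundError: Could not initialize class X" usually does not mean the class is missing. It means the class was found, but an earlier attempt to run its static initializer threw an exception. The JVM reports the real cause only once, as an ExceptionInInitializerError, and every later access fails with the shorter message, so the original cause for S3AFileSystem may appear earlier in the TaskManager log. The minimal sketch below reproduces this JVM behavior with a hypothetical FragileFileSystem class standing in for S3AFileSystem; it illustrates class-initialization semantics and is not Hadoop or Flink code.

```java
import java.util.ArrayList;
import java.util.List;

public class StaticInitDemo {

    // Hypothetical stand-in for a class like S3AFileSystem whose static
    // initialization depends on something missing or incompatible at runtime.
    static class FragileFileSystem {
        // Not a compile-time constant, so reading it forces class initialization.
        static final String VALUE = init();

        private static String init() {
            throw new IllegalStateException("simulated failure in static initializer");
        }
    }

    /** Touches FragileFileSystem twice and returns the error thrown by each attempt. */
    public static List<Throwable> run() {
        List<Throwable> errors = new ArrayList<>();
        for (int attempt = 0; attempt < 2; attempt++) {
            try {
                System.out.println(FragileFileSystem.VALUE);
            } catch (Throwable t) {
                // 1st attempt: ExceptionInInitializerError (cause = the real exception)
                // 2nd attempt: NoClassDefFoundError "Could not initialize class ..."
                errors.add(t);
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        for (Throwable t : run()) {
            System.out.println(t);
        }
    }
}
```

Running main shows an ExceptionInInitializerError for the first access and a NoClassDefFoundError with "Could not initialize class" for the second, which is the shape of the failure in the trace above.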

However, if I search the classpath that Flink reports:

$ grep org.apache.hadoop.fs.s3a.S3AFileSystem /usr/hdp/2.6.2.0-205/hadoop/.//*

I can see that hadoop-aws.jar is on the classpath:

...
Binary file /usr/hdp/2.6.2.0-205/hadoop/.//hadoop-aws-2.7.3.2.6.2.0-205.jar matches
Binary file /usr/hdp/2.6.2.0-205/hadoop/.//hadoop-aws.jar matches
...

Digging further, I can see that the class exists in the jar:

jar -tf /usr/hdp/2.6.2.0-205/hadoop/hadoop-aws.jar | grep org.apache.hadoop.fs.s3a.S3AFileSystem

returns:

org/apache/hadoop/fs/s3a/S3AFileSystem$1.class
org/apache/hadoop/fs/s3a/S3AFileSystem$2.class
org/apache/hadoop/fs/s3a/S3AFileSystem$3.class
org/apache/hadoop/fs/s3a/S3AFileSystem$WriteOperationHelper.class
org/apache/hadoop/fs/s3a/S3AFileSystem$4.class
org/apache/hadoop/fs/s3a/S3AFileSystem.class
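
grep only reports that a file's bytes match the pattern (and each unescaped "." in the pattern matches any character), so it can also flag jars that merely reference the class. A more exact check is to look up the .class entry by name. The sketch below does that with java.util.jar.JarFile for every top-level .jar file in one directory; the JarScan class and jarsContaining method are hypothetical names, and subdirectories are not searched.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarFile;

public class JarScan {

    /** Converts a binary class name (dots) into the entry name used inside a jar. */
    static String toEntryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns the sorted paths of jars directly in {@code dir} that contain {@code className}. */
    public static List<String> jarsContaining(File dir, String className) throws IOException {
        String entryName = toEntryName(className);
        List<String> hits = new ArrayList<>();
        File[] jars = dir.listFiles((d, name) -> name.endsWith(".jar"));
        if (jars == null) {
            return hits; // not a directory, or not readable
        }
        for (File f : jars) {
            // Look the entry up by exact name instead of matching raw bytes.
            try (JarFile jar = new JarFile(f)) {
                if (jar.getJarEntry(entryName) != null) {
                    hits.add(f.getPath());
                }
            }
        }
        Collections.sort(hits);
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // args[0] = directory of jars, args[1] = fully qualified class name
        for (String path : jarsContaining(new File(args[0]), args[1])) {
            System.out.println(path);
        }
    }
}
```

For example, after compiling it, `java JarScan /usr/hdp/2.6.2.0-205/hadoop org.apache.hadoop.fs.s3a.S3AFileSystem` would print only the jars in that directory that actually contain the class file.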

If I submit one of the bundled example applications:

./flink-1.4.0/bin/flink run \
   ./flink-1.4.0/examples/batch/WordCount.jar \
   --input "s3://${S3_BUCKET}/LICENSE-2.0.txt"  \
   --output "s3://${S3_BUCKET}/license-word-count.txt"

It runs without any problems.
My jar file does not contain any Hadoop classes:

$ jar -tf /home/clsadmin/messagehub-to-s3-1.0-SNAPSHOT.jar | grep hadoop
<<not found>>

Does anyone know why the Flink example runs fine, but Flink has trouble loading S3AFileSystem when running my code?
Update:
I solved the problem by doing the following:

rm -f flink-1.4.0/lib/flink-shaded-hadoop2-uber-1.4.0.jar

Is this a safe solution?
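
That deleting flink-shaded-hadoop2-uber-1.4.0.jar helps suggests, but does not prove, a conflict between Hadoop classes bundled in that jar and the HDP Hadoop jars added through 'hadoop classpath'. One way to look for such overlaps is to ask a class loader for every location that provides a given class file with ClassLoader.getResources; more than one hit means there are duplicate copies, and which one is used depends on classpath order and class-loader delegation. A minimal sketch, assuming it is run with the same classpath as the TaskManager (or called from inside the job); the ClasspathDuplicates name is hypothetical.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

public class ClasspathDuplicates {

    /** Converts a binary class name into the resource path of its .class file. */
    static String toResourceName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Lists every classpath location visible to {@code loader} that provides {@code className}. */
    public static List<URL> locationsOf(String className, ClassLoader loader) throws IOException {
        return Collections.list(loader.getResources(toResourceName(className)));
    }

    public static void main(String[] args) throws IOException {
        // e.g. org.apache.hadoop.fs.s3a.S3AFileSystem org.apache.hadoop.conf.Configuration
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        for (String name : args) {
            List<URL> urls = locationsOf(name, loader);
            System.out.println(name + " -> " + urls.size() + " location(s)");
            for (URL u : urls) {
                System.out.println("  " + u);
            }
        }
    }
}
```

Checking the two classes from the stack trace, S3AFileSystem and Configuration, with and without the uber jar on the classpath would show whether both the uber jar and the HDP jars supply them.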
