Java: error thrown when trying to write to S3 with AvroParquetWriter

Asked by mspsb9vt on 2021-05-27, tagged Spark

I have the following Java code that tries to write my objects to S3.

JavaRDD<String> filePaths = objJavaRDD.map(rdd -> {
    ArrayList<MyEntity> entityResult = rdd.getObjectResult();

    String filePath = "s3a://myBucket/test.parquet";
    Path dataFile = new Path(filePath);

    Configuration config = new Configuration();
    config.set("fs.s3a.access.key", "myAccessKey");
    config.set("fs.s3a.secret.key", "mySecretKey");

    try (ParquetWriter<MyEntity> writer = AvroParquetWriter.<MyEntity>builder(dataFile)
            .withSchema(ReflectData.AllowNull.get().getSchema(MyEntity.class))
            .withDataModel(ReflectData.get())
            .withConf(config)
            .withCompressionCodec(SNAPPY)
            .withWriteMode(OVERWRITE)
            .build()) {
        for (MyEntity d : entityResult) {
            writer.write(d);
        }
    } catch (Exception e) {
        System.err.println("Failed to write to the file. \n" + e.getMessage());
    }

    return filePath;
});

The error I get is:

java.lang.IllegalAccessError: tried to access method org.apache.hadoop.metrics2.lib.MutableCounterLong.<init>(Lorg/apache/hadoop/metrics2/MetricsInfo;J)V from class org.apache.hadoop.fs.s3a.S3AInstrumentation
    at org.apache.hadoop.fs.s3a.S3AInstrumentation.streamCounter(S3AInstrumentation.java:194)
    at org.apache.hadoop.fs.s3a.S3AInstrumentation.streamCounter(S3AInstrumentation.java:216)
    at org.apache.hadoop.fs.s3a.S3AInstrumentation.<init>(S3AInstrumentation.java:139)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:174)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at akt.org.apache.parquet.hadoop.util.HadoopOutputFile.fromPath(HadoopOutputFile.java:58)
    at akt.org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:532)
    at com.test.driver.SparkDriverApp.lambda$uploadToS3$8362bc51$1(SparkDriverApp.java:649)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Here are my dependencies:

compileOnly('org.apache.spark:spark-core_2.10:' + sparkVersion) {
    exclude group: 'org.apache.hadoop', module: 'hadoop-common'
}
compile ('com.amazonaws:aws-java-sdk-s3:1.10.75') {
    exclude group: 'org.apache.httpcomponents', module: 'httpclient'
    exclude group: 'org.apache.httpcomponents', module: 'httpcore'
}
compile 'com.amazonaws:aws-java-sdk-core:1.10.75'
compile 'com.amazonaws:aws-java-sdk-bom:1.10.75'
compile 'org.apache.httpcomponents:httpclient:4.5'
compile 'org.apache.httpcomponents:httpcore:4.4.3'
compile 'org.apache.parquet:parquet-avro:1.10.1'
compile group: 'org.apache.hadoop', name: 'hadoop-aws', version: '2.8.3'
compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.8.3'

Notes
Our company is still on Spark 1.6.2, which also bundles things like Hadoop 2.2.0. I'm not sure whether that causes trouble.
Also, our EMR release is the very old 4.8.2, which keeps us from using dependencies that are too new, for example com.amazonaws:aws-java-sdk-s3:1.10.75.
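One quick way to see which Hadoop build actually wins on the driver and executor classpaths (the 2.2.0 bundled with the old Spark versus the 2.8.3 declared above) is to log org.apache.hadoop.util.VersionInfo from inside a task. Below is a minimal sketch, assuming the application already has a JavaSparkContext named sc; the class and method names are made up for illustration.

import org.apache.hadoop.util.VersionInfo;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Collections;
import java.util.List;

public class HadoopVersionCheck {

    // Logs the Hadoop version visible to the driver and to one executor task,
    // to confirm which hadoop-common jar is actually being loaded at runtime.
    public static void report(JavaSparkContext sc) {
        System.out.println("Driver Hadoop version: " + VersionInfo.getVersion());

        List<String> fromExecutors = sc.parallelize(Collections.singletonList(1))
                .map(i -> "Executor Hadoop version: " + VersionInfo.getVersion())
                .collect();
        for (String line : fromExecutors) {
            System.out.println(line);
        }
    }
}

If the executor reports a Hadoop version older than 2.8, the IllegalAccessError above is consistent with hadoop-aws 2.8.3 classes being linked against an older hadoop-common at runtime.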


3df52oht #1

This was a version compatibility problem. I downgraded all of the versions, and it works now.
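The answer does not say which versions it settled on. Purely as a sketch of what a mutually consistent, downgraded set could look like, assuming the cluster's Hadoop line is 2.7.x: the key constraint is that hadoop-aws and hadoop-common share one version, and that this version matches the Hadoop release the Spark/EMR cluster actually runs.

// Illustrative versions only; the point is alignment, not these exact numbers.
compile group: 'org.apache.hadoop', name: 'hadoop-aws', version: '2.7.3'
compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.3'
// hadoop-aws 2.7.x was built against the monolithic com.amazonaws:aws-java-sdk:1.7.4
// and pulls it in transitively, so the separate aws-java-sdk-s3/core/bom 1.10.75
// entries above would be dropped rather than kept at a newer version.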
