I have the following Java code that tries to write my objects to S3:
JavaRDD<String> filePaths = objJavaRDD.map( rdd -> {
    ArrayList<MyEntity> entityResult = rdd.getObjectResult();
    String filePath = "s3a://myBucket/test.parquet";
    Path dataFile = new Path(filePath);
    Configuration config = new Configuration();
    config.set("fs.s3a.access.key", "myAccessKey");
    config.set("fs.s3a.secret.key", "mySecretKey");
    try (ParquetWriter<MyEntity> writer = AvroParquetWriter.<MyEntity>builder(dataFile)
            .withSchema(ReflectData.AllowNull.get().getSchema(MyEntity.class))
            .withDataModel(ReflectData.get())
            .withConf(config)
            .withCompressionCodec(SNAPPY)
            .withWriteMode(OVERWRITE)
            .build()) {
        for (MyEntity d : entityResult) {
            writer.write(d);
        }
    } catch (Exception e) {
        System.err.println("Failed to write to the file. \n" + e.getMessage());
    }
    return filePath;
});
This is the error I get:
java.lang.IllegalAccessError: tried to access method org.apache.hadoop.metrics2.lib.MutableCounterLong.<init>(Lorg/apache/hadoop/metrics2/MetricsInfo;J)V from class org.apache.hadoop.fs.s3a.S3AInstrumentation
at org.apache.hadoop.fs.s3a.S3AInstrumentation.streamCounter(S3AInstrumentation.java:194)
at org.apache.hadoop.fs.s3a.S3AInstrumentation.streamCounter(S3AInstrumentation.java:216)
at org.apache.hadoop.fs.s3a.S3AInstrumentation.<init>(S3AInstrumentation.java:139)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:174)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at akt.org.apache.parquet.hadoop.util.HadoopOutputFile.fromPath(HadoopOutputFile.java:58)
at akt.org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:532)
at com.test.driver.SparkDriverApp.lambda$uploadToS3$8362bc51$1(SparkDriverApp.java:649)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Here are my dependencies:
compileOnly('org.apache.spark:spark-core_2.10:' + sparkVersion) {
    exclude group: 'org.apache.hadoop', module: 'hadoop-common'
}
compile ('com.amazonaws:aws-java-sdk-s3:1.10.75') {
    exclude group: 'org.apache.httpcomponents', module: 'httpclient'
    exclude group: 'org.apache.httpcomponents', module: 'httpcore'
}
compile 'com.amazonaws:aws-java-sdk-core:1.10.75'
compile 'com.amazonaws:aws-java-sdk-bom:1.10.75'
compile 'org.apache.httpcomponents:httpclient:4.5'
compile 'org.apache.httpcomponents:httpcore:4.4.3'
compile 'org.apache.parquet:parquet-avro:1.10.1'
compile group: 'org.apache.hadoop', name: 'hadoop-aws', version: '2.8.3'
compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.8.3'
Notes
Our company is still on Spark 1.6.2, which also bundles things like Hadoop 2.2.0. I'm not sure whether that causes trouble.
Also, our EMR is a very old 4.8.2, which keeps us from using dependencies that are too new, for example com.amazonaws:aws-java-sdk-s3:1.10.75.
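For reference, an IllegalAccessError on MutableCounterLong.<init> is the usual symptom of hadoop-aws from one Hadoop release calling into a hadoop-common from an older release at runtime, so a mixed Hadoop classpath is worth ruling out first. A minimal sketch of pinning every Hadoop artifact to a single version in the Gradle build is shown below; the 2.7.3 used here is only a placeholder and would have to be replaced with whatever Hadoop release the cluster actually ships.

configurations.all {
    resolutionStrategy {
        // Keep hadoop-aws and hadoop-common from resolving to different releases.
        // 2.7.3 is a placeholder; substitute the cluster's actual Hadoop version.
        force 'org.apache.hadoop:hadoop-common:2.7.3',
              'org.apache.hadoop:hadoop-aws:2.7.3'
    }
}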
1 Answer
It turned out to be a version compatibility problem. I downgraded all the versions and it works now.
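The answer does not say which versions were settled on. A hedged sketch of what a mutually consistent, downgraded dependency set could look like, assuming the EMR 4.8.2 cluster ships Hadoop 2.7.x (worth confirming against the EMR release notes):

// Sketch only: keep hadoop-aws and hadoop-common on the same Hadoop release that is
// installed on the cluster, and let hadoop-aws pull in the AWS SDK it was built
// against (around 1.7.4 for Hadoop 2.7.x -- check the hadoop-aws POM to be sure)
// instead of pinning a newer aws-java-sdk-s3.
compile group: 'org.apache.hadoop', name: 'hadoop-aws', version: '2.7.3'
compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.3'
// parquet-avro 1.10.1 may be fine, but an older line such as 1.8.x is an option
// if it also turns out to clash with the cluster's Hadoop.
compile 'org.apache.parquet:parquet-avro:1.8.1'

The essential point is that hadoop-aws and hadoop-common must come from the same Hadoop release, and that release should match what is already installed on the EMR nodes; anything bundled on top of that (AWS SDK, Parquet) then has to be compatible with it.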