I am trying to write some simple data into a table with Apache Iceberg 0.9.1, but I get an error. I want to CRUD the data directly through Hadoop, without a catalog. I created a Hadoop table and was able to read data from it; the problem appears when I then try to write data into the table. I prepared a JSON file containing a single row. My code reads the JSON object and puts the columns in the right order, but the final write step always fails. I have already changed the versions of some dependency packages, which only produced different error messages. Is there a problem with my package versions? Please help.
Here is my source code:
import static org.apache.iceberg.types.Types.NestedField.optional;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class IcebergTest {
    public static void main(String[] args) {
        testWithoutCatalog();
        readDataWithoutCatalog();
        writeDataWithoutCatalog();
    }

    // Create a Hadoop table directly on HDFS, without a catalog.
    public static void testWithoutCatalog() {
        Schema bookSchema = new Schema(
                optional(1, "title", Types.StringType.get()),
                optional(2, "price", Types.LongType.get()),
                optional(3, "author", Types.StringType.get()),
                optional(4, "genre", Types.StringType.get()));
        PartitionSpec bookspec = PartitionSpec.builderFor(bookSchema).identity("title").build();
        Configuration conf = new Configuration();
        String warehousePath = "hdfs://hadoop01:9000/warehouse_path/xgfying/books3";
        HadoopTables tables = new HadoopTables(conf);
        Table table = tables.create(bookSchema, bookspec, warehousePath);
    }

    public static void readDataWithoutCatalog() {
        .......
    }

    // Append one row read from a local JSON file; this is the step that fails.
    public static void writeDataWithoutCatalog() {
        SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();
        Dataset<Row> df = spark.read().json("src/test/data/books3.json");
        System.out.println(" this is the writing data : "
                + df.select("title", "price", "author", "genre").first().toString());
        df.select("title", "price", "author", "genre")
                .write().format("iceberg").mode("append")
                .save("hdfs://hadoop01:9000/warehouse_path/xgfying/books3");
        // System.out.println(df.write().format("iceberg").mode("append").toString());
    }
}
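The question does not show the contents of books3.json; a hypothetical one-row file matching the four columns (spark.read().json() expects one JSON object per line) could look like:

{"title": "Moby Dick", "price": 10, "author": "Herman Melville", "genre": "novel"}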
Here is the error message:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/11/18 15:51:36 INFO SparkContext: Running Spark version 2.4.5
.......
file:///C:/tmp/icebergtest1/src/test/data/books3.json, range: 0-75, partition values: [empty row]
20/11/18 15:51:52 ERROR Utils: Aborting task
java.lang.ExceptionInInitializerError
at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:232)
at org.apache.iceberg.spark.source.SparkAppenderFactory.newAppender(SparkAppenderFactory.java:61)
at org.apache.iceberg.spark.source.BaseWriter.openCurrent(BaseWriter.java:105)
at org.apache.iceberg.spark.source.PartitionedWriter.write(PartitionedWriter.java:63)
at org.apache.iceberg.spark.source.Writer$Partitioned24Writer.write(Writer.java:271)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:118)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Cannot find constructor for interface org.apache.parquet.column.page.PageWriteStore
Missing org.apache.parquet.hadoop.ColumnChunkPageWriteStore(org.apache.parquet.hadoop.CodecFactory$BytesCompressor,org.apache.parquet.schema.MessageType,org.apache.parquet.bytes.ByteBufferAllocator,int) [java.lang.NoSuchMethodException: org.apache.parquet.hadoop.ColumnChunkPageWriteStore.<init>(org.apache.parquet.hadoop.CodecFactory$BytesCompressor, org.apache.parquet.schema.MessageType, org.apache.parquet.bytes.ByteBufferAllocator, int)]
at org.apache.iceberg.common.DynConstructors$Builder.build(DynConstructors.java:235)
at org.apache.iceberg.parquet.ParquetWriter.<clinit>(ParquetWriter.java:55)
... 19 more
20/11/18 15:51:52 ERROR DataWritingSparkTask: Aborting commit for partition 0 (task 2, attempt 0, stage 2.0)
20/11/18 15:51:52 ERROR DataWritingSparkTask: Aborted commit for partition 0 (task 2, attempt 0, stage 2.0)
Here is my pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>icebergtest</groupId>
  <artifactId>icebergtest1</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>icebergtest1</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <iceberg.version>0.9.1</iceberg.version>
    <hadoop.version>2.7.0</hadoop.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <!-- org.apache.hadoop BEGIN -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
      <!-- Exclude the netty package -->
      <exclusions>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- Works around the io.netty.buffer.PooledByteBufAllocator.defaultNumHeapArena()I error -->
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.18.Final</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-auth</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <!-- org.apache.hadoop END -->
    <!-- org.apache.iceberg BEGIN -->
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-core</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-api</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-parquet</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-common</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-orc</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-data</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-hive</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-arrow</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-spark</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-bundled-guava</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-spark-runtime</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-spark2</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-flink</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-pig</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-mr</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <!-- org.apache.iceberg END -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>2.4.5</version>
      <exclusions>
        <exclusion>
          <groupId>org.codehaus.janino</groupId>
          <artifactId>commons-compiler</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.0</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>2.11.0</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-reflect</artifactId>
      <version>2.11.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <!--<version>2.7.9</version>-->
      <version>2.6.6</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <!--<version>2.7.9.4</version>-->
      <version>2.6.5</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <!--<version>2.7.9</version>-->
      <version>2.6.5</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.56</version>
    </dependency>
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-avro</artifactId>
      <version>1.11.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.10.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-column</artifactId>
      <version>1.11.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.4.0</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>
1 Answer
Missing org.apache.parquet.hadoop.ColumnChunkPageWriteStore(org.apache.parquet.hadoop.CodecFactory$BytesCompressor, org.apache.parquet.schema.MessageType, org.apache.parquet.bytes.ByteBufferAllocator, int) [java.lang.NoSuchMethodException: org.apache.parquet.hadoop.ColumnChunkPageWriteStore.<init>(org.apache.parquet.hadoop.CodecFactory$BytesCompressor, org.apache.parquet.schema.MessageType, org.apache.parquet.bytes.ByteBufferAllocator, int)]
means that Iceberg is calling a ColumnChunkPageWriteStore constructor that takes four parameters (org.apache.parquet.hadoop.CodecFactory$BytesCompressor, org.apache.parquet.schema.MessageType, org.apache.parquet.bytes.ByteBufferAllocator, int), but no constructor with that signature exists in the parquet-hadoop version on your classpath. That is why you get the NoSuchMethodException.
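For context, Iceberg's ParquetWriter resolves this constructor reflectively in a static initializer (via its DynConstructors helper), which is why the NoSuchMethodException surfaces wrapped in an ExceptionInInitializerError. A simplified sketch of what that lookup boils down to (illustration only, not Iceberg's actual code):

import java.lang.reflect.Constructor;

public class ConstructorLookupDemo {
    public static void main(String[] args) throws Exception {
        // Load the Parquet class whose constructor Iceberg needs.
        Class<?> cls = Class.forName("org.apache.parquet.hadoop.ColumnChunkPageWriteStore");
        // Look up the package-private four-argument constructor. On a Parquet
        // version with a different signature, this throws NoSuchMethodException,
        // which is then reported as the "Cannot find constructor" error above.
        Constructor<?> ctor = cls.getDeclaredConstructor(
                Class.forName("org.apache.parquet.hadoop.CodecFactory$BytesCompressor"),
                Class.forName("org.apache.parquet.schema.MessageType"),
                Class.forName("org.apache.parquet.bytes.ByteBufferAllocator"),
                int.class);
        ctor.setAccessible(true); // the constructor is not public
        System.out.println("Found: " + ctor);
    }
}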
According to https://jar-download.com/artifacts/org.apache.parquet/parquet-hadoop/1.8.1/source-code/org/apache/parquet/hadoop/columnchunkpagewritestore.java , you need parquet-hadoop 1.8.1. Change your Maven imports to the older version; I checked the 1.8.1 source code and it has the constructor you need.
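As a sketch of that change (not a verified build): in the pom.xml above, the org.apache.parquet artifacts could be pinned to 1.8.1 instead of 1.11.1, with parquet-hadoop declared explicitly so its version is not picked up transitively. The key point is that every org.apache.parquet artifact on the classpath must come from a single version that still has the constructor Iceberg 0.9.1 expects.

<!-- Sketch: align all org.apache.parquet artifacts on the older version. -->
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-hadoop</artifactId>
  <version>1.8.1</version>
</dependency>
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-column</artifactId>
  <version>1.8.1</version>
</dependency>
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-avro</artifactId>
  <version>1.8.1</version>
</dependency>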