我正在使用ApacheParquetHadoop-parquetrecordwriter和mapreduce和hit ParquetEncodingException: writing empty page
. 尽管我发现,当valuecount为0时,columnwriterbase中会发生这种情况,但我不知道为什么这个属性为0,为什么它与endoding有关,以及这种状态是如何发生的?你知道吗?谢谢你给我小费。
Error: org.apache.parquet.io.ParquetEncodingException: writing empty page
at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:309)
at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:152)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:27)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
版本:org.apache。parquet:parquet-hadoop:1.11.0
我正在使用我自己的writesupport类:
public class MyDataWriteSupport extends WriteSupport<MyData> {
private RecordConsumer recordConsumer;
public MyDataWriteSupport() {}
@Override
public WriteContext init(Configuration configuration) {
Map<String, String> metaData = new HashMap<>();
return new WriteContext(getSchema(), metaData);
}
public static MessageType getSchema() {
return MessageTypeParser.parseMessageType(
" message MyData { "
+ "optional binary key (UTF8);"
+ "optional int64 length;"
+ "repeated int32 datarray;"
+ "repeated group myobj {\n"
+ " optional int32 id;"
+ " optional binary title (UTF8);"
+ "}"
+ " }");
}
@Override
public void prepareForWrite(RecordConsumer recordConsumer) {
this.recordConsumer = recordConsumer;
}
@Override
public void write(MyData record) {
recordConsumer.startMessage();
writeData(record);
recordConsumer.endMessage();
}
private void writeData(MyData record) {
recordConsumer.startMessage();
addStringValue(recordConsumer, 0, "key", record.getKey());
addLongValue(recordConsumer, 1, "length", record.getLength());
addIntegerArrayValues(recordConsumer, 2, "datarray", record.getDataArray());
if (!record.getMyObjects().isEmpty()) {
recordConsumer.startField("myobj", 3);
record
.getMyObject()
.forEach(
obj -> {
recordConsumer.startGroup();
addIntValue(recordConsumer, 0, "id", obj.id);
addStringValue(recordConsumer, 1, "title", obj.title);
recordConsumer.endGroup();
});
recordConsumer.endField("myobj", 3);
}
recordConsumer.endMessage();
}
private void addIntValue(RecordConsumer recordConsumer, int index, String fieldName, int value) {
recordConsumer.startField(fieldName, index);
recordConsumer.addInteger(value);
recordConsumer.endField(fieldName, index);
}
private static void addIntegerArrayValues(
RecordConsumer recordConsumer, int index, String fieldName, int[] is) {
if (is.length > 0) {
recordConsumer.startField(fieldName, index);
Arrays.stream(is).forEach(labelIndex -> recordConsumer.addInteger(labelIndex));
recordConsumer.endField(fieldName, index);
}
}
private static void addLongValue(
RecordConsumer recordConsumer, int index, String fieldName, long value) {
recordConsumer.startField(fieldName, index);
recordConsumer.addLong(value);
recordConsumer.endField(fieldName, index);
}
private static void addStringValue(
RecordConsumer recordConsumer, int index, String fieldName, String value) {
recordConsumer.startField(fieldName, index);
recordConsumer.addBinary(Binary.fromString(value));
recordConsumer.endField(fieldName, index);
}
}
1条答案
按热度按时间qvtsj1bj1#
我想问题出在开始/结束通话上。一个问题是
startMessage()
以及endMessage()
两次调用一次write(MyData)
又一次在writeData(MyData)
. 我建议使用ValidatingRecordConsumer
作为您使用的recordconsumer的 Package 。这样,如果记录序列化出现问题,您可能会得到更有意义的异常。