Java：将 Parquet/Avro `GenericRecord` 写为 JSON，同时保留逻辑类型

4smxwvx5  于 2021-05-31  发布在  Hadoop
关注(0)|答案(1)|浏览(436)

我正在尝试把一些包含逻辑类型（logical type）的 Parquet 记录写成 JSON。我通过 `AvroParquetReader` 读取这些记录，它会返回一个 Avro `GenericRecord`:

// Register logical-type conversions on the shared GenericData instance so that values read
// through it are materialised as java.time objects. The dates in this file are timestamps
// (the raw value 999216000000000 for 2001-08-31 is in microseconds), so the timestamp
// conversions are required; TimeMillisConversion alone only covers the time-of-day
// "time-millis" type.
GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());

try (ParquetReader<GenericRecord> parquetReader =
    AvroParquetReader.<GenericRecord>builder(new LocalInputFile(this.path))
        .withDataModel(GenericData.get())
        .build()) {
  // read() returns null once the input has no more records.
  GenericRecord record = parquetReader.read();
  if (record != null) {
    // GenericData#toString prints the converted timestamps unquoted, so this is NOT valid JSON.
    System.out.println(record.toString());
  }
}
`record.toString()` 的输出为:

{"universe_member_id": 94639, "member_from_dt": 2001-08-31T00:00:00Z, "member_to_dt": 2200-01-01T00:00:00Z}

请注意，这不是有效的 JSON——日期已经根据其 `LogicalType` 正确转换，但没有被引号括起来。
于是我尝试使用 `JsonEncoder`:

// Same logical-type conversions as used when reading (timestamp conversions etc.).
GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); // etc.
// NOTE(review): StringOutputStream is not a JDK class; presumably a helper that collects the
// written bytes into a String. A java.io.ByteArrayOutputStream serves the same purpose.
OutputStream stringOutputStream = new StringOutputStream();

try (ParquetReader<GenericRecord> parquetReader =
    AvroParquetReader.<GenericRecord>builder(new LocalInputFile(this.path))
        .withDataModel(GenericData.get())
        .build()) {
  GenericRecord record = parquetReader.read();
  DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
  // JsonEncoder writes Avro's JSON *encoding*: union values are wrapped with their branch type
  // (e.g. {"long": ...}) and logical-type values are written in their underlying representation,
  // which is why the dates come out as plain longs.
  JsonEncoder encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), stringOutputStream);
  writer.write(record, encoder);
  encoder.flush();
}

但这样根本没有转换日期字段，而且把每个值的数据类型也一并写进了记录里（例如 `{"long":94639}`）:

{"universe_member_id":{"long":94639},"member_from_dt":{"long":999216000000000},"member_to_dt":{"long":7258118400000000}}

我想要的结果是:

{"universe_member_id": 94639, "member_from_dt": "2001-08-31T00:00:00Z", "member_to_dt": "2200-01-01T00:00:00Z"}

如何才能把 `GenericRecord` 正确地写成 JSON?
guicsvcw

guicsvcw1#

正如你所指出的，`GenericRecord` 类的 `toString()` 方法可以给出一个几乎有效的 JSON 表示。
正如在 `GenericData` 类的源码中可以看到的，
`GenericData.Record` 的 `toString` 方法只是调用了
`GenericData` 的 `toString(Object)` 方法实现。
如果你想得到记录的有效 JSON 表示，可以借用这段代码，只需少量修改即可得到所需结果。
例如，我们可以定义如下的工具类:

package stackoverflow.parquetavro;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.Function;

import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

/**
 * Serializes Avro {@link GenericRecord}s to valid JSON text.
 *
 * <p>The traversal mirrors {@code GenericData#toString(Object)}, with two changes that keep the
 * output valid JSON when logical types are involved:
 *
 * <ul>
 *   <li>a value whose declared schema (or the single non-null branch of its nullable union) has a
 *       {@link LogicalType} is first passed through the converter registered for that type. This
 *       also applies to values nested in arrays and maps;
 *   <li>any value that is not a record, array, map, string, enum, bytes, number, boolean or
 *       {@code null} (for example a {@code java.time.Instant} produced by a logical-type
 *       conversion) is written as a quoted, escaped JSON string instead of its raw
 *       {@code toString()}.
 * </ul>
 *
 * <p>Converters are kept in a plain {@link HashMap}; register them all before sharing an instance
 * between threads.
 */
public class GenericRecordJsonEncoder {

  /** Registered converters, keyed by the logical type they apply to. */
  Map<LogicalType, Function<Object, Object>> logicalTypesConverters = new HashMap<>();

  private static final String TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT =
      " \">>> CIRCULAR REFERENCE CANNOT BE PUT IN JSON STRING, ABORTING RECURSION <<<\" ";

  /**
   * Registers {@code converter} for every non-null value whose schema has {@code logicalType}.
   *
   * <p>The converter's result is rendered with the same rules as any other value, so returning a
   * {@code String} yields a JSON string. Lookup uses {@link HashMap} key equality on
   * {@link LogicalType}, so pass the instance from the matching {@code LogicalTypes} factory
   * (e.g. {@code LogicalTypes.timestampMicros()}).
   *
   * @param logicalType the logical type to convert
   * @param converter function applied to the (non-null) value before it is rendered
   */
  public void registerLogicalTypeConverter(LogicalType logicalType, Function<Object, Object> converter) {
    this.logicalTypesConverters.put(logicalType, converter);
  }

  /**
   * Returns the converter for the values of {@code field}.
   *
   * <p>For a nullable field declared as a union with exactly one non-null branch, the logical type
   * of that branch is used.
   *
   * @param field the record field
   * @return the registered converter, or {@link Function#identity()} if there is none
   */
  public Function<Object, Object> getLogicalTypeConverter(Schema.Field field) {
    Schema valueSchema = nonNullBranch(field.schema());
    return getLogicalTypeConverter(valueSchema == null ? null : valueSchema.getLogicalType());
  }

  /**
   * Returns the converter registered for {@code logicalType}.
   *
   * @param logicalType the logical type, possibly {@code null}
   * @return the registered converter, or {@link Function#identity()} if {@code logicalType} is
   *     {@code null} or has no converter
   */
  public Function<Object, Object> getLogicalTypeConverter(LogicalType logicalType) {
    if (logicalType == null) {
      return Function.identity();
    }

    return logicalTypesConverters.getOrDefault(logicalType, Function.identity());
  }

  /**
   * Renders {@code value} as a JSON object.
   *
   * @param value the record to render; {@code null} renders as {@code null}
   * @return the JSON text
   */
  public String serialize(GenericRecord value) {
    StringBuilder buffer = new StringBuilder();
    serialize(value, null, buffer, new IdentityHashMap<>(128));
    return buffer.toString();
  }

  /**
   * Appends the <a href="http://www.json.org/">JSON</a> rendering of {@code rawDatum} to
   * {@code buffer}.
   *
   * @param rawDatum the value to render; may be {@code null}
   * @param schema the schema the value was declared with, or {@code null} when unknown (field
   *     names, map keys, the top-level record). It is used only to pick a logical-type converter
   *     and the element/value schema of arrays and maps.
   * @param buffer destination for the JSON text
   * @param seenObjects containers currently being rendered on the recursion path, used to detect
   *     circular references
   */
  private void serialize(final Object rawDatum, final Schema schema, final StringBuilder buffer,
      final IdentityHashMap<Object, Object> seenObjects) {
    // Convert logical-type values first so the converter's result is rendered by the rules below.
    // A null value (possible in a nullable union) is never handed to a converter.
    final Schema rawSchema = rawDatum == null ? null : nonNullBranch(schema);
    final Object datum = rawSchema == null
        ? rawDatum
        : getLogicalTypeConverter(rawSchema.getLogicalType()).apply(rawDatum);
    // Once a converter has replaced the value, the Avro schema no longer describes it.
    final Schema datumSchema = datum == rawDatum ? rawSchema : null;

    if (isRecord(datum)) {
      if (seenObjects.containsKey(datum)) {
        buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
        return;
      }
      seenObjects.put(datum, datum);
      buffer.append("{");
      int count = 0;
      Schema recordSchema = getRecordSchema(datum);
      for (Schema.Field f : recordSchema.getFields()) {
        serialize(f.name(), null, buffer, seenObjects);
        buffer.append(": ");
        // The field's declared schema (possibly a nullable union) drives the conversion.
        serialize(getField(datum, f.name(), f.pos()), f.schema(), buffer, seenObjects);
        if (++count < recordSchema.getFields().size()) {
          buffer.append(", ");
        }
      }
      buffer.append("}");
      seenObjects.remove(datum);
    } else if (isArray(datum)) {
      if (seenObjects.containsKey(datum)) {
        buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
        return;
      }
      seenObjects.put(datum, datum);
      Collection<?> array = getArrayAsCollection(datum);
      Schema elementSchema = datumSchema != null && datumSchema.getType() == Schema.Type.ARRAY
          ? datumSchema.getElementType()
          : null;
      buffer.append("[");
      long last = array.size() - 1;
      int i = 0;
      for (Object element : array) {
        serialize(element, elementSchema, buffer, seenObjects);
        if (i++ < last) {
          buffer.append(", ");
        }
      }
      buffer.append("]");
      seenObjects.remove(datum);
    } else if (isMap(datum)) {
      if (seenObjects.containsKey(datum)) {
        buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
        return;
      }
      seenObjects.put(datum, datum);
      Schema valueSchema = datumSchema != null && datumSchema.getType() == Schema.Type.MAP
          ? datumSchema.getValueType()
          : null;
      buffer.append("{");
      int count = 0;
      Map<?, ?> map = (Map<?, ?>) datum;
      for (Map.Entry<?, ?> entry : map.entrySet()) {
        serialize(entry.getKey(), null, buffer, seenObjects);
        buffer.append(": ");
        serialize(entry.getValue(), valueSchema, buffer, seenObjects);
        if (++count < map.size()) {
          buffer.append(", ");
        }
      }
      buffer.append("}");
      seenObjects.remove(datum);
    } else if (isString(datum) || isEnum(datum)) {
      buffer.append("\"");
      writeEscapedString(datum.toString(), buffer);
      buffer.append("\"");
    } else if (isBytes(datum)) {
      buffer.append("{\"bytes\": \"");
      ByteBuffer bytes = ((ByteBuffer) datum).duplicate();
      writeEscapedString(StandardCharsets.ISO_8859_1.decode(bytes), buffer);
      buffer.append("\"}");
    } else if (((datum instanceof Float)
        && (((Float) datum).isInfinite() || ((Float) datum).isNaN()))
        || ((datum instanceof Double)
        && (((Double) datum).isInfinite() || ((Double) datum).isNaN()))) {
      // NaN and Infinity are not JSON numbers, so quote them.
      buffer.append("\"");
      buffer.append(datum);
      buffer.append("\"");
    } else if (datum == null || datum instanceof Number || datum instanceof Boolean
        || datum instanceof GenericContainer) {
      // null, booleans and finite numbers are already valid JSON literals. Records, arrays and
      // enums were handled above, so a GenericContainer here is in practice a fixed value, whose
      // toString() is rendered unchanged as before.
      // NOTE(review): GenericData.Fixed prints its bytes as "[1, 2, ...]"; confirm for other
      // GenericFixed implementations.
      buffer.append(datum);
    } else {
      // Anything else (e.g. java.time values from a logical-type conversion that has no
      // registered converter) becomes a JSON string. Appending raw toString() output here is
      // what makes GenericRecord#toString produce invalid JSON.
      buffer.append("\"");
      writeEscapedString(datum.toString(), buffer);
      buffer.append("\"");
    }
  }

  /**
   * Returns the schema describing a non-null value declared with {@code schema}.
   *
   * @return {@code schema} itself when it is not a union; the single non-null branch of a union
   *     such as {@code ["null", {"type": "long", "logicalType": "timestamp-micros"}]}; or
   *     {@code null} when {@code schema} is {@code null} or the union has zero or several non-null
   *     branches (ambiguous without inspecting the value)
   */
  private static Schema nonNullBranch(Schema schema) {
    if (schema == null || schema.getType() != Schema.Type.UNION) {
      return schema;
    }
    Schema candidate = null;
    for (Schema branch : schema.getTypes()) {
      if (branch.getType() == Schema.Type.NULL) {
        continue;
      }
      if (candidate != null) {
        return null;
      }
      candidate = branch;
    }
    return candidate;
  }

  // The helpers below are adapted from the GenericData class source.

  private boolean isRecord(Object datum) {
    return datum instanceof IndexedRecord;
  }

  private Schema getRecordSchema(Object record) {
    return ((GenericContainer) record).getSchema();
  }

  private Object getField(Object record, String name, int position) {
    return ((IndexedRecord) record).get(position);
  }

  private boolean isArray(Object datum) {
    return datum instanceof Collection;
  }

  private Collection<?> getArrayAsCollection(Object datum) {
    return (Collection<?>) datum;
  }

  private boolean isEnum(Object datum) {
    return datum instanceof GenericEnumSymbol;
  }

  private boolean isMap(Object datum) {
    return datum instanceof Map;
  }

  private boolean isString(Object datum) {
    return datum instanceof CharSequence;
  }

  private boolean isBytes(Object datum) {
    return datum instanceof ByteBuffer;
  }

  /**
   * Appends {@code string} to {@code builder} with JSON string escaping.
   *
   * <p>Quotes, backslashes and the named control characters use their short escapes. C0/C1
   * control characters and U+2000..U+20FF are written as {@code \}{@code uXXXX} escapes.
   */
  private void writeEscapedString(CharSequence string, StringBuilder builder) {
    for (int i = 0; i < string.length(); i++) {
      char ch = string.charAt(i);
      switch (ch) {
        case '"':
          builder.append("\\\"");
          break;
        case '\\':
          builder.append("\\\\");
          break;
        case '\b':
          builder.append("\\b");
          break;
        case '\f':
          builder.append("\\f");
          break;
        case '\n':
          builder.append("\\n");
          break;
        case '\r':
          builder.append("\\r");
          break;
        case '\t':
          builder.append("\\t");
          break;
        default:
          // Reference: http://www.unicode.org/versions/Unicode5.1.0/
          if ((ch >= '\u0000' && ch <= '\u001F') || (ch >= '\u007F' && ch <= '\u009F')
              || (ch >= '\u2000' && ch <= '\u20FF')) {
            String hex = Integer.toHexString(ch);
            builder.append("\\u");
            for (int j = 0; j < 4 - hex.length(); j++) {
              builder.append('0');
            }
            builder.append(hex.toUpperCase());
          } else {
            builder.append(ch);
          }
      }
    }
  }
}

在这个类中,您可以为所需的逻辑类型注册转换器。考虑以下示例:

GenericRecordJsonEncoder encoder = new GenericRecordJsonEncoder();
// Register as many logical-type converters as you need. This converter assumes the record was
// read through a GenericData with the timestamp conversions registered, so timestamp values
// arrive as java.time.Instant (a raw Long would fail the cast).
Function<Object, Object> instantToIsoString =
    o -> DateTimeFormatter.ISO_INSTANT.format((Instant) o);
// The question's timestamps are microsecond values (timestamp-micros), so register the
// converter for both timestamp precisions.
encoder.registerLogicalTypeConverter(LogicalTypes.timestampMillis(), instantToIsoString);
encoder.registerLogicalTypeConverter(LogicalTypes.timestampMicros(), instantToIsoString);

String json = encoder.serialize(genericRecord);
System.out.println(json);

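这将为您提供所需的结果。注意：问题中的时间戳原始值是微秒（例如 `999216000000000` 对应 2001-08-31），即 `timestamp-micros` 逻辑类型；对这类字段，必须为 `LogicalTypes.timestampMicros()` 注册转换器，只注册 `timestampMillis()` 不会生效。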

相关问题