我需要根据avro模式保存结构,将以下内容转换为java中的sparkDataframe。然后我将基于这个avro结构将它写入s3。
GenericRecord r = new GenericData.Record(inAvroSchema);
r.put("id", "1");
r.put("cnt", 111);
Schema enumTest =
SchemaBuilder.enumeration("name1")
.namespace("com.name")
.symbols("s1", "s2");
GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(enumTest, "s1");
r.put("type", symbol);
ByteArrayOutputStream bao = new ByteArrayOutputStream();
GenericDatumWriter<GenericRecord> w = new GenericDatumWriter<>(inAvroSchema);
Encoder e = EncoderFactory.get().jsonEncoder(inAvroSchema, bao);
w.write(r, e);
e.flush();
我可以创建基于json结构的对象
Object o = reader.read(null, DecoderFactory.get().jsonDecoder(inAvroSchema, new ByteArrayInputStream(bao.toByteArray())));
但也许有什么方法可以基于bytearrayinputstream(bao.tobytearray())创建Dataframe呢?
谢谢
1条答案
按热度按时间rqqzpn5f1#
不,您必须使用数据源来读取avro数据。对于spark来说,将avro作为文件系统中的文件读取是至关重要的,因为许多优化和特性都依赖于它(比如压缩和分区)。你必须加上
spark-avro
(除非你高于2.4)。请注意EnumType
您正在使用的将是String
在spark的Dataset
另请参见:spark:读取inputstream而不是文件或者,您可以考虑部署一组具有
SparkContext#parallelize
以及通过DatumReader
/DatumWriter
.