Flink: converting a DataStream[String] from FlinkKafkaConsumer to Avro for the sink

q8l4jmvw · asked 2021-06-21 · Flink

Flink streaming: I have a DataStream[String] coming from a FlinkKafkaConsumer; each element is a JSON string.

val stream = env
    .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
I have to write this stream with a StreamingFileSink, which requires a DataStream[GenericRecord]:

val schema: Schema = ...
val input: DataStream[GenericRecord] = ...
val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
    .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
    .build()
input.addSink(sink)

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html
Question: how do I convert the DataStream[String] into a DataStream[GenericRecord] before sinking, so that Avro files can be written?
I get an exception while converting the String stream to a GenericRecord stream:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
    at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:791)
    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1168)
    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:617)
    at com.att.vdcs.StreamingJobKafkaFlink$.main(StreamingJobKafkaFlink.scala:128)
    at com.att.vdcs.StreamingJobKafkaFlink.main(StreamingJobKafkaFlink.scala)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
    ... 7 more
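
The "Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema" line points at the cause: the parsed Avro Schema object captured by the map closure is not Java-serializable, so Flink's closure cleaner rejects the task. A common workaround is to ship the schema as a JSON String and (re)parse it inside the function. A minimal Scala sketch of that idea (the schema string and the "body" field are placeholders, not the asker's real schema):

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo

// The schema as a plain JSON string is serializable; the parsed Schema object is not.
val schemaString: String =
  """{"type":"record","name":"Msg","fields":[{"name":"body","type":"string"}]}"""

// Avro-aware type information, so Flink does not fall back to Kryo for GenericRecord.
implicit val recordTypeInfo: TypeInformation[GenericRecord] =
  new GenericRecordAvroTypeInfo(new Schema.Parser().parse(schemaString))

val records = stream.map(new MapFunction[String, GenericRecord] {
  // Re-parsed lazily on each task; the Schema object never travels with the closure.
  @transient private lazy val schema: Schema = new Schema.Parser().parse(schemaString)

  override def map(json: String): GenericRecord = {
    val record = new GenericData.Record(schema)
    record.put("body", json) // "body" is a placeholder field name; map your real JSON fields here
    record
  }
})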

After initializing the schema inside the mapper, I now get a cast exception:

org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.util.Map

The schema and the message were attached as a screenshot (not reproduced here).

I overcame the cast exception by converting the Scala map to a java.util.Map like this:

record.put(0, scala.collection.JavaConverters.mapAsJavaMapConverter(msg._1).asJava)

Now the streaming job works fine, except that extra escape characters are added to the output:

,"body":"\"{\\\"hdr\\\":{\\\"mes

There are extra escape characters; it should look like this:

,"body":"\"{\"hdr\":{\"mes

After changing toString to getAsString, the extra escape characters are gone and everything works fine now.
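
The extra escaping is what you get when you take a JSON element's toString (which re-serializes the value, surrounding quotes included) instead of its raw string value. Assuming the messages are parsed with Gson (where getAsString lives; Gson 2.8.6+ for JsonParser.parseString), a small illustration:

import com.google.gson.JsonParser

val msg = """{"body":"{\"hdr\":{\"mes\":\"sage\"}}"}"""
val obj = JsonParser.parseString(msg).getAsJsonObject

// toString re-serializes the element, so the value is quoted and escaped again:
println(obj.get("body").toString)    // "{\"hdr\":{\"mes\":\"sage\"}}"
// getAsString returns the raw string value, without the extra layer of escaping:
println(obj.get("body").getAsString) // {"hdr":{"mes":"sage"}}
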
The next step is to try compressing the stream with Snappy.
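
For the compression part: AvroWriters.forGenericRecord does not expose a codec setting, but newer Flink releases ship AvroWriterFactory/AvroBuilder in flink-avro, which lets you configure the underlying Avro DataFileWriter yourself. A sketch under that assumption (schema string and output path are placeholders; Snappy also needs the snappy-java dependency at runtime):

import java.io.OutputStream

import org.apache.avro.Schema
import org.apache.avro.file.{CodecFactory, DataFileWriter}
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.avro.{AvroBuilder, AvroWriterFactory}
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

val schemaString: String =
  """{"type":"record","name":"Msg","fields":[{"name":"body","type":"string"}]}"""

// Build the DataFileWriter ourselves so we can enable Snappy block compression.
val factory = new AvroWriterFactory[GenericRecord](new AvroBuilder[GenericRecord] {
  override def createWriter(out: OutputStream): DataFileWriter[GenericRecord] = {
    val schema = new Schema.Parser().parse(schemaString)
    val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
    writer.setCodec(CodecFactory.snappyCodec())
    writer.create(schema, out)
  }
})

val snappySink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(new Path("/tmp/avro-out"), factory)
  .build()
// records.addSink(snappySink)
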

Answer 1 (jxct1oxe):

You need to transform the stream of Strings into a stream of GenericRecords, for example with a .map() function.
Example:

DataStream<String> strings = env.addSource( ... );
DataStream<GenericRecord> records = strings.map(inputStr -> {
    GenericData.Record rec = new GenericData.Record(schema);
    rec.put(0, inputStr);
    return rec;
});

Note that using GenericRecord may lead to poor performance, because the schema gets serialized with every record over and over again. It is better to generate an Avro POJO, since it does not need to ship the schema.
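
For reference, the generated-POJO route would use the specific-record writer; a hypothetical sketch, assuming a class Message generated from the schema (e.g. with the Avro Maven plugin):

import org.apache.flink.core.fs.Path
import org.apache.flink.formats.avro.AvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

// "Message" extends org.apache.avro.specific.SpecificRecordBase and is generated from the schema.
val pojoSink: StreamingFileSink[Message] = StreamingFileSink
  .forBulkFormat(new Path("/tmp/avro-out"), AvroWriters.forSpecificRecord(classOf[Message]))
  .build()
// messages.addSink(pojoSink)  // messages: DataStream[Message]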

Answer 2 (nkoocmlb):

In Java, you should use a RichMapFunction to convert the DataStream&lt;Map&lt;String, Object&gt;&gt; into a DataStream&lt;GenericRecord&gt;, adding a transient Schema field to build the GenericRecord from. I don't know how to do this in Scala, just for reference (a Scala sketch follows after the code below).

DataStream<GenericRecord> records = maps.map(new RichMapFunction<Map<String, Object>, GenericRecord>() {
    private transient DatumWriter<IndexedRecord> datumWriter;

    /**
     * Output stream to serialize records into a byte array.
     */
    private transient ByteArrayOutputStream arrayOutputStream;

    /**
     * Low-level class for serialization of Avro values.
     */
    private transient Encoder encoder;

    /**
     * Avro serialization schema (not serializable, hence transient and rebuilt in open()).
     */
    private transient Schema schema;

    @Override
    public GenericRecord map(Map<String, Object> stringObjectMap) throws Exception {
        GenericRecord record = new GenericData.Record(schema);
        stringObjectMap.forEach(record::put);
        return record;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Parse the schema first; the writer below depends on it.
        try {
            this.schema = new Schema.Parser().parse(avroSchemaString);
        } catch (SchemaParseException e) {
            throw new IllegalArgumentException("Could not parse Avro schema string.", e);
        }
        this.arrayOutputStream = new ByteArrayOutputStream();
        this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
        this.datumWriter = new GenericDatumWriter<>(schema);
    }
});

final StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("D:\\test"), AvroWriters.forGenericRecord(mongoSchema))
        .build();
records.addSink(sink);
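
For the Scala part this answer leaves open, a minimal sketch of the same idea as a Scala RichMapFunction, assuming a DataStream[Map[String, AnyRef]] called maps and a schema JSON string avroSchemaString (names carried over from the answer's context, so treat them as placeholders):

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class MapToGenericRecord(avroSchemaString: String)
    extends RichMapFunction[Map[String, AnyRef], GenericRecord] {

  // The parsed Schema is not serializable, so keep only the schema string in the closure
  // and rebuild the Schema once per task in open().
  @transient private var schema: Schema = _

  override def open(parameters: Configuration): Unit = {
    schema = new Schema.Parser().parse(avroSchemaString)
  }

  override def map(fields: Map[String, AnyRef]): GenericRecord = {
    val record = new GenericData.Record(schema)
    fields.foreach { case (name, value) => record.put(name, value) }
    record
  }
}

It plugs in as maps.map(new MapToGenericRecord(avroSchemaString)), ideally with a GenericRecordAvroTypeInfo as the implicit TypeInformation, as in the sketch further up.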
