Flink streaming: I have a DataStream[String] coming from a FlinkKafkaConsumer, and each element is JSON:
val stream = env
  .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
I have to sink this stream with a StreamingFileSink, which requires a DataStream[GenericRecord]:
val schema: Schema = ...
val input: DataStream[GenericRecord] = ...
val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
.forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
.build()
input.addSink(sink)
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html
Question: how do I convert the DataStream[String] into a DataStream[GenericRecord] before sinking, so that Avro files can be written?
I get an exception while converting the String stream to a GenericRecord stream:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:791)
at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1168)
at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:617)
at com.att.vdcs.StreamingJobKafkaFlink$.main(StreamingJobKafkaFlink.scala:128)
at com.att.vdcs.StreamingJobKafkaFlink.main(StreamingJobKafkaFlink.scala)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
... 7 more
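For context: the root-cause line points at an org.apache.avro.Schema instance being captured by the map closure, and Schema is not java.io.Serializable in the Avro version in use here. A minimal sketch of the failing pattern (schemaJsonString and the field handling are placeholders):

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.streaming.api.scala._

// Built once on the driver; per the stack trace above, this object
// cannot be serialized into the task.
val schema: Schema = new Schema.Parser().parse(schemaJsonString)

val records: DataStream[GenericRecord] = stream.map { json: String =>
  // The closure captures `schema`, so Flink's ClosureCleaner throws
  // "Task not serializable" when the job graph is built.
  val record = new GenericData.Record(schema)
  // ... populate record fields from the parsed JSON ...
  record
}

Building the schema inside the function (or inside a RichMapFunction, see the answers below) avoids capturing it.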
After initializing the schema inside the mapper, I get a cast exception:
org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.util.Map
(The schema and the message were shown in a screenshot that is not reproduced here.)
Got past the cast exception with a conversion like this:
record.put(0, scala.collection.JavaConverters.mapAsJavaMapConverter(msg._1).asJava)
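For context, a sketch of where that line sits (msg is the tuple being mapped and field 0 is the schema's map-typed field, both as in the job above): Avro's writer casts map-field values to java.util.Map when appending a record, which is what triggered the ClassCastException, so the Scala value has to be converted first:

import scala.collection.JavaConverters._

val record = new GenericData.Record(schema)
// record.put(0, msg._1)        // fails at write time: not a java.util.Map
record.put(0, msg._1.asJava)    // same as mapAsJavaMapConverter(msg._1).asJava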
Now the streaming job runs fine, except that extra escape characters get added:
,"body":"\"{\\\"hdr\\\":{\\\"mes
There are extra escape characters; it should look like this:
,"body":"\"{\"hdr\":{\"mes
After changing toString to getAsString, the extra escapes went away.
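That matches Gson's behavior, assuming Gson is the JSON library in play (its JsonElement has exactly these two methods): toString re-serializes the element back to JSON text, adding quotes and escapes, while getAsString returns the raw string value. A small illustration (JsonParser.parseString needs Gson 2.8.6+):

import com.google.gson.JsonParser

val obj = JsonParser.parseString("""{"body":"{\"hdr\":1}"}""").getAsJsonObject

obj.get("body").toString    // "{\"hdr\":1}"  -- re-encoded as JSON: extra quotes and escapes
obj.get("body").getAsString // {"hdr":1}      -- the raw field value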
Everything works fine now.
Next I need to try Snappy compression of the stream.
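For that next step: AvroWriters doesn't expose a codec option, so one way (a sketch only; the class name is made up) is a custom BulkWriter.Factory that sets Snappy on Avro's DataFileWriter:

import org.apache.avro.Schema
import org.apache.avro.file.{CodecFactory, DataFileWriter}
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.flink.api.common.serialization.BulkWriter
import org.apache.flink.core.fs.FSDataOutputStream

// Carries the schema as a JSON string so the factory itself stays serializable.
class SnappyAvroWriterFactory(schemaJson: String) extends BulkWriter.Factory[GenericRecord] {
  override def create(out: FSDataOutputStream): BulkWriter[GenericRecord] = {
    val schema = new Schema.Parser().parse(schemaJson)
    val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
    writer.setCodec(CodecFactory.snappyCodec()) // must be set before create()
    writer.create(schema, out)
    new BulkWriter[GenericRecord] {
      override def addElement(element: GenericRecord): Unit = writer.append(element)
      override def flush(): Unit = writer.flush()
      // Only flush here: Avro container files have no footer, and the
      // BulkWriter contract says the sink closes the stream itself.
      override def finish(): Unit = writer.flush()
    }
  }
}

It would plug in as StreamingFileSink.forBulkFormat(outputBasePath, new SnappyAvroWriterFactory(schema.toString)).build(), with snappy-java on the classpath; this is an assumption-level sketch, not a drop-in from the Flink docs.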
2 Answers
jxct1oxe1#
You need to transform the stream of Strings into a stream of GenericRecords, for example using a .map() function. Example:
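The example itself did not survive the copy here; a minimal sketch of such a .map() (the two-field schema and the use of Gson are my assumptions):

import com.google.gson.JsonParser
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.streaming.api.scala._

// Placeholder record schema; any record schema works the same way.
val schemaJson =
  """{"type":"record","name":"Event","fields":[
    |{"name":"id","type":"string"},{"name":"count","type":"long"}]}""".stripMargin

val records: DataStream[GenericRecord] = stream.map { json: String =>
  // Parse the schema from its String form inside the function, so no
  // non-serializable Schema object is captured from the driver
  // (cheaper: cache it, or use the RichMapFunction from the next answer).
  val schema = new Schema.Parser().parse(schemaJson)
  val obj = JsonParser.parseString(json).getAsJsonObject
  val record = new GenericData.Record(schema)
  record.put("id", obj.get("id").getAsString)
  record.put("count", obj.get("count").getAsLong)
  record
}

Depending on the Flink version you may also need to supply flink-avro's GenericRecordAvroTypeInfo as the stream's type information, so the records are serialized with Avro rather than falling back to Kryo.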
Note that using GenericRecord can lead to poor performance, because the schema needs to be serialized over and over again with every record. It is better to generate an Avro POJO, as it won't need to ship the schema.

nkoocmlb2#
In Java, you would use a RichMapFunction to convert the DataStream<String> to a DataStream<GenericRecord>, adding a transient Schema field to produce the GenericRecord. I don't know how to do this in Scala, so this is just for reference.
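For reference, the same pattern carries over to Scala directly; a minimal sketch (the class name and the schema string are placeholders):

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class JsonToGenericRecord(schemaJson: String) extends RichMapFunction[String, GenericRecord] {

  // Transient: never serialized with the function; rebuilt per task in open().
  @transient private var schema: Schema = _

  override def open(parameters: Configuration): Unit = {
    schema = new Schema.Parser().parse(schemaJson)
  }

  override def map(json: String): GenericRecord = {
    val record = new GenericData.Record(schema)
    // ... populate fields from the parsed JSON, as in the first answer ...
    record
  }
}

Used as stream.map(new JsonToGenericRecord(schemaJson)); only the schema's JSON string is shipped with the function, which sidesteps the "Task not serializable" error from the question.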