我有4版本的flink,想把它更新到11。我试着用 StreamingFileSink
而不是不推荐的 BucketingSink
. 我的代码如下:
val sink = StreamingFileSink
.forBulkFormat(new Path(s"${conf.output}/$path") , AvroWriters.forReflectRecord[T](clazz) )
.withBucketCheckInterval(toMillis(config.inactiveBucketThreshold))
.withBucketAssigner(bucketAssigner)
.build()
但是我在测试中对fs有一个问题。
有哪些优点可以利用 StreamingFileSink.forBulkFormat
而不是 StreamingFileSink.forRowFormat
用avro编码器?
你能帮我写一个使用第二种方法的例子吗?
1条答案
按热度按时间qhhrdooz1#
请参考下面的“行格式或大容量格式”线程了解一些详细信息。
简而言之,至少是最新版本
1.11
,flink只提供BulkWriter
avro格式的工厂-AvroWriterFactory
,您可以使用现成的。要使用行格式-
StreamingFileSink.forRowFormat
-您需要提供自己的实现org.apache.flink.api.common.serialization.Encoder
接口,将能够编码和附加数据的部分文件记录。举个例子,你可以看看
org.apache.flink.formats.json.JsonFileSystemFormatFactory.JsonRowDataEncoder
.