streamingfilesink-forbulkformat与forrowformat-avro编码器

pxq42qpu  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(313)

我有4版本的flink,想把它更新到11。我试着用 StreamingFileSink 而不是不推荐的 BucketingSink . 我的代码如下:

val sink = StreamingFileSink
  .forBulkFormat(new Path(s"${conf.output}/$path") , AvroWriters.forReflectRecord[T](clazz) )
  .withBucketCheckInterval(toMillis(config.inactiveBucketThreshold))
  .withBucketAssigner(bucketAssigner)
  .build()

但是我在测试中对fs有一个问题。
有哪些优点可以利用 StreamingFileSink.forBulkFormat 而不是 StreamingFileSink.forRowFormat 用avro编码器?
你能帮我写一个使用第二种方法的例子吗?

qhhrdooz

qhhrdooz1#

请参考下面的“行格式或大容量格式”线程了解一些详细信息。
简而言之,至少是最新版本 1.11 ,flink只提供 BulkWriter avro格式的工厂- AvroWriterFactory ,您可以使用现成的。
要使用行格式- StreamingFileSink.forRowFormat -您需要提供自己的实现 org.apache.flink.api.common.serialization.Encoder 接口,将能够编码和附加数据的部分文件记录。
举个例子,你可以看看 org.apache.flink.formats.json.JsonFileSystemFormatFactory.JsonRowDataEncoder .

相关问题