handle滞后于kafkas3连接器

dz6r00yl  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(344)

我们正在使用kafka connect[分布式,confluence 4.0]。
它工作得很好,只是连接器侦听的主题中始终保留未提交的消息。该行为可能与s3连接器配置有关 "flush.size": "20000" . 主题中的滞后总是小于齐平大小。
我们的数据是分批来的,我不想等到下一批来,也不想降低成本 flush.size 创建大量文件。是否可以设置s3连接器刷新数据的超时,即使它没有达到20000个事件?
谢谢!

"config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "event",
    "tasks.max": "3",
    "topics.dir": "connect",
    "s3.region": "some_region",
    "s3.bucket.name": "some_bucket",
    "s3.part.size": "5242880",
    "flush.size": "20000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "FULL",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'\''day_ts'\''=YYYYMMdd/'\''hour_ts'\''=H",
    "partition.duration.ms": "3600000",
    "locale": "en_US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "time"
  }
}
rqenqsqc

rqenqsqc1#

要使用s3连接器定期刷新低容量主题的未完成记录,可以使用配置属性: rotate.schedule.interval.ms (此处为配置的完整列表)
请记住,通过使用上面的属性,在重新处理或从错误中恢复时,您可能会看到重复的消息,而不管您使用的是哪个分区器。

相关问题