Kafka连接s3接收器刷新数据-奇怪的延迟

aelbi1ox  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(441)

我已经从ksql查询和inut流创建了一个表,该表由kafka主题支持。
本主题是使用kafka连接到s3。在这个主题中,我有大约1k msgs/秒。
该主题有6个分区和3个副本。
我有一个奇怪的产出比。Flume似乎很奇怪。这是我的监控:监控
您可以看到第一个图表显示输入比b/s,第二个输出比,第三个是使用burrow计算的延迟。
这是我的s3接收器属性文件:

{
"name": "sink-feature-static",
"config": {
  "topics": "FEATURE_APP_STATIC",
  "topics.dir": "users-features-stream",
  "tasks.max": "6",
  "consumer.override.auto.offset.reset": "latest",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "parquet.codec": "snappy",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "path.format": "'\'part_date\''=YYYY-MM-dd/'\'part_hour\''=HH",
  "partition.duration.ms": "3600000",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://cp-schema-registry.schema-registry.svc.cluster.local:8081",
  "flush.size": 1000000,
  "s3.part.size": 5242880,
  "rotate.interval.ms": "600000",
  "rotate.schedule.interval.ms": "600000",
  "locale": "fr-FR",
  "timezone": "UTC",
  "timestamp.extractor": "Record",
  "schema.compatibility": "NONE",
  "aws.secret.access.key": "secretkey",
  "aws.access.key.id": "accesskey",
  "s3.bucket.name": "feature-store-prod-kafka-test",
  "s3.region": "eu-west-1"
  }
}

下面是我在s3 bucket中观察到的:s3 bucket在这些文件中,我在parquet.snappy中有少量的消息(有时只有1有时更多…)。每个分区大约每秒2个文件(当我使用记录时间戳时,这是因为它正赶上滞后时间(我认为)。
我期待的是:
每1000000条消息(flush.size)或每10分钟(rotate.schedule.interval.ms)提交一次文件。
所以我期望(100万条消息>10分钟*1千米/秒):
每小时1/6(每10分钟)*6(分区数量)Parquet文件
2/或者如果我错了,里面至少有100万条信息的文件。。。
但没有观察到1/或2/。。。
我每小时在s3文件中有一个巨大的延迟和一个flush/commit(参见监控)。
“partition.duration.ms”:“3600000”会导致这样的结果吗?
我错在哪里?为什么我没有看到一个连续的输出刷新的数据,但这样的峰值?
谢谢!ré我的

z3yyvxxp

z3yyvxxp1#

是的,第一盘 partition.duration.ms 如果您希望每10分钟有一个s3对象,则为10分钟。第二,如果你真的不想设置小文件 rotate.interval.ms=-1 以及 rotate.schedule.interval.ms 10分钟(但你方不保证一次交货)。
使用时 rotate.interval.ms ,发生的情况是,每次您收到的时间戳早于文件偏移量时,kafka connect会在每个小时的开始和结束时刷新导致非常小的文件,它确实确保在所有故障情况下都能准确地传递一次。

相关问题