使用TimeBasedPartitioner在kafka-connect上为kubernetes配置s3接收器以提高吞吐量

yftpprvb  于 2023-03-01  发布在  Apache
关注(0)|答案(1)|浏览(112)

我在配置了KAFKA_HEAP_OPTS的kubernetes上运行kafka-connect:“-Xmx 1G-Xms 1G”和资源:

requests:
  cpu: 200m
  memory: 2Gi

我需要摄取过去3年中包含约3亿条记录(~ 50 Go)的大量遗留数据。
同时,我继续以大约20 msg/s的速度摄取“实时”数据。
我的每个主题都配置了12个分区。
s3接收器的kafka-connect配置是:

{
  "name": "s3-sink",
  "tasks.max": "2",
  "aws.access.key.id": "<key>",
  "aws.secret.access.key": "<secret>",
  "s3.bucket.name": "bucket",
  "s3.compression.type": "gzip",
  "s3.elastic.buffer.enable": "true",
  "s3.part.size": "5242880",
  "s3.region": "<region>",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/",
  "partition.duration.ms": "3600000",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "time"
  "locale": "en_US",
  "timezone": "UTC",
  "flush.size": "1000",
  "rotate.interval.ms": "-1",
  "rotate.schedule.interval.ms": "1000",
  "schema.compatibility": "NONE",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "store.url": "<s3-url>",
  "topics.dir": "raw",
  "topics.regex": "raw",
  "value.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
}

有了这个配置,我可以处理大约500 msg/s,但我有不时的OOM。按照这个速度,将需要5天以上的时间来处理所有消息。
此外,我想沉到s3的其他主题在同一时间。
如何改进此连接器配置以避免内存不足错误(java.lang.OutOfMemoryError: Java heap space)并提高吞吐量?
我试着增加任务数量,但我很快就达到了OOM...
由于所需内存太大,因此按比例调整pod数量不会产生结论性结果。

qhhrdooz

qhhrdooz1#

如果有12个分区,则使用"tasks.max": "12"
从Java 8 u292开始,您不应该使用Xmx Xms值。使用设置在80-90%之间的-XX:MaxRAMPercentage,以允许容器使用pod cgroup的全部请求值。
其他主题同时
为它们配置唯一的连接器,但是请记住,所有的连接器都共享同一个JVM,因此您可能只需要运行越来越多的Connect pod副本。
所需的存储器将太大
然后扩展你的k8s集群,这不是Connect能解决的问题。

相关问题