我在配置了KAFKA_HEAP_OPTS
的kubernetes上运行kafka-connect:“-Xmx 1G-Xms 1G”和资源:
requests:
cpu: 200m
memory: 2Gi
我需要摄取过去3年中包含约3亿条记录(~ 50 Go)的大量遗留数据。
同时,我继续以大约20 msg/s的速度摄取“实时”数据。
我的每个主题都配置了12个分区。
s3接收器的kafka-connect配置是:
{
"name": "s3-sink",
"tasks.max": "2",
"aws.access.key.id": "<key>",
"aws.secret.access.key": "<secret>",
"s3.bucket.name": "bucket",
"s3.compression.type": "gzip",
"s3.elastic.buffer.enable": "true",
"s3.part.size": "5242880",
"s3.region": "<region>",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/",
"partition.duration.ms": "3600000",
"timestamp.extractor": "RecordField",
"timestamp.field": "time"
"locale": "en_US",
"timezone": "UTC",
"flush.size": "1000",
"rotate.interval.ms": "-1",
"rotate.schedule.interval.ms": "1000",
"schema.compatibility": "NONE",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"store.url": "<s3-url>",
"topics.dir": "raw",
"topics.regex": "raw",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
}
有了这个配置,我可以处理大约500 msg/s,但我有不时的OOM。按照这个速度,将需要5天以上的时间来处理所有消息。
此外,我想沉到s3的其他主题在同一时间。
如何改进此连接器配置以避免内存不足错误(java.lang.OutOfMemoryError: Java heap space
)并提高吞吐量?
我试着增加任务数量,但我很快就达到了OOM...
由于所需内存太大,因此按比例调整pod数量不会产生结论性结果。
1条答案
按热度按时间qhhrdooz1#
如果有12个分区,则使用
"tasks.max": "12"
从Java 8 u292开始,您不应该使用
Xmx Xms
值。使用设置在80-90%之间的-XX:MaxRAMPercentage
,以允许容器使用pod cgroup的全部请求值。其他主题同时
为它们配置唯一的连接器,但是请记住,所有的连接器都共享同一个JVM,因此您可能只需要运行越来越多的Connect pod副本。
所需的存储器将太大
然后扩展你的k8s集群,这不是Connect能解决的问题。