Kafka broker cannot handle requests due to heap out-of-memory errors

8oomwypt  posted on 2021-06-07  in Kafka

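I have upgraded to version 1.0.1 because 1.0.0 had an OOM problem.
I set up a cluster with four brokers.
There are about 150 topics with about 4000 partitions in total, and the replication factor is 2.
Connectors are used to write data to the brokers and to read data from them.
The Connect version is 0.10.1.
The average message size is 500 bytes, at about 60,000 messages per second.
One of the brokers keeps reporting OOM and cannot handle requests such as the following: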

[2018-03-24 12:37:17,449] ERROR [KafkaApi-1001] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=voltetraffica.data,partitions=[
    {partition=16,fetch_offset=51198,max_bytes=60728640} ,{partition=12,fetch_offset=50984,max_bytes=60728640}]}]} (kafka.server.KafkaApis)
    java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
    at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:525)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:523)
    at scala.Option.map(Option.scala:146)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:523)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:513)
    at scala.Option.flatMap(Option.scala:171)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:513)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:561)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:560)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:560)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574)
    at kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2041)
    at kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:54)
    at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2040)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:574)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:593)
    at kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:176)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:592)
    at kafka.server.KafkaApis$$anonfun$handleFetchRequest$4.apply(KafkaApis.scala:609)
    at kafka.server.KafkaApis$$anonfun$handleFetchRequest$4.apply(KafkaApis.scala:609)
    at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820)
    at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:601)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:99)

This is followed by a lot of ISR shrinking (this broker is 1001):

[2018-03-24 13:43:00,285] INFO [Partition gnup.source.offset.storage.topic-5 broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
[2018-03-24 13:43:00,286] INFO [Partition s1mme.data-72 broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
[2018-03-24 13:43:00,286] INFO [Partition gnup.sink.status.storage.topic-17 broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
[2018-03-24 13:43:00,287] INFO [Partition probessgsniups.sink.offset.storage.topic-4 broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
[2018-03-24 13:43:01,447] INFO [GroupCoordinator 1001]: Stabilized group connect-VOICE_1_SINK_CONN generation 26 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)

I cannot dump the heap, because every time I run the command I get this error:
[root@sslave1 Kafka]# jcmd 55409 GC.heap_dump /home/ngdb/heap_dump175.hprof
55409:

com.sun.tools.attach.AttachNotSupportedException: Unable to open socket file: target process not responding or HotSpot VM not loaded  
    at sun.tools.attach.LinuxVirtualMachine.<init>(LinuxVirtualMachine.java:106)
    at sun.tools.attach.LinuxAttachProvider.attachVirtualMachine(LinuxAttachProvider.java:63)
    at com.sun.tools.attach.VirtualMachine.attach(VirtualMachine.java:208)
    at sun.tools.jcmd.JCmd.executeCommandForPid(JCmd.java:147)
    at sun.tools.jcmd.JCmd.main(JCmd.java:131)

The JVM parameters are:

-XX:+ExplicitGCInvokesConcurrent -XX:GCLogFileSize=104857600 -XX:InitialHeapSize=2147483648 -XX:InitiatingHeapOccupancyPercent=35  -XX:+ManagementServer -XX:MaxGCPauseMillis=20 -XX:MaxHeapSize=4294967296   -XX:NumberOfGCLogFiles=10 -XX:+PrintGC -XX:+PrintGCDateStamps   -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedClassPointers   -XX:+UseCompressedOops -XX:+UseG1GC -XX:+UseGCLogFileRotation

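When I set the maximum heap to 2 GB (-Xmx2g), all four brokers reported OOM.
After I increased it to 4 GB, only one broker reported OOM.
I have also filed an issue: https://issues.apache.org/jira/browse/kafka-6709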

zc0qhyus  1#

I ran into the same problem and fixed it by restarting the Kafka process.

nlejzf6q  2#

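The message format changed between Kafka 0.10.x and Kafka >= 0.11.x.
So when you run recent brokers (>= 0.11) with older clients (<= 0.10), the brokers have to down-convert messages before sending them back to the clients. This is documented in the upgrade notes: http://kafka.apache.org/documentation/#upgrade_11_message_format
You can see in your stack trace that this is indeed happening: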

at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)

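This hurts performance and also increases the amount of memory required, because the broker has to allocate new buffers to build the down-converted messages.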
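To get a feel for the numbers, here is a rough back-of-the-envelope sketch in Python. It uses only values copied from the question (the per-partition max_bytes in the failing fetch request and the 4 GB max heap). The model itself is an assumption: it supposes that, in the worst case, down-conversion allocates a heap buffer about as large as max_bytes for each partition in the request. The real allocation depends on how much data is actually read, so treat this as an upper bound and not as a description of the broker's exact behavior. The function names are made up for illustration.

```python
# Values copied from the fetch request and JVM flags in the question.
PARTITION_MAX_BYTES = 60_728_640   # max_bytes per partition in the failing request
PARTITIONS_IN_REQUEST = 2          # partitions 16 and 12 of voltetraffica.data
HEAP_BYTES = 4_294_967_296         # -XX:MaxHeapSize


def worst_case_fetch_bytes(partitions: int, max_bytes: int) -> int:
    """Assumed upper bound on heap buffers for one down-converted fetch."""
    return partitions * max_bytes


def fetches_to_fill_heap(heap_bytes: int, per_fetch_bytes: int) -> int:
    """Number of such fetches in flight that would fit in the whole heap."""
    return heap_bytes // per_fetch_bytes


per_fetch = worst_case_fetch_bytes(PARTITIONS_IN_REQUEST, PARTITION_MAX_BYTES)
in_flight = fetches_to_fill_heap(HEAP_BYTES, per_fetch)

print(f"worst case per fetch: {per_fetch / 2**20:.1f} MiB")
print(f"fetches of that size that fit in the heap: {in_flight}")
```

Under that assumption a few dozen concurrent fetches from old clients could account for the entire heap, before counting anything else the broker keeps in memory.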
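You should try upgrading your clients to the same version as the brokers. Also, considering how small your heap currently is (4 GB), increasing it might help.
Another option is to force the newer brokers to use the older message format (using log.message.format.version), but that will prevent you from using some of the newer features.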
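For that last option, a minimal sketch of the broker setting might look like this. The value 0.10.1 is an assumption taken from the Connect version mentioned in the question; check the upgrade notes linked above for the value that matches your oldest client before changing anything.

```properties
# server.properties on each broker (illustrative fragment, not a full config)
# Assumed value: matches the 0.10.1 Connect clients described in the question.
log.message.format.version=0.10.1
```

With the on-disk format matching what the old clients expect, the brokers should no longer need to down-convert on fetch, at the cost of features that depend on the newer format.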
