如何修复javakafka生产者错误“received invalid metadata error in product request on partition”以及代理关闭时内存不足的问题

oyxsuwqo  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(3093)

我用java创建了一个kafka生产者的例子。我一直在发送正常的数据,这只是“测试”+整数作为值Kafka。我使用了下面的属性,在我启动producer客户机并发送消息之后,在此期间,我正在终止代理并突然收到下面的错误消息,而不是重试。
使用3个代理和主题,其中3个分区和复制因子为3,不使用min insync副本
下面是config.put配置的属性(producerconfig.acks\u config,“all”);
config.put(producerconfig.max_in_flight_requests_per_connection,“1”);config.put(commonclientconfigs.retries\u config,60);config.put(producerconfig.enable\u幂等性\u config,true);config.put(producerconfig.retry\u backoff\u ms\u config,10000);config.put(producerconfig.request\u timeout\u ms\u config,30000);config.put(producerconfig.max\u block\u ms\u config,10000);config.put(producerconfig.max\u request\u size\u config,1048576);config.put(producerconfig.batch\u size\u config,16384);config.put(producerconfig.linger\u ms\u config,0);config.put(producerconfig.buffer_memory_config,1073741824);//1gb的
当我杀了我所有的经纪人,或者有时杀了其中一个经纪人,结果如下


**Error:**

WARN org.apache.kafka.clients.producer.internals.Sender  - [Producer 
clientId=producer-1] Got error produce response with correlation id 124 
on topic-partition testing001-0, retrying (59 attempts left). Error: 
NETWORK_EXCEPTION
27791 [kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.producer.internals.Sender  - [Producer 
clientId=producer-1] Received invalid metadata error in produce request 
on partition testing001-0 due to 
org.apache.kafka.common.errors.NetworkException: The server disconnected 
before a response was received.. Going to request metadata update now
28748 [kafka-producer-network-thread | producer-1] ERROR 
org.apache.kafka.common.utils.KafkaThread  - Uncaught exception in thread 
'kafka-producer-network-thread | producer-1':
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(Unknown Source)
at java.nio.ByteBuffer.allocate(Unknown Source)
at    org.apache.kafka.common.memory.MemoryPool$1.tryAllocate 
(MemoryPool.java:30)
at org.apache.kafka.common.network.NetworkReceive.readFrom
(NetworkReceive.java:112)
at org.apache.kafka.common.network.KafkaChannel.receive
(KafkaChannel.java:335)
at org.apache.kafka.common.network.KafkaChannel.read
(KafkaChannel.java:296)
at org.apache.kafka.common.network.Selector.attemptRead
(Selector.java:560)
at org.apache.kafka.common.network.Selector.pollSelectionKeys
(Selector.java:496)
at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
at org.apache.kafka.clients.producer.internals.Sender.run
(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run
(Sender.java:163)
at java.lang.Thread.run(Unknown Source)
nxagd54h

nxagd54h1#

我猜你在测试制作人。当生产者连接到kafka集群时,您将以逗号分隔的字符串形式传递所有代理ip和端口。你有三个经纪人。当生产者尝试连接到集群时,作为初始化的一部分,集群控制器用集群元数据响应。假设你的制作者只将消息填充到一个主题中。集群在每个主题的代理中保持领先地位。在确定了主题的领导者之后,你的制作人只会和领导者沟通,直到它被直播。
在测试场景中,您故意杀死代理示例。当发生这种情况时,kafka集群需要为您的主题确定一个新的领导者,控制器必须将新的元数据传递给您的生产者。如果元数据更改非常频繁(在您的情况下,您可能会杀死另一个代理,这意味着),生产者可能会收到无效的元数据。

相关问题