Kafka producer stops randomly and does not accept modified properties

Posted by piv4azn7 on 2021-06-07 in Kafka

I am working on Windows 10 with Kafka v0.10.1.0. For now everything happens locally, and I should mention that I also run a local ZooKeeper alongside Kafka. My goal is to forward a number of integers to a Kafka consumer via a single broker.
Here is a simplified, minimal example of my current code:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.io.IOException;
import java.util.Properties;

public class simple_example {

    public static void main(String[] args) throws IOException {

        long startTime = System.currentTimeMillis();

        Properties properties = new Properties();

        //properties.put("retries", 0);
        properties.put("metadata.broker.list", "localhost:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");

        Producer<Integer, String> producer = null;

        int i = 0;

        // 10e5 is approx. the number of events I want to forward in the real-life version of this
        while (i < 10e5) {

            try {
                producer = new Producer<Integer, String>(new ProducerConfig(properties));

                String topic = "test2";
                String msg = "Event Number: " + i;

                KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, msg);
                producer.send(data);

                System.out.println("-----------------------------------------------------------------------");
                long endTime = System.currentTimeMillis();
                long totalTime = endTime - startTime;
                System.out.println(totalTime / 1e3 + " [sec] --> ex/s = " + 1 / (totalTime / 1e3));
                System.out.println("i= " + i);
                System.out.println("-----------------------------------------------------------------------");
                i++;
                //producer.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.close();
            }
        }
    }
}

For the first few thousand events it works fine, but somewhere between event 8000 and 10000 it stops and produces the following error:

15:23:52,758 INFO  kafka.utils.VerifiableProperties - Verifying properties
15:23:52,758 INFO  kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to localhost:9092
15:23:52,758 INFO  kafka.utils.VerifiableProperties - Property request.required.acks is overridden to 1
15:23:52,758 WARN  kafka.utils.VerifiableProperties - Property retries is not valid
15:23:52,758 INFO  kafka.utils.VerifiableProperties - Property serializer.class is overridden to kafka.serializer.StringEncoder
15:23:52,758 INFO  kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(test2)
15:23:52,758 INFO  kafka.producer.SyncProducer - Connected to localhost:9092 for producing
15:23:52,758 INFO  kafka.producer.SyncProducer - Disconnecting from localhost:9092
15:23:52,758 WARN  kafka.client.ClientUtils$ - Fetching topic metadata with correlation id 0 for topics [Set(test2)] from broker [id:0,host:localhost,port:9092] failed
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
    at kafka.utils.Utils$.swallow(Utils.scala:172)
    at kafka.utils.Logging$class.swallowError(Logging.scala:106)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at simple_example.main(simple_example.java:40)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

I suspect this may be related to the properties I am passing in. That is why I tried to fix it by adding further properties, such as the one commented out in the code above, following https://kafka.apache.org/documentation.html#brokerconfigs
But for every change I make (other than the three properties already set in the code above), I always get messages of the following form:

16:25:39,810 INFO  kafka.utils.VerifiableProperties - Verifying properties
16:25:39,810 INFO  kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to localhost:9092
16:25:39,810 INFO  kafka.utils.VerifiableProperties - Property request.required.acks is overridden to 1
16:25:39,810 WARN  kafka.utils.VerifiableProperties - Property retries is not valid

telling me that my modification is not valid.
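
To make this concrete, the kind of change I keep attempting looks roughly like the sketch below (only the properties block of the example above changes). The property names message.send.max.retries and retry.backoff.ms are just examples I picked from the producer configuration section of the documentation; I am not sure they are the ones this client actually accepts:

    Properties properties = new Properties();
    properties.put("metadata.broker.list", "localhost:9092");
    properties.put("serializer.class", "kafka.serializer.StringEncoder");
    properties.put("request.required.acks", "1");
    // example additions -- names copied from the producer config documentation,
    // possibly not the ones this producer client expects
    properties.put("message.send.max.retries", "5");
    properties.put("retry.backoff.ms", "200");

    // constructed the same way as in the example above
    Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(properties));
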
My questions are:
How can I get Kafka to keep working through all of the events?
How should I modify the Kafka properties so that the changes are understood and accepted?
Thank you very much in advance for any help, recommendations, or comments.
Best, T.

No answers yet!
