Reconnecting after a Kafka broker restart

qgzx9mmu  posted on 2021-06-07  in  Kafka

I am using the Kafka producer appender for logback. After a broker is restarted, every JVM connected to it throws a large number of exceptions:

11:22:48.738 [kafka-producer-network-thread | app-logback-relaxed] cid: clid: E [        @] a: o.a.k.c.p.internals.Sender - [Producer clientId=id-id-logback-relaxed] Uncaught error in kafka producer I/O thread:  ex:java.lang.NullPointerException: null
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:436)
at org.apache.kafka.common.network.Selector.poll(Selector.java:399)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:798)

It makes no difference that Kafka comes back up; only restarting the JVM helps. This is the appender configuration:

<appender name="kafkaJoltAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
            <!-- This is the default encoder that encodes every log message to an utf8-encoded string  -->
        <encoder>
                <pattern>%date{"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"} ${HOSTNAME} [%thread] %logger{32} - %message ex:%exf%n</pattern>
        </encoder>
        <topic>mytopichere</topic>
            <!-- we don't care how the log messages will be partitioned  -->
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostNameKeyingStrategy" />
            <!-- use async delivery. the application threads are not blocked by logging -->
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
            <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
            <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
            <!-- bootstrap.servers is the only mandatory producerConfig -->
        <producerConfig>bootstrap.servers=10.99.99.1:9092</producerConfig>
            <!-- don't wait for a broker to ack the reception of a batch.  -->
        <producerConfig>acks=0</producerConfig>
            <!-- even if the producer buffer runs full, do not block the application but start to drop messages -->
        <producerConfig>block.on.buffer.full=false</producerConfig>
            <!-- define a client-id that you use to identify yourself against the kafka broker -->
        <producerConfig>client.id=${HOSTNAME}-${CONTEXT_NAME}-logback-relaxed</producerConfig>
            <!-- compression applied to each batch of log messages. valid values: none, gzip, snappy  -->
            <!-- currently disabled; compression overhead still to be tested -->
        <producerConfig>compression.type=none</producerConfig>
            <!-- there is no fallback <appender-ref>. If this appender cannot deliver, it will drop its messages. -->
        <producerConfig>max.block.ms=0</producerConfig>
</appender>

Is there a way to configure it to reconnect automatically?
The bootstrap.servers value here is a load balancer address. There are three Kafka brokers behind it.
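
For reference, here is a minimal sketch of the producer settings that relate to reconnection. It assumes the appender passes each <producerConfig> entry straight to the Kafka client, as the comments in the configuration above describe. The three broker addresses are hypothetical placeholders for the brokers behind the load balancer, and the backoff values are only illustrative. The Kafka producer normally reconnects on its own, so these settings only tune that behaviour; they are not a confirmed fix for the NullPointerException shown above, which may have a different cause.

```xml
<!-- assumption: hypothetical addresses of the three brokers behind the load balancer, listed directly -->
<producerConfig>bootstrap.servers=10.99.99.11:9092,10.99.99.12:9092,10.99.99.13:9092</producerConfig>
<!-- base wait before the client tries to reconnect to a broker (Kafka default: 50 ms) -->
<producerConfig>reconnect.backoff.ms=1000</producerConfig>
<!-- upper bound for the growing reconnect backoff; only recognised by newer kafka-clients versions -->
<producerConfig>reconnect.backoff.max.ms=10000</producerConfig>
```

These lines would go inside the existing <appender> element, replacing the current bootstrap.servers entry. Listing the brokers individually lets the client reach the cluster even if one address becomes unreachable.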
