How to set the maximum number of attempts for a spring-kafka consumer when using a schema registry

a11xaf1n · posted 2021-06-07 in Kafka

I am building a Spring Boot server with spring-kafka (1.3.2.RELEASE), Apache Avro (1.8.2), and io.confluent's Schema Registry (3.1.2). When the Kafka listener receives a message, it reads the schema id embedded in the message and fetches the corresponding Avro schema from the registry server by that id. The problem: when the Schema Registry server is down, my listener keeps firing HTTP requests at the registry to fetch the schema for every message it receives (printing a flood of error logs), and it blocks all subsequent Kafka messages because the offset never advances.

16:56:41.541 ERROR KafkaMessageListenerContainer$ListenerConsumer -  - org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 - Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition trade-0 at offset 810845
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 21
Caused by: java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at java.net.Socket.connect(Socket.java:538)
        at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
        at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
        at sun.net.www.http.HttpClient.New(HttpClient.java:339)
        at sun.net.www.http.HttpClient.New(HttpClient.java:357)
        at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1202)
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1138)
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1032)
        at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:966)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1546)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1474)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:153)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:323)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:316)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:63)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndID(CachedSchemaRegistryClient.java:118)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
        at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
        at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
        at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:918)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1095)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:944)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:567)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:528)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:614)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:748)
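
For context, the consumer side is wired up roughly as follows (a minimal sketch; the broker address, group id, and registry URL are placeholders, while the property names are the standard Kafka client / Confluent ones):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;

public class ConsumerFactorySketch {

  public DefaultKafkaConsumerFactory<Object, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "trade-consumer");          // placeholder
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    // The deserializer resolves each message's schema id against this URL;
    // when the registry is unreachable, deserialization fails inside poll().
    props.put("schema.registry.url", "http://localhost:8081"); // placeholder
    return new DefaultKafkaConsumerFactory<>(props);
  }
}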

I have tried setting a maximum number of attempts with a RetryTemplate, but it did not work; the RetryTemplate appears to apply only to my listener method. I also could not find any useful configuration option on io.confluent's site.
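
The kind of wiring I tried looks roughly like this (a sketch assuming the spring-kafka 1.3.x container-factory API; bean and class names are illustrative). It cannot catch the error above, because the retry wraps the listener invocation, while the SerializationException is thrown earlier, inside KafkaConsumer.poll():

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class ListenerRetryConfig {

  @Bean
  public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
      ConsumerFactory<Object, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);

    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3)); // at most 3 attempts
    factory.setRetryTemplate(retryTemplate);
    // This retry only wraps the @KafkaListener method invocation; the
    // SerializationException above happens during poll(), before the
    // listener runs, so these attempts never apply to it.
    return factory;
  }
}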

6qfn3psc 1#

For now I replace KafkaAvroDeserializer with a CustomAvroDeserializer, which extends KafkaAvroDeserializer and overrides its deserialize method by wrapping the body in a try-catch, like this:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import lombok.extern.log4j.Log4j;

@Log4j
public class CustomAvroDeserializer extends KafkaAvroDeserializer {

  @Override
  public Object deserialize(String topic, byte[] bytes) {
    try {
      // Delegate to the parent's byte[] overload, which resolves the schema
      // id against the registry (or its local cache).
      return this.deserialize(bytes);
    } catch (Exception e) {
      // Swallow the failure so the offset can advance instead of the
      // container retrying the registry call forever.
      log.error("encountered a problem when deserializing a message with the schema registry", e);
      return null;
    }
  }
}
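
Then register the custom class as the value (and, if needed, key) deserializer in the consumer properties instead of the stock one; a minimal snippet, using the standard Kafka client constants:

// In the consumer properties (schema.registry.url is still required,
// since the parent class uses it for ids it has not cached yet):
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, CustomAvroDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomAvroDeserializer.class);

The trade-off: returning null lets the offset advance past the bad record, so the partition is no longer blocked, but any message consumed while the registry is unreachable is dropped rather than retried.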
