kafka架构注册表获取错误意外字符(“< ”(代码60)):应为有效值(数字、字符串、数组、对象、“true”、“false”)

ix0qys7i  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(438)

我正在尝试将springboot应用程序与kafka模式注册表集成。我已经创建了一个kafkaproducer,它将在验证到架构注册表后向kafka主题发送消息:

public class Producer {
      @Value("${topic.name}")
      private final String TOPIC;
      private final KafkaTemplate<Integer, Data> kafkaTemplate;
      @Autowired
      public Producer(KafkaTemplate<Integer, Data> kafkaTemplate) {
         this.kafkaTemplate = kafkaTemplate;
      }
     public void sendTestEvent(Data data) throws Exception {
     System.out.println("started");
     Integer key = data.getTestEventId();
      this.kafkaTemplate.send(this.TOPIC,key,data);
}

my application.properties文件

server.port=8084
topic.name=test-topic
server.servlet.context-path=/api/v1
spring.application.name=kafkatest
spring.kafka.bootstrap-servers=*************.com:9093
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol= SASL_SSL
spring.kafka.properties.security.krb5.config = file:/etc/krb5.conf
spring.kafka.properties.sasl.mechanism = GSSAPI
spring.kafka.properties.sasl.kerberos.service.name= kafka
spring.kafka.properties.sasl.jaas.config = com.sun.security.auth.module.Krb5LoginModule required 
useTicketCache=false serviceName="kafka" storeKey=true principal="***************" useKeyTab=true 
keyTab="/home/api/config/kafkaclient.keytab";
spring.kafka.ssl.trust-store-location= file:/home/api/config/truststore.p12
spring.kafka.ssl.trust-store-password=*********************
spring.kafka.ssl.trust-store-type= PKCS12
spring.kafka.basic.auth.credentials.source=USER_INFO 
spring.kafka.basic.auth.user.info=<username>:<password>
spring.kafka.schema.registry.url=https://schema-registry-*****************/subjects/test- 
topic/versions/latest

但我得到了一个错误:

io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]; error code: 50005

> 2020-06-28 22:24:59.047  INFO 2019 --- [nio-8084-exec-1] o.a.k.c.s.authenticator.AbstractLogin    : Successfully logged in.
2020-06-28 22:24:59.054  INFO 2019 --- [ha*********] o.a.k.c.security.kerberos.KerberosLogin  : [Principal=ha*********]: TGT refresh thread started.
2020-06-28 22:24:59.065  INFO 2019 --- [ha**********] o.a.k.c.security.kerberos.KerberosLogin  : [Principal=ha*********]: TGT valid starting at: Sun Jun 28 22:25:07 IST 2020
2020-06-28 22:24:59.067  INFO 2019 --- [ha*********] o.a.k.c.security.kerberos.KerberosLogin  : [Principal=ha*********]: TGT expires: Mon Jun 29 08:25:07 IST 2020
2020-06-28 22:24:59.072  INFO 2019 --- [ha*********] o.a.k.c.security.kerberos.KerberosLogin  : [Principal=ha*********]: TGT refresh sleeping until: Mon Jun 29 06:28:47 IST 2020
2020-06-28 22:24:59.307  WARN 2019 --- [nio-8084-exec-1] o.a.k.clients.producer.ProducerConfig    : The configuration 'security.krb5.config' was supplied but isn't a known config.
2020-06-28 22:24:59.334  INFO 2019 --- [nio-8084-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2020-06-28 22:24:59.340  INFO 2019 --- [nio-8084-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14************
2020-06-28 22:25:03.041  INFO 2019 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: GlUYY***********
2020-06-28 22:25:04.994 ERROR 2019 --- [nio-8084-exec-1] o.a.c.c.C.[.[.[.[dispatcherServlet]      : Servlet.service() for servlet [dispatcherServlet] in context with path [/api/v1] threw exception [Request processing failed; nested exception is org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"Event","namespace":"com.*******.kafka.avro.event.sample","fields":[{"name":"event_envelope","type":{"type":"record","name":"EventEnvelope","fields":[{"name":"data","type":{"type":"record","name":"Data","fields":[{"name":"testEventId","type":"int"},{"name":"test","type":{"type":"string","avro.java.string":"String"}}]},"default":{}}]}}]}] with root cause

io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]; error code: 50005
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:356) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:348) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:334) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70) ~[kafka-avro-serializer-5.3.0.jar!/:na]
        at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-5.3.0.jar!/:na]
        at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-2.0.1.jar!/:na]
        at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-2.0.1.jar!/:na]
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841) ~[kafka-clients-2.0.1.jar!/:na]
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.1.jar!/:na]
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444) ~[spring-kafka-2.2.6.RELEASE.jar!/:2.2.6.RELEASE]
        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:381) ~[spring-kafka-2.2.6.RELEASE.jar!/:2.2.6.RELEASE]
        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:199) ~[spring-kafka-2.2.6.RELEASE.jar!/:2.2.6.RELEASE]
        at io.confluent.developer.spring.avro.Producer.sendTestEvent(Producer.java:58) ~[classes!/:0.0.1-SNAPSHOT]
        at io.confluent.developer.spring.avro.KafkaController.sendMessageToKafkaTopic(KafkaController.java:31) ~[classes!/:0.0.1-SNAPSHOT]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
        at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
        at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:104) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1039) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:908) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:660) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:200) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:836) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1747) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
        at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

因为我使用的是https模式注册表,所以我必须在application.properties中设置任何其他属性,比如keystore和trustore证书吗?

ttvkxqim

ttvkxqim1#

<!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>5.3.0</version>
        </dependency>
ki1q1bka

ki1q1bka2#

我现在可以在通过模式注册表验证后,以avro格式向kafka发布消息。
我所犯的错误与模式的主题名有关。默认情况下,avro模式使用topicnamestrategy,这意味着它将查找类似于这个topic name值的subject name。如果找不到,它将抛出一个错误,即无法检索模式,因为该模式不具有该主题名称。
我将schema的主题名更改为testtopic value,并能够将消息发布到testtopic上。
我希望这将有助于节省某人的时间。

相关问题