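I have implemented a custom serde that extends the SpecificAvroSerde provided by Confluent, so that it retries when communication with the schema registry times out. I have configured the Spring Cloud Stream Kafka binder to use it as the default: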
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=com.test.RetrySpecificAvroSerde
Today I saw this error in the logs:
2020-12-14 01:31:53.006 ERROR 1 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [de7ait1214-x07-baseline-pc-data-storage-earning-factors-3bb21ce3-c620-4e6b-8cd2-00059a5c6326-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: stream-thread [de7ait1214-x07-baseline-pc-data-storage-earning-factors-3bb21ce3-c620-4e6b-8cd2-00059a5c6326-StreamThread-1] task [0_0] Exception caught while punctuating processor 'KSTREAM-TRANSFORM-0000000001'
at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:449) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:868) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.punctuate(AssignedStreamsTasks.java:502) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:557) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:951) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.0.jar:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {...avro json...}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Register operation timed out; error code: 50002
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:236) ~[kafka-schema-registry-client-5.3.3.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265) ~[kafka-schema-registry-client-5.3.3.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:365) ~[kafka-schema-registry-client-5.3.3.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:357) ~[kafka-schema-registry-client-5.3.3.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:343) ~[kafka-schema-registry-client-5.3.3.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168) ~[kafka-schema-registry-client-5.3.3.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222) ~[kafka-schema-registry-client-5.3.3.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198) ~[kafka-schema-registry-client-5.3.3.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70) ~[kafka-avro-serializer-5.3.3.jar:na]
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-5.3.3.jar:na]
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65) ~[kafka-streams-avro-serde-5.3.3.jar:na]
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38) ~[kafka-streams-avro-serde-5.3.3.jar:na]
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) ~[kafka-streams-2.5.0.jar:na]
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111) ~[kafka-streams-2.5.0.jar:na]
...
This tells me that the serde Kafka Streams is using is not the one I defined above, but the base class SpecificAvroSerde (and its SpecificAvroSerializer).
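Does this have something to do with the way the Spring Cloud Stream Kafka library tries to infer the serde automatically? What is the "correct" way to override and set the serde?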
1 Answer
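I see this in your configuration: spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde. That sets the default key Serde. Did you mean to set it as value.serde? If so, that needs to change.

That said, you can also set the Serde on individual bindings, which takes precedence over the binder-wide default. You can also define a bean of type RetrySpecificAvroSerde in the application, provided the Kafka Streams function is strongly typed (i.e. the KStream generic parameters use the proper types). This approach has the highest precedence in the binder.

If it still fails after this correction, please share a small sample with us and we can take a look.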
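As a rough sketch of the first two options, assuming the retrying serde is meant for record values and that the outbound binding is called process-out-0 (a hypothetical name; substitute the binding name from your own application), the properties might look like this:

```properties
# Binder-wide default for values (the question only set default.key.serde)
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=com.test.RetrySpecificAvroSerde

# Per-binding override on the outbound side; takes precedence over the binder-wide default.
# "process-out-0" is a hypothetical binding name used for illustration only.
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=com.test.RetrySpecificAvroSerde
```

The stack trace fails inside RecordCollectorImpl.send, i.e. while serializing an outgoing record, so the producer side of the binding is the one to check first. If the keys are Avro as well, the same pattern applies with keySerde.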