Why does my Kafka Streams application (Spring Cloud Stream) ignore a custom serde?

ffdz8vbo · asked 2021-06-04 in Kafka

I implemented a custom serde, derived from the SpecificAvroSerde provided by Confluent, that retries when communication with the schema registry times out. I configured the Spring Cloud Stream Kafka Streams binder to use it as the default:

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=com.test.RetrySpecificAvroSerde
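For context, the custom serde is shaped roughly like the sketch below (simplified: it wraps the Confluent serde rather than reproducing my full implementation, and the retry count is illustrative, with no backoff shown):

import java.util.Map;

import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

public class RetrySpecificAvroSerde<T extends SpecificRecord> implements Serde<T> {

    private final SpecificAvroSerde<T> inner = new SpecificAvroSerde<>();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey); // pass schema.registry.url etc. through
    }

    @Override
    public Serializer<T> serializer() {
        Serializer<T> delegate = inner.serializer();
        // Serializer is a functional interface in kafka-clients 2.x,
        // so the retry wrapper can be a lambda.
        return (topic, data) -> {
            SerializationException last = null;
            for (int attempt = 0; attempt < 3; attempt++) { // illustrative retry count
                try {
                    return delegate.serialize(topic, data);
                } catch (SerializationException e) {
                    last = e; // e.g. a schema-registry timeout; retry
                }
            }
            throw last;
        };
    }

    @Override
    public Deserializer<T> deserializer() {
        return inner.deserializer();
    }

    @Override
    public void close() {
        inner.close();
    }
}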

Today I saw this error in the logs:

2020-12-14 01:31:53.006 ERROR 1 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [de7ait1214-x07-baseline-pc-data-storage-earning-factors-3bb21ce3-c620-4e6b-8cd2-00059a5c6326-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:

org.apache.kafka.streams.errors.StreamsException: stream-thread [de7ait1214-x07-baseline-pc-data-storage-earning-factors-3bb21ce3-c620-4e6b-8cd2-00059a5c6326-StreamThread-1] task [0_0] Exception caught while punctuating processor 'KSTREAM-TRANSFORM-0000000001'
        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:449) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:868) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.punctuate(AssignedStreamsTasks.java:502) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:557) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:951) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.0.jar:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {...avro json...}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Register operation timed out; error code: 50002
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:236) ~[kafka-schema-registry-client-5.3.3.jar:na]
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265) ~[kafka-schema-registry-client-5.3.3.jar:na]
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:365) ~[kafka-schema-registry-client-5.3.3.jar:na]
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:357) ~[kafka-schema-registry-client-5.3.3.jar:na]
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:343) ~[kafka-schema-registry-client-5.3.3.jar:na]
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168) ~[kafka-schema-registry-client-5.3.3.jar:na]
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222) ~[kafka-schema-registry-client-5.3.3.jar:na]
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198) ~[kafka-schema-registry-client-5.3.3.jar:na]
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70) ~[kafka-avro-serializer-5.3.3.jar:na]
        at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-5.3.3.jar:na]
        at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65) ~[kafka-streams-avro-serde-5.3.3.jar:na]
        at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38) ~[kafka-streams-avro-serde-5.3.3.jar:na]
        at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) ~[kafka-streams-2.5.0.jar:na]
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111) ~[kafka-streams-2.5.0.jar:na]
...

This tells me that the serde Kafka Streams is actually using is not the one I defined above, but the base class SpecificAvroSerde (whose SpecificAvroSerializer appears in the stack trace).
Does this have something to do with the Spring Cloud Stream Kafka library trying to infer the serde automatically? What is the "correct" way to override and set the serde?

dhxwm5r4 · answer #1

I see this in your configuration: spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde. That is the default key Serde. Did you mean to provide it as the value Serde? Then that needs to change (see the snippet below).
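Since the stack trace shows the failure while producing a record with an Avro payload, the default value Serde is presumably what needs to point at the custom class:

spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=com.test.RetrySpecificAvroSerde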
That said, you can also set the Serde on individual bindings (which takes higher precedence); an example follows.
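For example, for a function named process, the binding-level properties would look like this (the binding names process-in-0 and process-out-0 are assumptions based on Spring Cloud Stream's default functional binding naming, not something from the original post):

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=com.test.RetrySpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=com.test.RetrySpecificAvroSerde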
You can also define a bean of type RetrySpecificAvroSerde in the application, as long as the Kafka Streams function is strongly typed (i.e., the KStream generic parameters use the proper types). This approach has the highest precedence in the binder; a sketch is shown below.
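A minimal sketch of the bean approach, assuming an Avro-generated type MyEvent that matches the function's KStream<String, MyEvent> signature (MyEvent and the registry URL are placeholders, not from the original post):

import java.util.Collections;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SerdeConfig {

    @Bean
    public RetrySpecificAvroSerde<MyEvent> myEventSerde() {
        RetrySpecificAvroSerde<MyEvent> serde = new RetrySpecificAvroSerde<>();
        // Configure the serde explicitly here; depending on the binder
        // version, Serde beans may not be auto-configured with the
        // binder's configuration properties.
        serde.configure(
                Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
                false); // false = this serde is for values, not keys
        return serde;
    }
}

Because the function is strongly typed, the binder can match this bean against the KStream generic parameters and use it without any serde-related properties.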
After making these corrections, if it still fails, please share a small sample with us and we can take a look.
