应用map函数时在同一类上获取kafka streams类强制转换异常

l7mqbcuq  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(370)

userrecord.java(由maven avro插件自动生成)

UserRecord extends SpecificRecordBase implement SpecificRecord

userrecordserde.java文件

UserRecordSerde extends SpecificAvroSerde

应用程序.yml

spring.cloud.stream.bindings.input.destination: userTopic
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true
spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde:UserRecordSerde
spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde: LongSerdespring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: LongSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: SpecificAvroSerde

class-streamlistener-raw流在avro中带有null键和userrecord对象

@StreamListener
        public KStream<Long, ArrayList<UserRecord>> handleUserRecords (@Input KStream<?, UserRecord> userRecordStream) { <br/>
        Map<String, Object> serdeConfig = new HashMap();
        serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        serdeConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); <br/>
        Serde<ArrayList<UserRecord>> userRecordListSerde = new SpecificAvroSerde();
        userRecordListSerde.configure(serdeConfig, false); <br/>
        return userRecordStream
            .map((key, value) -> new KeyValue(value.getUserID, value)
            .groupByKey(Serialized.with(Serdes.Long(), userRecordSerde))
            .aggregate(ArrayList::new, Long key, UserRecord value, ArrayList agg ->
            {
               agg.add(value);
               return agg;
            }, userRecordListSerde)
        .toStream();
    }

例外

java.lang.ClassCastException: com.example.UserRecord cannot be cast to com.example.UserRecord
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
yyyllmsg

yyyllmsg1#

为什么不把它从配置中删除呢? spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde:UserRecordSerde 只需使用 SpecificAvroSerde 直接通过默认配置。 spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde .

相关问题