kafka streams处理多个流时的流异常

2ul0zpep  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(458)

我正在与kafka streams&kotlin合作开发一个服务,其中包含三个主题的流。第一个具有avro值,另外两个具有字符串值。
在我的 properties 文件,我有 SpecificAvroSerde 作为默认值serde,然后我使用 Consumed.with(Serdes.String(), Serdes.String()) 使用字符串值。

val topicOneStream = streamsBuilder.stream<String, AvroObject>(topicOne)
        .peek { k, _ -> logger.info("Received message with key: $k") }
        .flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }

    val topicTwoStream = streamsBuilder
        .stream<String, String>(topicTwo, Consumed.with(Serdes.String(), Serdes.String()))
        .peek { k, _ -> logger.info("Received message with key: $k") }
        .flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }

    val topicThreeStream = streamsBuilder.stream<String, String>(topicThree, Consumed.with(Serdes.String(), Serdes.String()))
        .peek { k, _ -> logger.info("Received message with key: $k") }
        .mapValues { v -> objectMapper.readValue(v, AdviceCreated::class.java) }
        .flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }

当我将下面的流配置为默认值时,我看到avro流(第一个流)运行良好,并使用我在该主题上发布的内容。但是,当我使用相同的配置发布到字符串值流时,会出现一个异常。

default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

以下是发布到topictwo和topicthree的例外情况:

org.apache.kafka.streams.errors.StreamsException: A serializer (io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer) is not compatible to the actual value type (value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

另外,同一个服务中必须有三个流,因为稍后会有一个连接。

yqlxgs2m

yqlxgs2m1#

多亏了一个朋友(marioboikov),当kafka进行分组以产生一个新的 KTable . 它不知道将哪个序列化程序作为分组,因此它将默认的serde作为值,在我的示例中就是这样 SpecificAvroSerde 通过向groupbykey提供分组所需的序列化程序解决了此问题:

val topicTwoStream = streamsBuilder
    .stream<String, String>(topicTwo, Consumed.with(Serdes.String(), Serdes.String()))
    .peek { k, _ -> logger.info("Received message with key: $k") }
    .flatMapValues { v -> listOf(v) }.groupByKey(Grouped.with(Serdes.String(), Serdes.String())).reduce { v1, _ -> v1 }

val topicThreeStream = streamsBuilder.stream<String, String>(topicThree, Consumed.with(Serdes.String(), Serdes.String()))
    .peek { k, _ -> logger.info("Received message with key: $k") }
    .flatMapValues { v -> listOf(v) }.groupByKey(Grouped.with(Serdes.String(), Serdes.String())).reduce { v1, _ -> v1 }
    .mapValues { v -> objectMapper.readValue(v, AdviceCreated::class.java) }

干杯?

相关问题