classcastexception在spring-kafka测试中使用'merge()`

6fe3ivhb  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(557)

我想用kafka streams test utils的单元测试来测试我的kafka streams拓扑。我使用这个库已经有很长的时间了,我已经用testng在我的测试周围构建了一些抽象层。但自从我加了一个 merge(...) 对于我的流,我得到以下异常:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=my-topic-2, partition=0, offset=0
 at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:318)
at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:393)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: com.MyKey / value type: com.MyValue). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
... 3 more
Caused by: java.lang.ClassCastException: class com.MyKey cannot be cast to class [B (com.MyValue is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:156)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:101)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 15 more

下面是我如何使用topologytestdriver的streambuilder构建流的部分:

// Block 1
KStream<MyKey, MyValue> stream2 = streamsBuilder.stream(
    "my-topic-2",
    consumedAs(OtherKey.class, OtherValue.class, AllowEmpty.NONE) // Provides default json Serde
).flatMap(
    (key, value) -> {
        List<KeyValue<MyKey, MyValue>> list = new ArrayList<>();
        // Do stuff an fill out the list
        return list;
    })
 .through("tmp-topic");

// Block 2
KStream<MyKey, MyValue>[] branches = stream1
    .merge(stream2)
    ... business stuff

对于生成有关源主题的消息,我使用 TopologyTestDriver.pipeInput(...) 用jsonserdes初始化。这个异常是通过强制转换bytearray发生的,但是我不知道为什么bytearrayserializer的预期参数是同一个类,但是来自另一个模块,而不是加载的消耗类。它们也可能由另一个类装入器装入。但是后台没有spring堆栈,所有东西都应该同步运行。
我真的对这种行为感到困惑。
apachekafka dependecies的版本是:2.0.1,我使用的是openjdk-11。是否可以对齐序列化程序的类加载?只有在my-topic-2上生成一些内容时,合并的另一个主题才能正常工作,错误才会发生。

vjhs03f7

vjhs03f71#

如果没有看到你所有的代码,我不能肯定,但这里是我认为可能发生的事情。
为serdes提供 Consumed 仅在使用输入主题中的记录时提供反序列化;kafka流不会通过拓扑的其余部分传播它们。在任何时候,如果再次需要serde,kafka streams都会使用 StreamsConfig . 这个 Serdes.ByteArraySerde 是默认值。
我建议尝试两件事:
使用 Produced.with(keySerde, valueSerde) 在接收器节点中
通过 StreamsConfig .
嗯,告诉我事情的进展。
-比尔

dldeef67

dldeef672#

正如@bbejeck所提到的,您需要使用不同版本的 .through() ,它允许您覆盖默认值( ByteArraySerde )serdes应用于 K, V .

KStream<K,V> through​(java.lang.String topic,
                     Produced<K,V> produced)

将此流具体化为主题,并使用生成的示例从主题创建一个新的kstream来配置 key serde , value serde ,和streampartitioner。。。这相当于打电话 to(someTopic, Produced.with(keySerde, valueSerde) 和streamsbuilder#stream(sometopicname,consumered.with(keyserde,valueserde))。

相关问题