java 如何使用TestTopologyDriver测试Kafka流函数?

qqrboqgw  于 2023-01-01  发布在  Java
关注(0)|答案(1)|浏览(88)

测试功能:

@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {

    return processKStream-> processKStream
            .filter((k, v) -> v.getId() > 10;
}

测试装置:

TestTopology testTopology = new TestTopology()

KStream<String, String> processKStream= streamsBuilder.stream("input-topic", Consumed.with(stringSerde, stringSerde));

testTopology.process().apply(processKStream);

topologyTestDriver = new TopologyTestDriver(streamsBuilder.build());

processTaskTestInputTopic = topologyTestDriver.createInputTopic("input-topic", stringSerde.serializer(), stringSerde.serializer());
processTaskTestOutputTopic = topologyTestDriver.createOutputTopic("HERE GOES TOPIC NAME", stringSerde.deserializer(), stringSerde.deserializer());

我的函数"process"不输出主题,因此我没有它的名字。我该如何测试这个函数?
谢谢你的帮助!

vmdwslir

vmdwslir1#

你的方法只是在一个流上操作,因为过滤器操作是无状态的,所以它不会生成任何输出主题。
您可以为自己的测试提供与特定处理器无关的输出主题名称,然后测试输出主题是否包含所需的数据

相关问题