测试功能:
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
return processKStream-> processKStream
.filter((k, v) -> v.getId() > 10;
}
测试装置:
TestTopology testTopology = new TestTopology()
KStream<String, String> processKStream= streamsBuilder.stream("input-topic", Consumed.with(stringSerde, stringSerde));
testTopology.process().apply(processKStream);
topologyTestDriver = new TopologyTestDriver(streamsBuilder.build());
processTaskTestInputTopic = topologyTestDriver.createInputTopic("input-topic", stringSerde.serializer(), stringSerde.serializer());
processTaskTestOutputTopic = topologyTestDriver.createOutputTopic("HERE GOES TOPIC NAME", stringSerde.deserializer(), stringSerde.deserializer());
我的函数"process"不输出主题,因此我没有它的名字。我该如何测试这个函数?
谢谢你的帮助!
1条答案
按热度按时间vmdwslir1#
你的方法只是在一个流上操作,因为过滤器操作是无状态的,所以它不会生成任何输出主题。
您可以为自己的测试提供与特定处理器无关的输出主题名称,然后测试输出主题是否包含所需的数据