我已经建立了一个简单的聚合,将来自多个流的值平均在一起,并尝试对其进行测试。我已经浪费了很多时间,我似乎无法在我的头脑中清楚的概念。我的流如下:
// Combine multiple streams together.
KStream<String, IndividualTick> tickerStream =
priceIndexStreamBuilder.stream(exchangeTopics, Consumed.with(...));
// Group by a key & compute average per key
KStream<K, AveragedTick> avgTickerStream = tickStream.selectKey((key,
value) -> value.getK())
.groupByKey(...)
.aggregate(AvgTick::new,
(key, value, aggregate) -> {
aggregate.addTick(value);
return aggregate;
},
Materialized.with(...))
.toStream();
indexTickerStream.to(sinkTopic, Produced.with(...));
我的测试使用embeddedkafka,将一堆记录发布到主题,并在阻塞的队列中等待记录到达 sinkTopic
.
我感兴趣的是这个聚合是如何随时间变化的,所以我希望在每个输出ticker上Assert这个平均值。我可能会添加一些级别的窗口,但我现在已经尝试保持简单。
当我运行我的测试时,我得到不同的结果。假设拓扑中有10条输入记录:
我的聚合器被叫了10次
我把断点放在我的 AverageTick
serialiser被调用的次数不一。
我在测试中Assert记录的值。
我认为这是因为kip-63中定义的缓存功能——记录在处理节点中出现得非常快,并且与最新的记录合并/覆盖(不过,我不完全确定。)
我的单元测试通过了 ProcessorTopologyTestDriver
,但我正在尝试为包含此逻辑的服务编写一些验收测试。
我也试过玩我的 commit.interval.ms
配置,以及将睡眠之间发布我的输入记录,以不同程度的(片状)成功。
这些测试有意义吗?
如何针对一个真实的kafka示例Assert这个微服务的正确性?
我觉得我在做一些概念上的错误-我只是不知道还有什么方法可以采取。
1条答案
按热度按时间wlsrxk511#
你的观察是正确的。缓存使测试变得困难,因为它引入了非确定性。
要编写有用的测试,有两个选项:
通过将缓存大小设置为零来禁用缓存(这样,所有输出记录,包括所有中间记录都是确定性的)
只检查每个键的最后一个结果记录(最后一个结果必须始终相同,与固定输入数据的缓存无关)
顺便说一句:在即将推出的1.1版本中,Kafka添加了一个公共测试包,我们计划添加更多:https://cwiki.apache.org/confluence/display/kafka/kip-247%3a+add+public+test+utils+for+kafka+streams