我需要在kafka应用程序上执行单元测试,避免使用第三方库。
我现在的问题是,我想在测试之间清除所有的主题,但我不知道怎么做。
这是我的临时解决方案:提交每个测试后生成的每条消息,并将所有测试使用者放在同一个使用者组中。
override protected def afterEach():Unit={
val cleanerConsumer= newConsumer(Seq.empty)
val topics=cleanerConsumer.listTopics()
println("pulisco")
cleanerConsumer.subscribe(topics.keySet())
cleanerConsumer.poll(100)
cleanerConsumer.commitSync()
cleanerConsumer.close()
}
但这不管用,我也不知道为什么。
例如,当我在测试中创建一个新的使用者时, messages
包含在上一个测试中生成的消息。
val consumerProbe = newConsumer(SMSGatewayTopic)
val messages = consumerProbe.poll(1000)
我怎样才能解决这个问题?
2条答案
按热度按时间bz4sfanl1#
您还可以在测试源中嵌入kafka/zookeeper示例,以便在这些隔离服务上拥有更多控制器。
测试类可以
extends Kafka with ZooKeeper
以确保这一点。如果测试jvm没有分叉,
Tests.Cleanup
在sbt中testOptions in Test
设置可用于在测试后停止嵌入式服务。js4nwp542#
我建议,你只需在测试前重新创建所有主题。例如,这是kafka测试创建/删除主题的方式:
github上的kafka存储库