// Create your topology
Topology topology = new Topology();
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
// Feed input data
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
testDriver.pipe(factory.create("key", 42L));
// Verify output
ProducerRecord<String, Integer> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());
如果你想测试 Kafka Stream 使用 Processor API ,dmitry提供的代码可能无法正常工作。因此,在对javadocs和官方文档进行了几个小时的研究之后,我给出了一个工作代码,以便测试您使用 JUnit .
public class TopologySpec {
private TopologyTestDriver testDriver;
@Before
public void setup() {
// Processor API
Topology topology = new Topology();
topology.addSource("sourceProcessor", "input-topic");
// In this case, 'EventProcessor' is a custom processor
// that I implemented and I want to test
topology.addProcessor("processor", EventProcessor::new, "sourceProcessor");
topology.addSink("sinkProcessor", "output-topic", "processor");
// Setup test driver
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
// EventProcessor is a <String,String> processor
// so we set those serders
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
testDriver = new TopologyTestDriver(topology, config);
}
@After
public void tearDown() {
testDriver.close(); // Close processors after finish the tests
}
@Test
public void firstTest() {
// Simulate a producer that sends the message "value,val" without key
ConsumerRecordFactory factory =
new ConsumerRecordFactory(new StringSerializer(), new StringSerializer());
testDriver.pipeInput(factory.create("input-topic", "value,val"));
// Simulate a consumer that reads from the output topic
// where are supposed to be the messages after being processed
// by your custom processor
ProducerRecord<String, String> record1 =
testDriver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());
// Compare the output to ensure that your custom processor
// is working properly. In this case, my processor consumes
// the message, concatenates ":::processed" to it, and
// push it to the output-topic
OutputVerifier.compareValue(record1, "value,val:::processed");
}
}
7条答案
按热度按时间ulmd4ohb1#
您可以在本地运行一个zookeeper和代理来测试kafka streams应用程序。
只需遵循以下快速入门指南:
本地zk和代理设置:http://kafka.apache.org/quickstart
http://docs.confluent.io/current/streams/quickstart.html
还可以查看这个kafka流示例(javadocs中有详细的遍历说明):
https://github.com/confluentinc/examples/tree/3.1.x/kafka-streams
5kgi1eie2#
你可以用https://github.com/jpzk/mockedstreams 请参见下面的示例。。。
希望这对你有帮助。。。
nimxete23#
更新kafka 1.1.0(发布日期:2018年3月23日):
kip-247增加了官方测试工具。根据升级指南:
有一个新的神器
kafka-streams-test-utils
提供TopologyTestDriver
,ConsumerRecordFactory
,和OutputVerifier
班级。可以将新工件作为常规依赖项包含到单元测试中,并使用测试驱动程序测试kafka streams应用程序的业务逻辑。有关更多详细信息,请参阅kip-247。根据文件:
测试驱动程序模拟库运行时,该库运行时连续地从输入主题获取记录,并通过遍历拓扑来处理这些记录。可以使用测试驱动程序验证指定的处理器拓扑是否使用手动管道输入的数据记录计算正确的结果。测试驱动程序捕获结果记录并允许查询其嵌入式状态存储:
有关详细信息,请参阅文档。
ProcessorTopologyTestDriver
从0.11.0.0起提供。可在kafka-streams
测试工件(用<classifier>test</classifier>
在maven):您还需要添加
kafka-clients
测试工件:然后你可以使用测试驱动程序。根据javadoc,首先创建一个
ProcessorTopologyTestDriver
:您可以将输入输入到拓扑中,就好像您实际编写了一个输入主题:
并阅读输出主题:
然后你可以Assert这些结果。
wdebmtf24#
SpringKafka支持使用嵌入式kafka进行单元测试参见https://docs.spring.io/spring-kafka/docs/2.1.0.release/reference/html/_reference.html#__embeddedkafka_annotation.
Kafka团队也在为streams发布一个测试驱动程序https://issues.apache.org/jira/browse/kafka-3625.
4ioopgfo5#
你应该检查一下Kafka的部队。
测试设置应如下所示:
然后要阅读你的信息并Assert一切正常,你可以这样做:
这实际上是一个嵌入式Kafka,它可以帮助您在测试中包含所有需要的内容。
你可以得到一点幻想和设置你的嵌入式Kafka作为
setup()
方法(或setupSpec()
在斯波克)和停止你嵌入Kafka在一个teardown()
.ijnw1ujt6#
当您询问是否可以在没有真正的kafka设置的情况下测试kafka流应用程序时,您可以尝试scala中的模拟流库。mocked streams 1.0是scala>=2.11.8的库,它允许您在不使用zookeeper和kafka代理的情况下对kafka streams应用程序的处理拓扑进行单元测试(因为apache kafka>=0.10.1)。参考文献:https://github.com/jpzk/mockedstreams
您还可以使用scalatest embedded kafka,它是一个库,提供内存中的kafka代理来运行scalatest规范。它使用Kafka0.10.1.1和zookeeper 3.4.8。
参考文献:https://github.com/manub/scalatest-embedded-kafka#scalatest-嵌入式Kafka流
祝你好运!
vhipe2zx7#
如果你想测试
Kafka Stream
使用Processor API
,dmitry提供的代码可能无法正常工作。因此,在对javadocs和官方文档进行了几个小时的研究之后,我给出了一个工作代码,以便测试您使用JUnit
.