测试kafka流拓扑

ohtdti5x  于 2021-06-07  发布在  Kafka
关注(0)|答案(7)|浏览(437)

我正在寻找一种测试kafka流应用程序的方法。这样我就可以定义输入事件,测试套件就会显示输出。
如果没有真正的Kafka,这可能吗?

ulmd4ohb

ulmd4ohb1#

您可以在本地运行一个zookeeper和代理来测试kafka streams应用程序。
只需遵循以下快速入门指南:
本地zk和代理设置:http://kafka.apache.org/quickstart
http://docs.confluent.io/current/streams/quickstart.html
还可以查看这个kafka流示例(javadocs中有详细的遍历说明):
https://github.com/confluentinc/examples/tree/3.1.x/kafka-streams

5kgi1eie

5kgi1eie2#

你可以用https://github.com/jpzk/mockedstreams 请参见下面的示例。。。

import com.madewithtea.mockedstreams.MockedStreams

val input = Seq(("x", "v1"), ("y", "v2"))
val exp = Seq(("x", "V1"), ("y", "V2"))
val strings = Serdes.String()

MockedStreams()
  .topology { builder => builder.stream(...) [...] }
  .input("topic-in", strings, strings, input)
  .output("topic-out", strings, strings, exp.size) shouldEqual exp

希望这对你有帮助。。。

nimxete2

nimxete23#

更新kafka 1.1.0(发布日期:2018年3月23日):
kip-247增加了官方测试工具。根据升级指南:
有一个新的神器 kafka-streams-test-utils 提供 TopologyTestDriver , ConsumerRecordFactory ,和 OutputVerifier 班级。可以将新工件作为常规依赖项包含到单元测试中,并使用测试驱动程序测试kafka streams应用程序的业务逻辑。有关更多详细信息,请参阅kip-247。
根据文件:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>1.1.0</version>
    <scope>test</scope>
</dependency>

测试驱动程序模拟库运行时,该库运行时连续地从输入主题获取记录,并通过遍历拓扑来处理这些记录。可以使用测试驱动程序验证指定的处理器拓扑是否使用手动管道输入的数据记录计算正确的结果。测试驱动程序捕获结果记录并允许查询其嵌入式状态存储:

// Create your topology
Topology topology = new Topology();
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);

// Feed input data
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
testDriver.pipe(factory.create("key", 42L));

// Verify output
ProducerRecord<String, Integer> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());

有关详细信息,请参阅文档。 ProcessorTopologyTestDriver 从0.11.0.0起提供。可在 kafka-streams 测试工件(用 <classifier>test</classifier> 在maven):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.11.0.0</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

您还需要添加 kafka-clients 测试工件:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

然后你可以使用测试驱动程序。根据javadoc,首先创建一个 ProcessorTopologyTestDriver :

StringSerializer strSerializer = new StringSerializer();
StringDeserializer strDeserializer = new StringDeserializer();
Properties props = new Properties();
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
StreamsConfig config = new StreamsConfig(props);
TopologyBuilder builder = ...
ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);

您可以将输入输入到拓扑中,就好像您实际编写了一个输入主题:

driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);

并阅读输出主题:

ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);

然后你可以Assert这些结果。

wdebmtf2

wdebmtf24#

SpringKafka支持使用嵌入式kafka进行单元测试参见https://docs.spring.io/spring-kafka/docs/2.1.0.release/reference/html/_reference.html#__embeddedkafka_annotation.
Kafka团队也在为streams发布一个测试驱动程序https://issues.apache.org/jira/browse/kafka-3625.

4ioopgfo

4ioopgfo5#

你应该检查一下Kafka的部队。
测试设置应如下所示:

KafkaUnit kafkaUnitServer = new KafkaUnit();
kafkaUnitServer.startup();
kafkaUnitServer.createTopic(testTopic);
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
kafkaUnitServer.sendMessages(keyedMessage);

然后要阅读你的信息并Assert一切正常,你可以这样做:

List<String> messages = kafkaUnitServer.readMessages(testTopic, 1);

这实际上是一个嵌入式Kafka,它可以帮助您在测试中包含所有需要的内容。
你可以得到一点幻想和设置你的嵌入式Kafka作为 setup() 方法(或 setupSpec() 在斯波克)和停止你嵌入Kafka在一个 teardown() .

ijnw1ujt

ijnw1ujt6#

当您询问是否可以在没有真正的kafka设置的情况下测试kafka流应用程序时,您可以尝试scala中的模拟流库。mocked streams 1.0是scala>=2.11.8的库,它允许您在不使用zookeeper和kafka代理的情况下对kafka streams应用程序的处理拓扑进行单元测试(因为apache kafka>=0.10.1)。参考文献:https://github.com/jpzk/mockedstreams
您还可以使用scalatest embedded kafka,它是一个库,提供内存中的kafka代理来运行scalatest规范。它使用Kafka0.10.1.1和zookeeper 3.4.8。
参考文献:https://github.com/manub/scalatest-embedded-kafka#scalatest-嵌入式Kafka流
祝你好运!

vhipe2zx

vhipe2zx7#

如果你想测试 Kafka Stream 使用 Processor API ,dmitry提供的代码可能无法正常工作。因此,在对javadocs和官方文档进行了几个小时的研究之后,我给出了一个工作代码,以便测试您使用 JUnit .

public class TopologySpec {

private TopologyTestDriver testDriver;

@Before
public void setup() {
    // Processor API
    Topology topology = new Topology();
    topology.addSource("sourceProcessor", "input-topic");
    // In this case, 'EventProcessor' is a custom processor
    // that I implemented and I want to test
    topology.addProcessor("processor", EventProcessor::new, "sourceProcessor");
    topology.addSink("sinkProcessor", "output-topic", "processor");

    // Setup test driver
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    // EventProcessor is a <String,String> processor 
    // so we set those serders
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    testDriver = new TopologyTestDriver(topology, config);
}

@After
public void tearDown() {
    testDriver.close(); // Close processors after finish the tests
}

@Test
public void firstTest() {
    // Simulate a producer that sends the message "value,val" without key
    ConsumerRecordFactory factory =
            new ConsumerRecordFactory(new StringSerializer(), new StringSerializer());

    testDriver.pipeInput(factory.create("input-topic", "value,val"));

    // Simulate a consumer that reads from the output topic 
    // where are supposed to be the messages after being processed
    // by your custom processor
    ProducerRecord<String, String> record1 =
            testDriver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());

    // Compare the output to ensure that your custom processor
    // is working properly. In this case, my processor consumes
    // the message, concatenates ":::processed" to it, and
    // push it to the output-topic
    OutputVerifier.compareValue(record1, "value,val:::processed");
}
}

相关问题