java—如何一起对kafka流和producer API进行单元测试

huwehgph  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(691)

目前,我有一个基本的kafka流应用程序,它涉及一个只有一个源和一个处理器,但没有接收器的拓扑结构。基本上,拓扑结构只处理消息的消耗。至于生成消息,我们在传递给拓扑的processorsupplier示例中调用producerAPI,特别是在重写的 process 方法。虽然我知道producerapi在这里是多余的,因为我可以简单地在拓扑中添加一个sink,但我现在的位置是必须以这种方式设置streams应用程序。至于测试,我尝试了 TopologyTestDriver 类,该类在kafka streams test utils包中可用。但是,我不仅要测试拓扑,还要测试对producerapi的调用。使用 TopologyTestDriver 要求我嘲笑我的朋友 Producer 示例,因为它与streams api分离。因此,由于信息不是“转发”,我无法阅读来自 TopologyTestDriver 我的单元测试。
这是我的一个简化版本 process 方法:

@Override
public void process(String key, String value) {
    // some data processing stuff that I leave out for simplicity sake
    String topic = "...";
    Properties props = ...;
    //Producer<String, String> producer = new KafkaProducer<>(props);
    ProducerRecord<String, String> record = new ProducerRecord(topic, key, value);
    producer.send(record);
}

下面是我的单元测试示例的简化:

@Test
public void process() {
    Topology topology = new Topology();
    topology.addSource("source", "input-topic");
    topology.addProcessor("processor", ..., "source");
    Properties props = ...;

    TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);

    ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
    // the following line will work fine as long as the producer is mocked
    testDriver.pipeInput(factory.create("input-topic", "key", "value"));

    // since the producer is mocked, no message can be read from the output topic
    ProducerRecord<String, String> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());

    assertNull(outputRecord); // returns true
}

总结一下我的问题,有没有一种方法可以编写一个单元测试,在使用producerapi将消息写入传出主题的拓扑中测试消息的消耗和产生?

nimxete2

nimxete21#

您不应该使用自定义 Producer 但是在你的房间里加个Flume Topology . 呼叫 Producer.send() 是异步的,因此可能会丢失数据。为了避免数据丢失,您需要使呼叫同步,即获取 Future 由返回的 send() 等待它的完成 process() 返回。但是,这对吞吐量有很大影响,不建议这样做。
如果您添加一个接收器,您就可以避免这些问题,因为kafka streams现在可以理解发送到输出主题的数据,因此不会发生数据丢失,而kafka streams可以使用性能更高的异步调用。
除了正确性问题之外,似乎您还创建了一个新的 KafkaProducer 对于当前代码中处理的每一条消息,效率都很低。此外,使用接收器将简化代码。当然,您可以使用 TopologyTestDriver .

相关问题