单元测试KafkaStreams给出IllegalArgumentException:未知主题

iszxjhcz  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(118)

我有一个应用程序,它使用KStream从Kafka读取数据,根据头过滤数据,并写入KTable。

public Topology buildTopology() {
        KStream<String,String> inputStream = builder.stream("topicname");
        KStream<String,String> filteredStream = inputStream.transformValues(KSExtension::new)
                .filter((key,value) -> value!=null);
        
        kTable = filteredStream.groupByKey()
                .reduce(((value1, value2) -> value2), Materialized.as("ktable"));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        return builder.build();
    }

我尝试使用TopologyTestDriver为此创建一个单元测试

private TopologyTestDriver td;
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, String> outputTopic;
    private Topology topology;
    private Properties streamConfig;

@BeforeEach
    void setUp() {
        streamConfig = new Properties();
        streamConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "AppId");
        streamConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
        streamConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        topology = new Topology();
        td = new TopologyTestDriver(topology, streamConfig);
        inputTopic = td.createInputTopic("input-topic", Serdes.String().serializer(), Serdes.String().serializer());
        outputTopic = td.createOutputTopic("output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());
    }
 @Test
    void buildTopology(){
        inputTopic.pipeInput("key1", "value1");
        topology = app.buildTopology();
    }

当我运行测试时,我得到异常“java.lang.IllegalArgumentException:未知主题:输入主题”

DEBUG org.apache.kafka.streams.processor.internals.InternalTopologyBuilder - No source topics using pattern subscription found, initializing consumer's subscription collection.

java.lang.IllegalArgumentException: Unknown topic: input-topic
    at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:582)
    at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:945)
    at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:115)
    at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:137)
    at testclassname.buildTopology()

有没有人能帮我理解我在这里错过了什么?

agxfikkp

agxfikkp1#

我看到你正在创建一个 emptyTopology,用于初始化TopologyTestDriver

topology = new Topology();
td = new TopologyTestDriver(topology, streamConfig);

当这个空拓扑用于示例化td = new TopologyTestDriver(topology, streamConfig);TopologyTestDriver时,测试驱动程序不知道任何主题,因为没有有效地构建拓扑。
我想这就是为什么当您尝试使用inputTopic.pipeInput("key1", "value1");将输入通过管道传输到"input-topic"时,测试驱动程序会抛出一个IllegalArgumentException,抱怨“Unknown topic: input-topic“。
您应该调用您的buildTopology()方法来生成您正在测试的实际拓扑,并在创建TopologyTestDriver时使用它。
确保测试中的主题名称(input-topicoutput-topic)与实际应用程序中的主题名称("topicname")相匹配。

@BeforeEach
void setUp() {
    streamConfig = new Properties();
    streamConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "AppId");
    streamConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
    streamConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    // Create the topology using your actual code
    topology = app.buildTopology();

    // Now create a TopologyTestDriver using the real topology
    td = new TopologyTestDriver(topology, streamConfig);

    // The topic name here should match the actual topic you use in the real topology
    inputTopic = td.createInputTopic("topicname", Serdes.String().serializer(), Serdes.String().serializer());

    // Create output topic if you need it
    // outputTopic = td.createOutputTopic("output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());
}

@Test
void buildTopology(){
    inputTopic.pipeInput("key1", "value1");
    // Your assertions here
}

注意:我从设置中删除了输出主题,因为在您的代码片段中,您没有指定KTable写入的输出主题。如果您的实际应用程序写入输出主题,则可以将其添加回去。
我更新了代码,添加了ktable存储的名称。
如何测试添加到ktable的值?
您可以查询支持KTable的状态存储来检查其内容。
在Kafka Streams中,每个KTable都由一个状态存储(甚至是versioned one very recently, Aug. 2023)支持,您可以在测试中直接与此存储交互。
确保在拓扑中为KTable设置了存储名称:

kTable = filteredStream.groupByKey()
        .reduce(((value1, value2) -> value2), Materialized.as("myKTableStore"));

这里,"myKTableStore"是支持KTable的状态存储的名称。
在您的测试中,您可以从TopologyTestDriver检索存储并检查特定键的值:

@Test
void buildTopology() {
    inputTopic.pipeInput("key1", "value1");

    // Retrieve the state store
    ReadOnlyKeyValueStore<String, String> keyValueStore = 
        td.getKeyValueStore("myKTableStore");

    // Assert that the KTable contains the expected value for the key
    assertEquals("value1", keyValueStore.get("key1"));
}

这样,您就可以验证KTable是否包含预期的键值对。
请注意,ReadOnlyKeyValueStore是Kafka Streams API的一部分。根据需要导入。
您可以在“Kafka Streams Interactive Query/ Querying local key-value stores”中看到它的使用

如何在测试中输入标题到输入主题?我没有别的选择。我在这里过滤头值inputStream.transformValues(KSExtension::new)
在Kafka Streams的TopologyTestDriver中,直接向TestInputTopic添加头的能力受到一定限制。
但是,您可以使用较低级别的pipeInput()方法,该方法允许您传递一个ConsumerRecord object,它可以具有头。
您需要手动构建ConsumerRecord,然后使用它:

@Test
void buildTopology() {
    // Create a Headers object and add your custom headers
    Headers headers = new RecordHeaders();
    headers.add(new RecordHeader("myHeaderKey", "myHeaderValue".getBytes()));

    // Create a ConsumerRecord with headers
    ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
            "topicname", // topic
            0, // partition
            0, // offset
            "key1".getBytes(), // key
            "value1".getBytes(), // value
            headers // headers
    );

    // Pipe the record into TopologyTestDriver
    td.pipeInput(record);

    // The rest of your test
}

确保将"topicname"替换为拓扑中实际阅读的主题名称,并根据测试需要调整键、值和标题。
这将允许您在测试记录中包含头,然后transformValues操作将按预期处理这些头。

相关问题