我有一个应用程序,它使用KStream从Kafka读取数据,根据头过滤数据,并写入KTable。
public Topology buildTopology() {
KStream<String,String> inputStream = builder.stream("topicname");
KStream<String,String> filteredStream = inputStream.transformValues(KSExtension::new)
.filter((key,value) -> value!=null);
kTable = filteredStream.groupByKey()
.reduce(((value1, value2) -> value2), Materialized.as("ktable"));
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
return builder.build();
}
我尝试使用TopologyTestDriver为此创建一个单元测试
private TopologyTestDriver td;
private TestInputTopic<String, String> inputTopic;
private TestOutputTopic<String, String> outputTopic;
private Topology topology;
private Properties streamConfig;
@BeforeEach
void setUp() {
streamConfig = new Properties();
streamConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "AppId");
streamConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
streamConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
topology = new Topology();
td = new TopologyTestDriver(topology, streamConfig);
inputTopic = td.createInputTopic("input-topic", Serdes.String().serializer(), Serdes.String().serializer());
outputTopic = td.createOutputTopic("output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());
}
@Test
void buildTopology(){
inputTopic.pipeInput("key1", "value1");
topology = app.buildTopology();
}
当我运行测试时,我得到异常“java.lang.IllegalArgumentException:未知主题:输入主题”
DEBUG org.apache.kafka.streams.processor.internals.InternalTopologyBuilder - No source topics using pattern subscription found, initializing consumer's subscription collection.
java.lang.IllegalArgumentException: Unknown topic: input-topic
at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:582)
at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:945)
at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:115)
at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:137)
at testclassname.buildTopology()
有没有人能帮我理解我在这里错过了什么?
1条答案
按热度按时间agxfikkp1#
我看到你正在创建一个 empty
Topology
,用于初始化TopologyTestDriver
:当这个空拓扑用于示例化
td = new TopologyTestDriver(topology, streamConfig);
的TopologyTestDriver
时,测试驱动程序不知道任何主题,因为没有有效地构建拓扑。我想这就是为什么当您尝试使用
inputTopic.pipeInput("key1", "value1");
将输入通过管道传输到"input-topic"
时,测试驱动程序会抛出一个IllegalArgumentException
,抱怨“Unknown topic: input-topic
“。您应该调用您的
buildTopology()
方法来生成您正在测试的实际拓扑,并在创建TopologyTestDriver
时使用它。确保测试中的主题名称(
input-topic
,output-topic
)与实际应用程序中的主题名称("topicname"
)相匹配。注意:我从设置中删除了输出主题,因为在您的代码片段中,您没有指定
KTable
写入的输出主题。如果您的实际应用程序写入输出主题,则可以将其添加回去。我更新了代码,添加了ktable存储的名称。
如何测试添加到ktable的值?
您可以查询支持
KTable
的状态存储来检查其内容。在Kafka Streams中,每个KTable都由一个状态存储(甚至是versioned one very recently, Aug. 2023)支持,您可以在测试中直接与此存储交互。
确保在拓扑中为KTable设置了存储名称:
这里,
"myKTableStore"
是支持KTable的状态存储的名称。在您的测试中,您可以从
TopologyTestDriver
检索存储并检查特定键的值:这样,您就可以验证
KTable
是否包含预期的键值对。请注意,
ReadOnlyKeyValueStore
是Kafka Streams API的一部分。根据需要导入。您可以在“Kafka Streams Interactive Query/ Querying local key-value stores”中看到它的使用
如何在测试中输入标题到输入主题?我没有别的选择。我在这里过滤头值
inputStream.transformValues(KSExtension::new)
在Kafka Streams的
TopologyTestDriver
中,直接向TestInputTopic
添加头的能力受到一定限制。但是,您可以使用较低级别的
pipeInput()
方法,该方法允许您传递一个ConsumerRecord
object,它可以具有头。您需要手动构建
ConsumerRecord
,然后使用它:确保将
"topicname"
替换为拓扑中实际阅读的主题名称,并根据测试需要调整键、值和标题。这将允许您在测试记录中包含头,然后
transformValues
操作将按预期处理这些头。