我有一个函数,它获取kafka streams的一个示例,获取状态存储,解析它并进行一些计算。
void func1(KafkaStreams streams)
{
StoreQueryParameters<ReadOnlyKeyValueStore<String, Long>> storeQueryParams =
StoreQueryParameters.fromNameAndType(...);
ReadOnlyKeyValueStore<String, Long> stateStore = streams.store(storeQueryParams);
KeyValueIterator<String, Long> range = stateStore.all();
...
// using this iterator, I will read each record in state store and do some computation.
}
让我们假设kafka流的拓扑是一个简单的拓扑,我们从一个主题中读取数据,并将精确的记录存储在状态存储中。
如何测试这些需要Kafka设置的函数?
1条答案
按热度按时间pbgvytdp1#
你可以尝试直接测试商店,通过
TopologyTestDriver
举个例子:https://github.com/openzipkin-contrib/zipkin-storage-kafka/blob/56afb2e7a0bd4381cab9c97002018d301c331b29/storage/src/test/java/zipkin2/storage/kafka/streams/tracestoragetopologytest.java#l186-l200型
如果你想测试你的
func1
它访问kafkastreams示例,需要进行集成测试。testcontainers有助于为您的测试提供kafka集群https://www.testcontainers.org/modules/kafka/