在kafka流中测试交互式查询

evrscar2  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(324)

我有一个函数,它获取kafka streams的一个示例,获取状态存储,解析它并进行一些计算。

void func1(KafkaStreams streams)
{
   StoreQueryParameters<ReadOnlyKeyValueStore<String, Long>> storeQueryParams = 
                StoreQueryParameters.fromNameAndType(...);
   ReadOnlyKeyValueStore<String, Long> stateStore = streams.store(storeQueryParams);
   KeyValueIterator<String, Long> range = stateStore.all();
   ...
   // using this iterator, I will read each record in state store and do some computation.
}

让我们假设kafka流的拓扑是一个简单的拓扑,我们从一个主题中读取数据,并将精确的记录存储在状态存储中。
如何测试这些需要Kafka设置的函数?

pbgvytdp

pbgvytdp1#

你可以尝试直接测试商店,通过 TopologyTestDriver 举个例子:
https://github.com/openzipkin-contrib/zipkin-storage-kafka/blob/56afb2e7a0bd4381cab9c97002018d301c331b29/storage/src/test/java/zipkin2/storage/kafka/streams/tracestoragetopologytest.java#l186-l200型
如果你想测试你的 func1 它访问kafkastreams示例,需要进行集成测试。testcontainers有助于为您的测试提供kafka集群https://www.testcontainers.org/modules/kafka/

相关问题