我已经创建了2个Kafka状态存储,对应于下面两个不同的主题。
StateStoreSupplier tempStore1 = Stores.create("tempStore1").withKeys(Serdes.String()).withValues(valueSerde).persistent().build();
StateStoreSupplier tempStore2 = Stores.create("tempStore2").withKeys(Serdes.String()).withValues(valueSerde).persistent().build();
streamsBuilder.addSource("Source", "tempTopic1", "tempTopic2")
.addProcessor("Process", () -> new Processor(), "Source")
.connectProcessorAndStateStores("Process", "tempStore1", "tempStore2")
.addStateStore(tempStore1, "Process")
.addStateStore(tempStore2, "Process");
在processor类中,当各个主题中有消息时,我可以读取记录并将其添加到statesstores。但我无法阅读商店tempstore1时,有一个消息来自Tentopic2和副韦尔卡。我必须比较来自一个状态库和另一个状态库的消息,这意味着我需要同时读取两个状态库。
下面是来自process方法的示例代码snipper。我相信processorcontext(上下文变量)对于不同的主题是不同的,因此无法访问另一个存储。
tempKeyValueStore1 = (KeyValueStore) context.getStateStore("tempStore1");
tempKeyValueStore2 = (KeyValueStore) context.getStateStore("tempStore2");
if(context.topic().equals("tempTopic1"))
{
tempKeyValueStore1.put(value.getHeader().getCorrelationId(), value);
}else if(context.topic().equals("tempTopic2"))
{
tempKeyValueStore2.put(value.getHeader().getCorrelationId(), value);
System.out.println("Size: "+tempKeyValueStore1.approximateNumEntries()); // returning as 0 although there records in that statestore
}
提前谢谢。
暂无答案!
目前还没有任何答案,快来回答吧!