如何读取从不同处理器更新的处理器中的kafka statestore?

guykilcj  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(291)

我已经创建了2个Kafka状态存储,对应于下面两个不同的主题。

StateStoreSupplier tempStore1 = Stores.create("tempStore1").withKeys(Serdes.String()).withValues(valueSerde).persistent().build();

    StateStoreSupplier tempStore2 = Stores.create("tempStore2").withKeys(Serdes.String()).withValues(valueSerde).persistent().build();

    streamsBuilder.addSource("Source", "tempTopic1", "tempTopic2")
                    .addProcessor("Process", () -> new Processor(), "Source")
                    .connectProcessorAndStateStores("Process", "tempStore1", "tempStore2")
                    .addStateStore(tempStore1, "Process")
                    .addStateStore(tempStore2, "Process");

在processor类中,当各个主题中有消息时,我可以读取记录并将其添加到statesstores。但我无法阅读商店tempstore1时,有一个消息来自Tentopic2和副韦尔卡。我必须比较来自一个状态库和另一个状态库的消息,这意味着我需要同时读取两个状态库。
下面是来自process方法的示例代码snipper。我相信processorcontext(上下文变量)对于不同的主题是不同的,因此无法访问另一个存储。

tempKeyValueStore1 = (KeyValueStore) context.getStateStore("tempStore1");
            tempKeyValueStore2 = (KeyValueStore) context.getStateStore("tempStore2");
            if(context.topic().equals("tempTopic1"))
            {
                tempKeyValueStore1.put(value.getHeader().getCorrelationId(), value);
            }else if(context.topic().equals("tempTopic2"))
            {
                tempKeyValueStore2.put(value.getHeader().getCorrelationId(), value);
                System.out.println("Size: "+tempKeyValueStore1.approximateNumEntries()); // returning as 0 although there records in that statestore
            }

提前谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题