Kafka KStream.processValues()-从固定密钥处理器中获取一个空状态存储

vlf7wbxs  于 2022-11-28  发布在  Apache
关注(0)|答案(1)|浏览(124)

我有下面的拓扑结构,它使用processValues()方法将流DSL与处理器API结合起来。我在这里添加一个状态存储。

KStream<String, SecurityCommand> securityCommands =
            builder.stream(
                    "security-command",
                    Consumed.with(Serdes.String(), JsonSerdes.securityCommand()));

StoreBuilder<KeyValueStore<String, UserAccountSnapshot>> storeBuilder =
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("user-account-snapshot"),
                    Serdes.String(),
                    JsonSerdes.userAccountSnapshot());

builder.addStateStore(storeBuilder);

securityCommands.processValues(() -> new SecurityCommandProcessor(), Named.as("security-command-processor"), "user-account-snapshot")
                .processValues(() -> new UserAccountSnapshotUpdater(), Named.as("user-snapshot-updater"), "user-account-snapshot")
                .to("security-event", Produced.with(
                                                Serdes.String(),
                                                JsonSerdes.userAccountEvent()));

SecurityCommandProcessor代码如下:

class SecurityCommandProcessor implements FixedKeyProcessor<String, SecurityCommand, UserAccountEvent> {

    private KeyValueStore<String, UserAccountSnapshot> kvStore;
    private FixedKeyProcessorContext context;

    @Override
    public void init(FixedKeyProcessorContext context) {
        this.kvStore = (KeyValueStore<String, UserAccountSnapshot>) context.getStateStore("user-account-snapshot");
        this.context = context;
    }
    ...
}

问题是context.getStateStore("user-account-snapshot")返回null
我试着用过时的transformValues()来做几乎相同的代码,我能够得到状态存储。问题是processValues()。我做错了什么吗?

8cdiaqws

8cdiaqws1#

问题在于您使用的是FixedKeyProcessorSupplier的lambda示例。当处理器需要访问状态存储时,您需要覆盖stores方法,该方法在未被覆盖时返回nullFixedKeyProcessorSupplier扩展了ConnectedStoreProvider接口。
因此,您需要提供处理器供应商的具体示例。
告诉我进展如何。
比尔·HTH

相关问题