我有下面的拓扑结构,它使用processValues()
方法将流DSL与处理器API结合起来。我在这里添加一个状态存储。
KStream<String, SecurityCommand> securityCommands =
builder.stream(
"security-command",
Consumed.with(Serdes.String(), JsonSerdes.securityCommand()));
StoreBuilder<KeyValueStore<String, UserAccountSnapshot>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("user-account-snapshot"),
Serdes.String(),
JsonSerdes.userAccountSnapshot());
builder.addStateStore(storeBuilder);
securityCommands.processValues(() -> new SecurityCommandProcessor(), Named.as("security-command-processor"), "user-account-snapshot")
.processValues(() -> new UserAccountSnapshotUpdater(), Named.as("user-snapshot-updater"), "user-account-snapshot")
.to("security-event", Produced.with(
Serdes.String(),
JsonSerdes.userAccountEvent()));
SecurityCommandProcessor
代码如下:
class SecurityCommandProcessor implements FixedKeyProcessor<String, SecurityCommand, UserAccountEvent> {
private KeyValueStore<String, UserAccountSnapshot> kvStore;
private FixedKeyProcessorContext context;
@Override
public void init(FixedKeyProcessorContext context) {
this.kvStore = (KeyValueStore<String, UserAccountSnapshot>) context.getStateStore("user-account-snapshot");
this.context = context;
}
...
}
问题是context.getStateStore("user-account-snapshot")
返回null。
我试着用过时的transformValues()
来做几乎相同的代码,我能够得到状态存储。问题是processValues()
。我做错了什么吗?
1条答案
按热度按时间8cdiaqws1#
问题在于您使用的是FixedKeyProcessorSupplier的lambda示例。当处理器需要访问状态存储时,您需要覆盖
stores
方法,该方法在未被覆盖时返回null
。FixedKeyProcessorSupplier
扩展了ConnectedStoreProvider接口。因此,您需要提供处理器供应商的具体示例。
告诉我进展如何。
比尔·HTH