我有一个简单的KafkaStreams拓扑结构,它包含一个主题'foo'
,其中每个记录都是一个JSON blob(我将其串接到一个TaskDefSchema
中),以某个guid为键.我希望构造两个可查询的GlobalKTable
's,这样就有两个ReadOnlyKeyValueStore<String, FooThing>
's:
1.一个由guid键控(即输入主题foo
未更改)。
1.一个由JSON blob中的TaskDefSchema.name
字段作为键控。
下面是我的代码:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, TaskDefSchema> taskDefEvents = builder.stream(
config.getTaskDefTopic(), // 'foo' in the question above
Consumed.with(Serdes.String(), new TaskDefSerdes())
);
KStream<String, TaskDefSchema> taskDefEventsNameKeyed = taskDefEvents.selectKey(
((k, v) -> v.name)
);
String intermediateTopicByName = config.getTaskDefTopic() + "__intermediate_name";
String intermediateTopicByGuid = config.getTaskDefTopic() + "__intermediate_guid";
taskDefEventsNameKeyed.to(intermediateTopicByName);
taskDefEvents.to(intermediateTopicByGuid);
this.taskDefNameTable = builder.globalTable(
intermediateTopicByName,
Materialized.<String, TaskDefSchema, KeyValueStore<Bytes, byte[]>>
as("wf-spec-name")
.withKeySerde(Serdes.String())
.withValueSerde(new TaskDefSerdes())
);
this.taskDefGuidTable = builder.globalTable(
intermediateTopicByGuid,
Materialized.<String, TaskDefSchema, KeyValueStore<Bytes, byte[]>>
as("wf-spec-guid")
.withKeySerde(Serdes.String())
.withValueSerde(new TaskDefSerdes())
);
return builder.build();
当我试着查询商店时如下:
kafkaStreams.store(
StoreQueryParameters.fromNameAndType(
"wf-spec-guid",
QueryableStoreTypes.keyValueStore()
)
);
函数调用返回一个按预期工作的KeyValueStore。* 注意:上面的块对应于第二个全局表-taskDefGuidTable。*
但是,当我执行以下调用以检索重新键控的globalTable(即taskDefGuidTable)时,我得到一个错误:
kafkaStreams.store(
StoreQueryParameters.fromNameAndType(
"wf-spec-name", // this is the line that's different (:
QueryableStoreTypes.keyValueStore()
)
);
/* *********************** */
org.apache.kafka.streams.errors.UnknownStateStoreException: Cannot get state store wf-spec-name because no such store is registered in the topology.
任何帮助都将不胜感激。谢谢!
1条答案
按热度按时间8yoxcaq71#
我查询的
kafkaStreams
对象是错误的(我的代码中有多个KafkaStreams
...哎呀)。