我有一个主题名为topic1和topic2。我有以下代码:
-主题1:
KStream<Long, byte[]> events = builder.stream("topic1", Consumed.with(Serdes.Long(), Serdes.ByteArray()));
-主题2表:
KTable<Long, byte[]> table = builder.table("topic2",
Consumed.with(Serdes.Long(), Serdes.ByteArray()));
当我和topic1的制作人一起制作时,一切都很好。topic1和topic2的值是不同的,我想用这个流“events”使用第一个topics producer生成的记录,然后将这个事件附加到seconds topic表,并为同一个键保存第二个topic值的状态(键很长,第一个和第二个topic的编号相同)。换句话说,我希望将流与基于相同键的表和update表连接起来。
我的代码:
StoreBuilder<KeyValueStore<Long, byte[]>> store = Stores
.keyValueStoreBuilder(Stores.persistentKeyValueStore(STORE_TOPIC2), Serdes.Long(), Serdes.ByteArray())
.withLoggingEnabled(new HashMap<>());
builder.addStateStore(store);
events.join(table, KeyValue::new, Joined.with(Serdes.Long(), Serdes.ByteArray(), Serdes.ByteArray()))
.transform(Update::new, STORE_TOPIC2)
.to("topic2", Produced.with(Serdes.Long(), Serdes.ByteArray()));
在最后一行中,我生成了主题2的关联事件,但是没有生成任何关于此主题的内容,我的转换器“update”如下所示:
private static class Update implements Transformer<Long, KeyValue<byte[], byte[]>, KeyValue<Long, byte[]>> {
private KeyValueStore<Long, byte[]> store1;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
store1 = (KeyValueStore<Long, byte[]>) context.getStateStore(store);
}
@Override
public KeyValue<Long, byte[]> transform(final Long key, final KeyValue<byte[], byte[]> updates) {
System.out.println("Inside event transformer for key: " + key);
System.out.println("Last produced graph: " + store.get(key));
CustomClass c = null;
try {
c = deserializeModel(store.get(key));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (c == null) {
c = new CustomClass(...);
}
try {
return KeyValue.pair(companyKey, serializeModel(companyNetwork));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return null;
}
}
暂无答案!
目前还没有任何答案,快来回答吧!