java——Kafka中两个不同主题的连接

6jjcrrmo  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(198)

我有一个主题名为topic1和topic2。我有以下代码:
-主题1:

KStream<Long, byte[]> events = builder.stream("topic1", Consumed.with(Serdes.Long(), Serdes.ByteArray()));

-主题2表:

KTable<Long, byte[]> table = builder.table("topic2",
        Consumed.with(Serdes.Long(), Serdes.ByteArray()));

当我和topic1的制作人一起制作时,一切都很好。topic1和topic2的值是不同的,我想用这个流“events”使用第一个topics producer生成的记录,然后将这个事件附加到seconds topic表,并为同一个键保存第二个topic值的状态(键很长,第一个和第二个topic的编号相同)。换句话说,我希望将流与基于相同键的表和update表连接起来。
我的代码:

StoreBuilder<KeyValueStore<Long, byte[]>> store = Stores
        .keyValueStoreBuilder(Stores.persistentKeyValueStore(STORE_TOPIC2), Serdes.Long(), Serdes.ByteArray())
        .withLoggingEnabled(new HashMap<>());

    builder.addStateStore(store);

    events.join(table, KeyValue::new, Joined.with(Serdes.Long(), Serdes.ByteArray(), Serdes.ByteArray()))
        .transform(Update::new, STORE_TOPIC2)
        .to("topic2", Produced.with(Serdes.Long(), Serdes.ByteArray()));

在最后一行中,我生成了主题2的关联事件,但是没有生成任何关于此主题的内容,我的转换器“update”如下所示:

private static class Update implements Transformer<Long, KeyValue<byte[], byte[]>, KeyValue<Long, byte[]>> {

    private KeyValueStore<Long, byte[]> store1;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        store1 = (KeyValueStore<Long, byte[]>) context.getStateStore(store);
    }

    @Override
    public KeyValue<Long, byte[]> transform(final Long key, final KeyValue<byte[], byte[]> updates) {

        System.out.println("Inside event transformer for key: " + key);
        System.out.println("Last produced graph: " + store.get(key));

        CustomClass c = null;
        try {
        c = deserializeModel(store.get(key));
        } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        }

        if (c == null) {
        c = new CustomClass(...);
        }

        try {
        return KeyValue.pair(companyKey, serializeModel(companyNetwork));
        } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        return null;
        }
    }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题