kstream-ktable连接未触发

dnph8jn4  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(394)

我有两个主题(实际上更多,但在这里保持简单),我加入使用streams dsl和一旦加入,发布数据到下游。 
我在主题1的基础上创建了一个ktable,并将其存储到一个命名的状态存储中。主题1的键如下所示:

{  sourceCode:"WXYZ",
    platformCode:"ABCD",
    transactionIdentifier:"012345:01:55555:12345000:1"
}

我在changelog主题中看到了预期的数据。 
在主题2上有一个kstream。主题2的键如下所示:

{  sourceCode:"WXYZ",
   platformCode:"ABCD",
   transactionIdentifier:"012345:01:55555:12345000:1"
   lineIdentifier:"1"
}

  我正在重新键入和聚合来自主题2的数据,并将其放入另一个命名的状态存储中,因为主题1和主题2中的数据之间存在1-many关系。  重新设置数据密钥后,主题2中的密钥看起来与主题1中的密钥相同。我可以在重分区主题中看到重新键入的数据,也可以在changelog主题中看到预期的聚合数据。但是,连接没有被触发。
其他关键细节-

所有主题中的数据均为avro格式。
我正在使用java/spring引导。
我在commit.interval.ms和cache.max.bytes.buffering上保留了默认设置
有人指出我可能做错了什么吗?
编辑1:我查看了数据分区,看起来一个分区在14个分区,另一个分区在20个分区。我还发现了一个类似的问题。
编辑2:topic1和topic2的生产者是一个golang应用程序。streams restore使用者具有以下配置:
partition.assignment.strategy=[类org.apache.kafka.clients.consumer.rangeassignor]
streams使用者具有以下配置:
partition.assignment.strategy=[org.apache.kafka.streams.processor.internals.streamspartitionassignor]

kt06eoxx

kt06eoxx1#

我把答案贴在下面是为了帮助其他人从这些问题中寻找涅盘。正如在相关问题的评论部分所指出的,这是由于生产者的申请而引起的问题。
producer应用程序是用golang编写的,因此,它的散列不同于java,这就是我使用dsl连接数据流所使用的。
在前面,我是这样阅读ktable的,它维护着与源主题中相同的分区:

@Bean
public KTable<MyKey, MyValue> myKTable(StreamsBuilder streamsBuilder) {
    return streamsBuilder.table(inputTopic1, Materialized.as(transactionStore));
}

为了达到预期的效果,我重写了以下代码:

@Bean
public KTable<MyKey, MyValue> myKTable(StreamsBuilder streamsBuilder) {

    SpecificAvroSerde<MyKey> keySpecificAvroSerde = myKeySpecificAvroSerde();
    SpecificAvroSerde<MyValue> valueSpecificAvroSerde = mySpecificAvroSerde();

    streamsBuilder.stream(inputTopic1, Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde)).
            selectKey((key, value) -> new MyKey(key.get1(), key.get2(), key.get3())).
        to("dummyTopic", Produced.with(keySpecificAvroSerde, valueSpecificAvroSerde));

    return streamsBuilder.table("dummyTopic",
            Materialized.<MyKey, MyValue, KeyValueStore<Bytes, byte[]>>as("myStateStore").
                   withKeySerde(keySpecificAvroSerde).withValueSerde(valueSpecificAvroSerde));
}

相关问题