我有两个主题(实际上更多,但在这里保持简单),我加入使用streams dsl和一旦加入,发布数据到下游。
我在主题1的基础上创建了一个ktable,并将其存储到一个命名的状态存储中。主题1的键如下所示:
{ sourceCode:"WXYZ",
platformCode:"ABCD",
transactionIdentifier:"012345:01:55555:12345000:1"
}
我在changelog主题中看到了预期的数据。
在主题2上有一个kstream。主题2的键如下所示:
{ sourceCode:"WXYZ",
platformCode:"ABCD",
transactionIdentifier:"012345:01:55555:12345000:1"
lineIdentifier:"1"
}
我正在重新键入和聚合来自主题2的数据,并将其放入另一个命名的状态存储中,因为主题1和主题2中的数据之间存在1-many关系。 重新设置数据密钥后,主题2中的密钥看起来与主题1中的密钥相同。我可以在重分区主题中看到重新键入的数据,也可以在changelog主题中看到预期的聚合数据。但是,连接没有被触发。
其他关键细节-
所有主题中的数据均为avro格式。
我正在使用java/spring引导。
我在commit.interval.ms和cache.max.bytes.buffering上保留了默认设置
有人指出我可能做错了什么吗?
编辑1:我查看了数据分区,看起来一个分区在14个分区,另一个分区在20个分区。我还发现了一个类似的问题。
编辑2:topic1和topic2的生产者是一个golang应用程序。streams restore使用者具有以下配置:
partition.assignment.strategy=[类org.apache.kafka.clients.consumer.rangeassignor]
streams使用者具有以下配置:
partition.assignment.strategy=[org.apache.kafka.streams.processor.internals.streamspartitionassignor]
1条答案
按热度按时间kt06eoxx1#
我把答案贴在下面是为了帮助其他人从这些问题中寻找涅盘。正如在相关问题的评论部分所指出的,这是由于生产者的申请而引起的问题。
producer应用程序是用golang编写的,因此,它的散列不同于java,这就是我使用dsl连接数据流所使用的。
在前面,我是这样阅读ktable的,它维护着与源主题中相同的分区:
为了达到预期的效果,我重写了以下代码: