kafka streams:使用相同的“application.id”从多个主题中使用

h6my8fg2  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(597)

我有一个应用程序,需要听多个不同的主题;对于如何处理消息,每个主题都有单独的逻辑。我原以为对每个kafkastreams示例都使用相同的kafka属性,但出现了如下错误。
错误

java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic

代码(kotlin)

class KafkaSetup() {
    companion object {
        private val LOG = LoggerFactory.getLogger(this::class.java)
    }

    fun getProperties(): Properties {
        val properties = Properties()
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
        return properties
    }

    private fun listenOnMyTopic() {
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")

        kStream.foreach { key, value -> LOG.info("do stuff") }

        val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
        kafkaStreams.start()
    }

    private fun listenOnMyOtherTopic() {
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic")

        kStream.foreach { key, value -> LOG.info("do other stuff") }

        val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
        kafkaStreams.start()
    }
}

我发现这个参考说明你不能用 application.id 但对于多个主题,我发现很难找到支持这一点的参考文档。的文档 application.id 国家:
流处理应用程序的标识符。在Kafka集群中必须是唯一的。它用作1)默认客户端id前缀,2)用于成员身份管理的组id,3)changelog主题前缀。
问题
这个错误是什么意思,是什么原因造成的。
假设您可以让应用程序的多个示例使用相同的id从多个主题分区中使用,那么“在kafka集群中必须是唯一的”是什么意思?
你能用同样的Kafka流吗 application.id 开始第二步 KafkaStreams 在不同的主题上列出?如果是,怎么做?
细节:Kafka0.11.0.2

1cklez4t

1cklez4t1#

kafka流通过分区而不是主题进行扩展。因此,如果用同一个 application.id 它们在订阅的输入主题和处理逻辑方面必须相同。应用程序使用 application.id 作为 group.id 因此,输入主题的不同分区被分配给不同的示例。
如果不同的主题具有相同的逻辑,则可以一次订阅所有主题(在启动的每个示例中)。不过,扩展仍然基于分区(它基本上是您输入主题的“合并”。)
如果要通过主题进行缩放和/或具有不同的处理逻辑,则必须使用不同的 application.id 对于不同的Kafka流应用程序。

相关问题