Spring Cloud流淌,消费多个Kafka主题

fkaflof6  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(378)

在一次尝试使用多个主题的过程中,我遇到了一个关于spring引导流的问题 @StreamListener .
据Spring Cloud流文件:据文件:
目的地
绑定中间件上通道的目标目标(例如,rabbitmq交换或kafka主题)。如果频道绑定为使用者,则可以将其绑定到多个目的地,并且可以将目的地名称指定为逗号分隔的字符串值。如果未设置,则使用通道名称。无法覆盖此>属性的默认值。
但是,在我使用下一个配置之后:

spring:
  cloud:
    stream:
      bindings:
        testchannel:
          group: test
          destination: platform.metrics, platform.sleuth

现在我又犯了一个错误:

Caused by: java.lang.IllegalArgumentException: Topic name can only have ASCII alphanumerics, '.', '_' and '-'
    at org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils.validateTopicName(KafkaTopicUtils.java:39) ~[spring-cloud-stream-binder-kafka-core-1.2.1.RELEASE.jar:1.2.1.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:107) ~[spring-cloud-stream-binder-kafka-core-1.2.1.RELEASE.jar:1.2.1.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:60) ~[spring-cloud-stream-binder-kafka-core-1.2.1.RELEASE.jar:1.2.1.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:110) ~[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]
    ... 20 common frames omitted

如何将多个主题绑定到一个主题 @StreamListner 或者从主题列表生成动态streamlisteners?

nr7wwzry

nr7wwzry1#

您只需替换逗号和下一个目标值之间的空格,它将如下所示:

spring:
    cloud:
        stream:
            bindings:
                testchannel:
                    group: test
                    destination: platform.metrics,platform.sleuth
uwopmtnx

uwopmtnx2#

在这里,我写了关于如何向动态目的地发送消息以及如何从动态目的地接收消息的更多细节。
https://stackoverflow.com/a/56446574/4587961
正如varun miglani所说,主题是用逗号分隔的,没有空格。

spring:
  cloud:
    stream:
      default:
        consumer:
          concurrency: 2
          partitioned: true
      bindings:
        # inputs
        input:
          group: application_name_group
          destination: topic-1,topic-2
          content-type: application/json;charset=UTF-8

要动态发送消息,请使用binderawarechannelresolver和dynamicdestinations属性。

spring:
  cloud:
    stream:
      dynamicDestinations: output.topic.1,output.topic2,output.topic.3

相关问题