如何使用springkafka为侦听器传递多个引导服务器

nzkunb0c  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(353)

我有一个监听器,需要从多个kafka服务器读取相同的主题,这都是在一个zookeeper配置。如何从多个服务器读取数据。你能帮个忙吗。
我可以用zookeeper代替kafka服务器吗?

dz6r00yl

dz6r00yl1#

这个 @KafkaListener 要求
KafkaListenerContainerFactory @Bean ,而这又是基于 ConsumerFactory . 以及 DefaultKafkaConsumerFactory 接受 Map<String, Object> 用户配置的数量:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
        ...
        return props;
    }
}

https://docs.spring.io/spring-kafka/docs/1.2.2.release/reference/html/_reference.html#__kafkalistener_annotation
在哪里 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG 正是标准的Apache·Kafka bootstrap.servers 属性:
用于建立到kafka群集的初始连接的主机/端口对的列表。客户机将使用所有服务器,而不管此处为引导指定了哪些服务器。此列表仅影响用于发现整套服务器的初始主机。此列表的格式应为host1:port1,host2:port2,。。。。由于这些服务器只是用于初始连接以发现完整的集群成员身份(可能会动态更改),因此此列表不需要包含完整的服务器集(不过,如果服务器关闭,您可能需要多个服务器)。
不,你不能指出Zookeeper的地址。Kafka不再支持这一点。

tag5nh1u

tag5nh1u2#

Map有key:string和value:object。
目标key:consumerconfig.bootstrap_servers_config 应该是由“,”分隔的主机端口对系列,例如:host1:port1、host2:port2、host3:port3。。。。。。
e、 g监听三台服务器:localhost:9092,192.168.22.12:9088,localhost:7898

相关问题