我有一个监听器,需要从多个kafka服务器读取相同的主题,这都是在一个zookeeper配置。如何从多个服务器读取数据。你能帮个忙吗。我可以用zookeeper代替kafka服务器吗?
dz6r00yl1#
这个 @KafkaListener 要求KafkaListenerContainerFactory @Bean ,而这又是基于 ConsumerFactory . 以及 DefaultKafkaConsumerFactory 接受 Map<String, Object> 用户配置的数量:
@KafkaListener
@Bean
ConsumerFactory
DefaultKafkaConsumerFactory
Map<String, Object>
@Configuration @EnableKafka public class KafkaConfig { @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...); ... return props; } }
https://docs.spring.io/spring-kafka/docs/1.2.2.release/reference/html/_reference.html#__kafkalistener_annotation在哪里 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG 正是标准的Apache·Kafka bootstrap.servers 属性:用于建立到kafka群集的初始连接的主机/端口对的列表。客户机将使用所有服务器,而不管此处为引导指定了哪些服务器。此列表仅影响用于发现整套服务器的初始主机。此列表的格式应为host1:port1,host2:port2,。。。。由于这些服务器只是用于初始连接以发现完整的集群成员身份(可能会动态更改),因此此列表不需要包含完整的服务器集(不过,如果服务器关闭,您可能需要多个服务器)。不,你不能指出Zookeeper的地址。Kafka不再支持这一点。
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
bootstrap.servers
tag5nh1u2#
Map有key:string和value:object。目标key:consumerconfig.bootstrap_servers_config 应该是由“,”分隔的主机端口对系列,例如:host1:port1、host2:port2、host3:port3。。。。。。e、 g监听三台服务器:localhost:9092,192.168.22.12:9088,localhost:7898
2条答案
按热度按时间dz6r00yl1#
这个
@KafkaListener
要求KafkaListenerContainerFactory
@Bean
,而这又是基于ConsumerFactory
. 以及DefaultKafkaConsumerFactory
接受Map<String, Object>
用户配置的数量:https://docs.spring.io/spring-kafka/docs/1.2.2.release/reference/html/_reference.html#__kafkalistener_annotation
在哪里
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
正是标准的Apache·Kafkabootstrap.servers
属性:用于建立到kafka群集的初始连接的主机/端口对的列表。客户机将使用所有服务器,而不管此处为引导指定了哪些服务器。此列表仅影响用于发现整套服务器的初始主机。此列表的格式应为host1:port1,host2:port2,。。。。由于这些服务器只是用于初始连接以发现完整的集群成员身份(可能会动态更改),因此此列表不需要包含完整的服务器集(不过,如果服务器关闭,您可能需要多个服务器)。
不,你不能指出Zookeeper的地址。Kafka不再支持这一点。
tag5nh1u2#
Map有key:string和value:object。
目标key:consumerconfig.bootstrap_servers_config 应该是由“,”分隔的主机端口对系列,例如:host1:port1、host2:port2、host3:port3。。。。。。
e、 g监听三台服务器:localhost:9092,192.168.22.12:9088,localhost:7898