spring集成kafka(2.0.0)和spring kafka 1.0.2消费者收到重复消息

ukxgm1gy  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(260)

我有两个spring引导项目,第一个是一个生产者,它用一个分区向一个主题发送消息。
第二个是使用者应用程序,它读取主题和一个分区。对于消费者,我使用kafkamessagedrivenchanneladapter、kafkamessagelistenercontainer并在consumerfactory中指定消费者组id。
注意,我使用的是spring集成kafka 2.0.0.release和spring kafka 1.0.2.release,后者使用kakfa0.9。我正在docker容器中运行3个docker示例或kafka 0.10.0和一个zookeeper示例。
当我运行我的消费者的一个示例时,它工作得很好,读取消息,处理消息。
但是,当我运行应用程序的第二个示例(我只是更改端口)时,生产者应用程序生成的任何消息都会被两个示例接收,从而导致每个消息处理两次。
根据文档,我觉得这个场景应该是可行的,因为在这个例子中第二个示例的原因是为了弹性,如果一个应用示例宕机,另一个会接管,但不是两个都应该得到消费者组中相同主题/分区的消息。注意,我使用服务激活器(facade)来处理消息。
有什么我不知道的吗。
请帮忙。
以下是我基于spring integration kafka示例的消费者应用程序配置:

{
    @ServiceActivator(inputChannel = "received", outputChannel = "nullChannel", adviceChain = {"requestHandlerRetryAdvice"})
        @Bean
        public MessageConsumerServiceFacade messageConsumerServiceFacade() {
            return new DefaultMessageConsumerServiceFacade();
        }

        @ServiceActivator(inputChannel = "errorChannel", outputChannel = "nullChannel")
        @Bean
        public MessageConsumerServiceFacade messageConsumerErrorServiceFacade() {
            return new DefaultMessageConsumerErrorServiceFacade();
        }
        @ServiceActivator(inputChannel = "received", outputChannel = "nullChannel", adviceChain = {"requestHandlerRetryAdvice"})
        @Bean
        public MessageConsumerServiceFacade messageConsumerServiceFacade() {
            return new DefaultMessageConsumerServiceFacade();
        }

        @ServiceActivator(inputChannel = "errorChannel", outputChannel = "nullChannel")
        @Bean
        public MessageConsumerServiceFacade messageConsumerErrorServiceFacade() {
            return new DefaultMessageConsumerErrorServiceFacade();
        }

            @Bean
            public IntegrationFlow consumer() throws Exception {

                LOGGER.info("starting consumer..");

                return IntegrationFlows
                        .from(adapter(container()))
                        .get();
            }

         @Bean  
         public KafkaMessageListenerContainer<String, byte[]> container() throws Exception {
    // This variant of the constructors DOES NOT WORK with Consumer Group, with this setting, all consumers receives the message - BAD for a cluster of consumer apps - duplicate message
    //ContainerProperties containerProperties = new ContainerProperties( new TopicPartitionInitialOffset(this.topic, 0));

    // Use THIS variant of the constructors to use Consumer Group successfully
    // with auto re-balance of partitions to distribute loads among consumers, perfect for a cluster of consumer app
    ContainerProperties containerProperties = new ContainerProperties(this.topic);
    containerProperties.setAckOnError(false);
    containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

    KafkaMessageListenerContainer kafkaMessageListenerContainer = new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties);

    return kafkaMessageListenerContainer;

}

            @Bean 
            public ConsumerFactory<String, byte[]> consumerFactory() {
                Map<String, Object> props = new HashMap<>();
                props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);

                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
                props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 2);
                props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);

                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
                props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // earliest, latest, none

                return new DefaultKafkaConsumerFactory<>(props);
            }

            @Bean
            public KafkaMessageDrivenChannelAdapter<String, byte[]> adapter(KafkaMessageListenerContainer<String, byte[]> container) {
                KafkaMessageDrivenChannelAdapter<String, byte[]> kafkaMessageDrivenChannelAdapter =
                        new KafkaMessageDrivenChannelAdapter<>(container);

                kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
                kafkaMessageDrivenChannelAdapter.setErrorChannel(errorChannel());

                return kafkaMessageDrivenChannelAdapter;
            }

            @Bean 
            public MessageChannel received() {

                return new PublishSubscribeChannel();
            }

            @Bean 
            public MessageChannel errorChannel() {
                return new PublishSubscribeChannel();

            }
        }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题