kafka消费者在spring boot中未收到消息

5t7ly7z5  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(533)

我的spring/java使用者无法访问producer生成的消息。但是,当我从控制台/终端运行consumer时,它能够接收spring/java producer生成的消息。
使用者配置:

@Component
@ConfigurationProperties(prefix="kafka.consumer")
public class KafkaConsumerProperties {

    private String bootstrap;
    private String group;
    private String topic;

    public String getBootstrap() {
        return bootstrap;
    }

    public void setBootstrap(String bootstrap) {
        this.bootstrap = bootstrap;
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }
}

侦听器配置:

@Configuration
@EnableKafka
public class KafkaListenerConfig {

    @Autowired
    private KafkaConsumerProperties kafkaConsumerProperties;

    @Bean
    public Map<String, Object> getConsumerProperties() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrap());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroup());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        return properties;
    }

    @Bean
    public Deserializer stringKeyDeserializer() {
        return new StringDeserializer();
    }

    @Bean
    public Deserializer transactionJsonValueDeserializer() {
        return new JsonDeserializer(Transaction.class);
    }

    @Bean
    public ConsumerFactory<String, Transaction> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(getConsumerProperties(), stringKeyDeserializer(), transactionJsonValueDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Transaction> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Transaction> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(1);
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

Kafka听众:

@Service
public class TransactionConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(Transaction.class);

    @KafkaListener(topics={"transactions"}, containerFactory = "kafkaListenerContainerFactory")
    public void onReceive(Transaction transaction) {
        LOGGER.info("transaction = {}",transaction);
    }
}

消费者应用:

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

}

测试用例1:pass我启动了spring/java生产者并从控制台运行消费者。当我从producer生成消息时,我的控制台使用者能够访问消息。
测试用例2:失败我启动了spring/java消费者并从控制台运行producer。当我从console producer生成消息时,我的spring/java消费者无法访问该消息。
测试用例3:失败我启动了spring/java消费者并运行spring/java生产者。当我从spring/java生产者生成消息时,我的spring/java消费者无法访问消息。
问题
我的消费代码有什么问题吗?
我是否缺少Kafka监听器的配置?
我需要显式运行侦听器吗(我不这么认为,因为我可以在终端日志上看到连接到主题,但我不确定)

e3bfsja2

e3bfsja21#

好吧,你失踪了 AUTO_OFFSET_RESET_CONFIGConsumer Configs ```
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

自动偏移复位
如果kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除),该怎么办:
最早:自动将偏移量重置为最早偏移量
最新:自动将偏移重置为最新偏移
无:如果没有为使用者的组找到以前的偏移量,则向使用者抛出异常
其他:向消费者抛出异常
注: `auto.offset.reset` 至 `earliest` 仅当kafka没有该消费组的偏移量时才起作用(因此在这种情况下,您需要将此属性与新的消费组一起添加并重新启动应用程序)

相关问题