kafka消费者在Spring云流不启动

ff29svar  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(489)

这是我的消费者的配置

spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        input: 
          destination: greeting
          content-type: application/json
      kafka:   
        binder:
          brokers: kafka
          zkNodes: zookeeper

我的应用程序的代码

@SpringBootApplication
@EnableIntegration
@EnableBinding(CommandSink.class)
public class KafkaTesterApplication {
    private static Logger logger = LogManager.getLogger(KafkaTesterApplication.class);

    /**
     * @param args
     */
    public static void main(String[] args) {
        SpringApplication.run(KafkaTesterApplication.class, args);
    }

    @ServiceActivator(inputChannel="input")
    public void receiveMessage(String message) {
        logger.debug("receive {}", message);
    }
}

和接收器接口

public interface CommandSink {
    public static final String CHANNEL = "input";

    @Input(CommandSink.CHANNEL)
    SubscribableChannel command();

}

看起来消费者和zookeeper和kafka没有联系。你知道吗?

dbf7pr2w

dbf7pr2w1#

好的,我们找到了解决办法。。。
我们不知道为什么,但缺少一个主题。问题中最奇怪的是,使用zookeeper(旧式)的消费者可以使用消息。
缺少的主题是
__消费者补偿

相关问题