这是我的消费者的配置
spring:
cloud:
stream:
defaultBinder: kafka
bindings:
input:
destination: greeting
content-type: application/json
kafka:
binder:
brokers: kafka
zkNodes: zookeeper
我的应用程序的代码
@SpringBootApplication
@EnableIntegration
@EnableBinding(CommandSink.class)
public class KafkaTesterApplication {
private static Logger logger = LogManager.getLogger(KafkaTesterApplication.class);
/**
* @param args
*/
public static void main(String[] args) {
SpringApplication.run(KafkaTesterApplication.class, args);
}
@ServiceActivator(inputChannel="input")
public void receiveMessage(String message) {
logger.debug("receive {}", message);
}
}
和接收器接口
public interface CommandSink {
public static final String CHANNEL = "input";
@Input(CommandSink.CHANNEL)
SubscribableChannel command();
}
看起来消费者和zookeeper和kafka没有联系。你知道吗?
1条答案
按热度按时间dbf7pr2w1#
好的,我们找到了解决办法。。。
我们不知道为什么,但缺少一个主题。问题中最奇怪的是,使用zookeeper(旧式)的消费者可以使用消息。
缺少的主题是
__消费者补偿