spring云流-无法配置kafka的代理地址

mf98qq94  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(355)

我正在尝试使用spring cloud stream手动注册kafka侦听器,但是在尝试连接到broker时遇到了一些问题:

[Consumer clientId=consumer-1, groupId=h2r] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request
[Consumer clientId=consumer-1, groupId=h2r] Initiating connection to node localhost:9092 (id: -1 rack: null)
[Consumer clientId=consumer-1, groupId=h2r] Node -1 disconnected.
[Consumer clientId=consumer-1, groupId=h2r] Connection to node -1 could not be established. Broker may not be available.
[Consumer clientId=consumer-1, groupId=h2r] Give up sending metadata request since no node is available

它正在尝试连接localhost:9092 but 我的服务器位于另一台计算机(192.168.1.200:9092)中,在该配置中我做错了什么:

@Service
public class TenantMessageConsumer {

private final String defaultEnterpriseSchema;
private final MailService mailService;
private final KafkaListenerContainerFactory containerFactory;
private final KafkaListenerEndpointRegistry registry;

public TenantMessageConsumer(String defaultEnterpriseSchema, MailService mailService, KafkaListenerContainerFactory containerFactory, KafkaListenerEndpointRegistry registry) {
    this.defaultEnterpriseSchema = defaultEnterpriseSchema;
    this.mailService = mailService;
    this.containerFactory = containerFactory;
    this.registry = registry;
    listen();
}

public void listen() {
    TenantMessageConsumer that=this;
    AbstractKafkaListenerEndpoint endpoint=new AbstractKafkaListenerEndpoint<String, Object>() {
        @Override
        protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container, MessageConverter messageConverter) {
            try {
                return new RecordMessagingMessageListenerAdapter(that,TenantMessageConsumer.class.getMethod("process",Object.class));
            } catch (NoSuchMethodException e) {
                return null;
            }
        }
    };
    endpoint.setId("tenant");
    endpoint.setTopics(defaultEnterpriseSchema);
    endpoint.setGroupId("h2r");
    registry.registerListenerContainer(endpoint,containerFactory);
}

public void process(Object message){
    if (message instanceof SimpleEmailMessage) {
        SimpleEmailMessage emailMessage = (SimpleEmailMessage) message;
        if (emailMessage.getContent().equals("reset-password"))
            mailService.sendPasswordResetMail(emailMessage);
    }
}
}

它应该得到以下配置:

spring:
    cloud:
        stream:
            kafka:
                binder:
                    brokers: 192.168.1.200

所以,我需要的是一种方法来获取配置的代理地址并在endpoint对象中设置它。
重要的
由于主题名是动态的,我不能使用@streamlistener这样的注解。

sbtkgmzw

sbtkgmzw1#

您没有描述您的问题,也没有提供任何相关信息,如堆栈跟踪、日志等(请以后做),但我会努力的。
你绝对可以用 @StreamListener 以及SpringCloudStream支持动态目的地的其他注解。
如果您还需要帮助,请浏览以上部分并告诉我们。

相关问题