SpringCloud流kafka通道在SpringBoot应用程序中不工作

q9yhzks0  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(338)

我一直在尝试让入站subscribablechannel和出站messagechannel在我的spring boot应用程序中工作。
我已经成功地设置了Kafka频道,并成功地进行了测试。
此外,我还创建了一个基本的spring引导应用程序,用于测试从通道添加和接收内容。
我遇到的问题是,当我将等效代码放在它所属的应用程序中时,消息似乎永远不会被发送或接收。通过调试很难确定发生了什么,但唯一不同的是频道名。在工作impl中,频道名称类似于非工作应用程序中的application.channellocalhost:8080/channel.
我想知道是否有一些spring引导配置阻止或改变了通道的创建到一个不同的通道源?
有人有类似的问题吗?
应用程序.yml

spring:
  datasource:
    url: jdbc:h2:mem:dpemail;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
    platform: h2
    username: hello
    password: 
    driverClassName: org.h2.Driver    
  jpa:
    properties:
      hibernate:
        show_sql: true
        use_sql_comments: true
        format_sql: true

  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        email-in:
          destination: email
          contentType: application/json
        email-out:
          destination: email
          contentType: application/json

电子邮件

public class Email {

    private long timestamp;

    private String message;

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

}

绑定配置

@EnableBinding(EmailQueues.class)
public class EmailQueueConfiguration {    
}

接口

public interface EmailQueues {

    String INPUT = "email-in";
    String OUTPUT = "email-out";

    @Input(INPUT)
    SubscribableChannel inboundEmails();

    @Output(OUTPUT)
    MessageChannel outboundEmails();
}

控制器

@RestController
    @RequestMapping("/queue")
    public class EmailQueueController {

        private EmailQueues emailQueues;

        @Autowired
        public EmailQueueController(EmailQueues emailQueues) {
            this.emailQueues = emailQueues;
        }

        @RequestMapping(value = "sendEmail", method = POST)
        @ResponseStatus(ACCEPTED)
        public void sendToQueue() {
            MessageChannel messageChannel = emailQueues.outboundEmails();
            Email email = new Email();
            email.setMessage("hello world: " + System.currentTimeMillis());
            email.setTimestamp(System.currentTimeMillis());

            messageChannel.send(MessageBuilder.withPayload(email).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());

        }

        @StreamListener(EmailQueues.INPUT)
        public void handleEmail(@Payload Email email) {
            System.out.println("received: " + email.getMessage());
        }
    }

我不确定是否有一个继承的配置项目使用springcloud,springcloudsleuth可能会阻止它工作,但即使我删除了它,仍然没有。但与我的应用程序不同的是,我从未看到配置consumeconfig,例如:

o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 100
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = consumer-2
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true

(这个配置是我在运行上述代码时在我的基本spring引导应用程序中看到的,代码可以从kafka通道进行读写)。。。。
我假设我正在使用的一个库中有一些over-spring引导配置创建一个不同类型的通道,但我找不到该配置是什么。

shstlldc

shstlldc1#

您发布的内容包含许多不相关的配置,因此很难确定是否有任何东西妨碍了您。另外,当你说“..似乎消息永远不会被发送或接收..”时,日志中是否有异常?另外,请说明您使用的Kafka版本以及SpringCloudStream。现在,我确实尝试根据您的代码复制它(在清理了一点只留下相关部分之后),并且能够成功地发送/接收。
我的Kafka版本是0.11和SpringCloudStream2.0.0。以下是相关代码:

spring:   
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        email-in:
          destination: email
        email-out:
          destination: email

@SpringBootApplication
@EnableBinding(KafkaQuestionSoApplication.EmailQueues.class)
public class KafkaQuestionSoApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaQuestionSoApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(EmailQueues emailQueues) {
        return new ApplicationRunner() {
            @Override
            public void run(ApplicationArguments args) throws Exception {
                emailQueues.outboundEmails().send(new GenericMessage<String>("Hello"));
            }
        };
    }

    @StreamListener(EmailQueues.INPUT)
    public void handleEmail(String payload) {
        System.out.println("received: " + payload);
    }

    public interface EmailQueues {
        String INPUT = "email-in";
        String OUTPUT = "email-out";

        @Input(INPUT)
        SubscribableChannel inboundEmails();

        @Output(OUTPUT)
        MessageChannel outboundEmails();
    }
}
ymdaylpp

ymdaylpp2#

好的,经过大量的调试。。。我发现有些东西正在创建一个测试支持绑定器(怎么还不知道),所以很明显,这是用来不影响将消息添加到真实通道的。
添加后

@SpringBootApplication(exclude = TestSupportBinderAutoConfiguration.class)

Kafka频道配置已经运行,消息正在添加。。很想知道到底是什么在设置这个测试支持绑定器。。我最终会找到那个笨蛋的。

相关问题