kafka消费者使用apachecamel

brccelvz  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(557)

我是新来的Apache Camel 。我们正在做poc开发Kafka消费者使用 Camel 。下面是示例代码。

context.addRoutes(new RouteBuilder(){

      @Override
        public void configure() throws Exception {
            // TODO Auto-generated method stub

         from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}" 
                     + "&consumersCount={{consumer.consumersCount}}" 
                     + "&seekTo={{consumer.seekTo}}" 
                     + "&groupId={{consumer.group}}")
             .process(new Processor() {

                @Override
                public void process(Exchange exchange) throws Exception {

                    Message message = exchange.getIn();
                    Object data = message.getBody();

                    System.out.println(data);
                }
             })
             .to("seda:end");

  });

        context.start();

    ConsumerTemplate template=context.createConsumerTemplate();
    String info=template.receiveBody("seda:end",String.class);

    System.out.println(info);
}

我有以下问题:
上下文在启动后立即停止。
如果我使用使用者模板轮询到端点,它不会打印任何内容,而在.process()中,我可以在无限循环中启动上下文时打印kafka消息。为什么消费者模板无法打印。

vtwuwzda

vtwuwzda1#

正如克劳斯已经说过的,您的camel上下文将立即关闭,因为它没有阻塞。请看他评论中的链接。
我想你错过了一次机会 template.start(); 开始你的消费。有关示例,请参见此链接。

相关问题