camel kafka组件不工作错误:“因为必须配置代理”

j2datikz  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(372)

使用apache camel的kafka组件(版本2.19.1)时出错,我正在尝试打印主题中的传入消息,我的管道是这样组成的:

...
context.addRoutes(new RouteBuilder() {
            public void configure() {
                from("kafka://localhost:9092?topic=test&groupId=testing")
               .to("stream:out");
  context.start();
    }
}

在终结点中尝试使用“/”和不使用“/”。
我得到的是:

Exception in thread "main" org.apache.camel.FailedToCreateRouteException: Failed to create route route1: Route(route1)[[From[kafka://localhost:9092?topic=test&groupI... because of Brokers must be configured
at org.apache.camel.impl.RouteService.warmUp(RouteService.java:147)
at org.apache.camel.impl.DefaultCamelContext.doWarmUpRoutes(DefaultCamelContext.java:3762)
at org.apache.camel.impl.DefaultCamelContext.safelyStartRouteServices(DefaultCamelContext.java:3669)
at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRoutes(DefaultCamelContext.java:3455)
at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:3309)
at org.apache.camel.impl.DefaultCamelContext.access$000(DefaultCamelContext.java:202)
at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3093)
at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3089)
at org.apache.camel.impl.DefaultCamelContext.doWithDefinedClassLoader(DefaultCamelContext.java:3112)
at org.apache.camel.impl.DefaultCamelContext.doStart(DefaultCamelContext.java:3089)
at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:3026)
at org.apache.camel.MainApp.main(MainApp.java:60)
Caused by: java.lang.IllegalArgumentException: Brokers must be configured
at org.apache.camel.component.kafka.KafkaConsumer.<init>(KafkaConsumer.java:62)
at org.apache.camel.component.kafka.KafkaEndpoint.createConsumer(KafkaEndpoint.java:76)
at org.apache.camel.impl.EventDrivenConsumerRoute.addServices(EventDrivenConsumerRoute.java:69)
at org.apache.camel.impl.DefaultRoute.onStartingServices(DefaultRoute.java:103)
at org.apache.camel.impl.RouteService.doWarmUp(RouteService.java:172)
at org.apache.camel.impl.RouteService.warmUp(RouteService.java:145)
... 12 more

Process finished with exit code 1

我正试图找出它,但我真的不明白是什么问题,我的kafka集群是一个单一的代理,所有的都是启动和运行(zookeeper和服务器),需要帮助

yuvru6vn

yuvru6vn1#

添加 brokers=localhost:9092 到使用者uri。

ruoxqz4g

ruoxqz4g2#

看看这个例子,url的第一部分是 topic 然后作为参数传递 brokers . 所以官方文件对我来说似乎有点误导。

from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                    + "&maxPollRecords={{consumer.maxPollRecords}}"
                    + "&consumersCount={{consumer.consumersCount}}"
                    + "&seekTo={{consumer.seekTo}}"
                    + "&groupId={{consumer.group}}")
                    .routeId("FromKafka")
                .log("${body}");

但是作为一般建议:camel是开源的,所以您可以随时查看github上的代码和示例。您还可以找到您在那里发布的堆栈跟踪的那些行,然后跟踪丢失的内容。

相关问题