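I am learning how to integrate Kafka with Apache Camel and ran into the following error. Any help would be greatly appreciated. I created a file in the C:/inbox folder and want to consume its text with a Kafka consumer. I am using Apache Camel 3.1.0. Here is my code: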
package com.javainuse;

import org.apache.camel.builder.RouteBuilder;

public class SimpleRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        String topicName = "test123";
        String kafkaServer = "kafka:localhost:9092";
        String zooKeeperHost = "zookeeperHost=localhost&zookeeperPort=2181";
        String serializerClass = "serializerClass=kafka.serializer.StringEncoder";
        String toKafka = "kafka:localhost:9092;kafka:test123?brokers=localhost:9092;zookeeperHost=localhost;zookeeperPort=2181;groupId=group1";
        from("file:C:/inbox?noop=true").to(toKafka);
    }
}
Here is the error I get:
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
org.apache.camel.FailedToStartRouteException: Failed to start route route1 because of Route(route1)[From[file:C:/inbox?noop=true] -> [To[kafka:loc...
at org.apache.camel.impl.engine.BaseRouteService.warmUp(BaseRouteService.java:133)
at org.apache.camel.impl.engine.AbstractCamelContext.doWarmUpRoutes(AbstractCamelContext.java:3246)
at org.apache.camel.impl.engine.AbstractCamelContext.safelyStartRouteServices(AbstractCamelContext.java:3139)
at org.apache.camel.impl.engine.AbstractCamelContext.doStartOrResumeRoutes(AbstractCamelContext.java:2925)
at org.apache.camel.impl.engine.AbstractCamelContext.doStartCamel(AbstractCamelContext.java:2725)
at org.apache.camel.impl.engine.AbstractCamelContext.lambda$doStart$2(AbstractCamelContext.java:2527)
at org.apache.camel.impl.engine.AbstractCamelContext.doWithDefinedClassLoader(AbstractCamelContext.java:2544)
at org.apache.camel.impl.engine.AbstractCamelContext.doStart(AbstractCamelContext.java:2525)
at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2421)
at com.javainuse.MainApp.main(MainApp.java:12)
Caused by: org.apache.camel.RuntimeCamelException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException(RuntimeCamelException.java:52)
at org.apache.camel.support.ChildServiceSupport.start(ChildServiceSupport.java:67)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:87)
at org.apache.camel.processor.channel.DefaultChannel.doStart(DefaultChannel.java:144)
at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:73)
at org.apache.camel.processor.Pipeline.doStart(Pipeline.java:154)
at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
at org.apache.camel.support.processor.DelegateAsyncProcessor.doStart(DelegateAsyncProcessor.java:78)
at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
at org.apache.camel.impl.engine.BaseRouteService.startChildService(BaseRouteService.java:339)
at org.apache.camel.impl.engine.BaseRouteService.doWarmUp(BaseRouteService.java:189)
at org.apache.camel.impl.engine.BaseRouteService.warmUp(BaseRouteService.java:131)
... 10 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at org.apache.camel.component.kafka.KafkaProducer.doStart(KafkaProducer.java:119)
at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
at org.apache.camel.impl.engine.AbstractCamelContext.internalAddService(AbstractCamelContext.java:1455)
at org.apache.camel.impl.engine.AbstractCamelContext.addService(AbstractCamelContext.java:1391)
at org.apache.camel.processor.SendProcessor.doStart(SendProcessor.java:240)
at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:87)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.doStart(RedeliveryErrorHandler.java:1454)
at org.apache.camel.support.ChildServiceSupport.start(ChildServiceSupport.java:60)
... 25 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid url in bootstrap.servers: localhost:9092;zookeeperHost=localhost;zookeeperPort=2181;groupId=group1
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:58)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:407)
... 37 more
Process finished with exit code 0
1 Answer
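The error stack trace says that the URI of your Kafka endpoint is invalid (see the last "Caused by" at the bottom of the stack trace: Invalid url in bootstrap.servers). And indeed it is.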
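The correct form is
kafka:[topicname]?[options]
(see the Kafka component documentation). Applied to your URI, that gives
kafka:test123?brokers=localhost:9092&groupId=group1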
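As a minimal sketch of the kafka:[topicname]?[options] form, the URI can be assembled from its parts instead of being typed by hand. The class KafkaUriSketch and the method kafkaUri are hypothetical helpers, not part of Camel, and the sketch assumes option values that need no URL encoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Camel: builds "kafka:[topicname]?[options]".
class KafkaUriSketch {

    // Joins the options with '&', the separator between URI options.
    static String kafkaUri(String topicName, Map<String, String> options) {
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return query.isEmpty() ? "kafka:" + topicName : "kafka:" + topicName + "?" + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("brokers", "localhost:9092");
        options.put("groupId", "group1");

        String toKafka = kafkaUri("test123", options);
        System.out.println(toKafka); // kafka:test123?brokers=localhost:9092&groupId=group1

        // In the route from the question this string would be used as:
        // from("file:C:/inbox?noop=true").to(toKafka);
    }
}
```

The topic name appears exactly once, directly after kafka:, and the broker address is passed only through the brokers option.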
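Your URI has the following problems:
- It contains kafka:[topicname] twice, which is invalid.
- One of the two kafka:[topicname] parts is actually kafka:[brokers]; remove it.
- It uses semicolons (;) instead of & to separate the options.
- The zookeeper options are for old versions of Camel Kafka; remove them.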
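The semicolon problem explains the exact message in the stack trace. The sketch below is a simplified illustration, not Camel's real URI parser: it assumes options are whatever follows the ? and are split on & only. The class QuerySplitSketch and its options method are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified illustration only: this is NOT Camel's real URI parser.
class QuerySplitSketch {

    // Splits the part after '?' into key/value options, using '&' as the separator.
    static Map<String, String> options(String uri) {
        Map<String, String> result = new LinkedHashMap<>();
        int q = uri.indexOf('?');
        if (q < 0) {
            return result; // no options at all
        }
        for (String pair : uri.substring(q + 1).split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int eq = pair.indexOf('=');
            if (eq < 0) {
                result.put(pair, "");
            } else {
                result.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String broken = "kafka:localhost:9092;kafka:test123?brokers=localhost:9092;"
                + "zookeeperHost=localhost;zookeeperPort=2181;groupId=group1";
        // With ';' there is nothing to split on, so everything ends up in "brokers":
        // localhost:9092;zookeeperHost=localhost;zookeeperPort=2181;groupId=group1
        System.out.println(options(broken).get("brokers"));

        String fixed = "kafka:test123?brokers=localhost:9092&groupId=group1";
        // With '&' the broker address is isolated: localhost:9092
        System.out.println(options(fixed).get("brokers"));
    }
}
```

The first printed value is the same string that Kafka rejects in "Invalid url in bootstrap.servers", which is why replacing ; with & resolves that part of the error.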
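By the way, the line
SLF4J: Defaulting to no-operation (NOP) logger implementation
at the top of the stack trace means that you use the SLF4J logging interface but have not added any implementation to your project. If you use Maven, you can add the SLF4J API together with Logback as its implementation to your project's dependencies.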
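A possible pom.xml fragment for this is sketched below. The version numbers are assumptions chosen from the SLF4J 1.7.x and Logback 1.2.x lines (the StaticLoggerBinder message points to SLF4J 1.7.x); adjust them to whatever your project already uses.

```xml
<!-- Versions are assumptions; pick the ones that match your project. -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.30</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
</dependency>
```

With an implementation on the classpath, the NOP logger warning should disappear and Camel's own log output becomes visible, which usually makes errors like the one above easier to diagnose.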