Apache Camel and Apache Kafka integration

disho6za posted on 2021-06-04 in Kafka

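I am learning how to integrate Kafka with Apache Camel and ran into the error below. Any help would be appreciated. I created a file in the C:/inbox folder and want to consume its text with a Kafka consumer. I am using Apache Camel 3.1.0. Here is my code: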

package com.javainuse;

import org.apache.camel.builder.RouteBuilder;

public class SimpleRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        String topicName = "test123";
        String kafkaServer = "kafka:localhost:9092";
        String zooKeeperHost = "zookeeperHost=localhost&zookeeperPort=2181";
        String serializerClass = "serializerClass=kafka.serializer.StringEncoder";
        String toKafka = "kafka:localhost:9092;kafka:test123?brokers=localhost:9092;zookeeperHost=localhost;zookeeperPort=2181;groupId=group1";

        from("file:C:/inbox?noop=true").to(toKafka);
    }
}

Here is the error I get:

SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
org.apache.camel.FailedToStartRouteException: Failed to start route route1 because of Route(route1)[From[file:C:/inbox?noop=true] -> [To[kafka:loc...
    at org.apache.camel.impl.engine.BaseRouteService.warmUp(BaseRouteService.java:133)
    at org.apache.camel.impl.engine.AbstractCamelContext.doWarmUpRoutes(AbstractCamelContext.java:3246)
    at org.apache.camel.impl.engine.AbstractCamelContext.safelyStartRouteServices(AbstractCamelContext.java:3139)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStartOrResumeRoutes(AbstractCamelContext.java:2925)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStartCamel(AbstractCamelContext.java:2725)
    at org.apache.camel.impl.engine.AbstractCamelContext.lambda$doStart$2(AbstractCamelContext.java:2527)
    at org.apache.camel.impl.engine.AbstractCamelContext.doWithDefinedClassLoader(AbstractCamelContext.java:2544)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStart(AbstractCamelContext.java:2525)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2421)
    at com.javainuse.MainApp.main(MainApp.java:12)
Caused by: org.apache.camel.RuntimeCamelException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException(RuntimeCamelException.java:52)
    at org.apache.camel.support.ChildServiceSupport.start(ChildServiceSupport.java:67)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:87)
    at org.apache.camel.processor.channel.DefaultChannel.doStart(DefaultChannel.java:144)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:73)
    at org.apache.camel.processor.Pipeline.doStart(Pipeline.java:154)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.support.processor.DelegateAsyncProcessor.doStart(DelegateAsyncProcessor.java:78)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.impl.engine.BaseRouteService.startChildService(BaseRouteService.java:339)
    at org.apache.camel.impl.engine.BaseRouteService.doWarmUp(BaseRouteService.java:189)
    at org.apache.camel.impl.engine.BaseRouteService.warmUp(BaseRouteService.java:131)
    ... 10 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at org.apache.camel.component.kafka.KafkaProducer.doStart(KafkaProducer.java:119)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.impl.engine.AbstractCamelContext.internalAddService(AbstractCamelContext.java:1455)
    at org.apache.camel.impl.engine.AbstractCamelContext.addService(AbstractCamelContext.java:1391)
    at org.apache.camel.processor.SendProcessor.doStart(SendProcessor.java:240)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:87)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.doStart(RedeliveryErrorHandler.java:1454)
    at org.apache.camel.support.ChildServiceSupport.start(ChildServiceSupport.java:60)
    ... 25 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid url in bootstrap.servers: localhost:9092;zookeeperHost=localhost;zookeeperPort=2181;groupId=group1
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:58)
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:407)
    ... 37 more

Process finished with exit code 0
nx7onnlm #1

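The stack trace says that your Kafka endpoint URI is invalid (see the last "Caused by" at the bottom of the stack trace), and it is.
The correct form is kafka:[topicname]?[options] (see the Camel Kafka component documentation).
So, looking at your URI, it should be: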

kafka:test123?brokers=localhost:9092&groupId=group1
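
As a minimal sketch of that form, the URI can be assembled from a topic name and an option map. The class `KafkaUriSketch` and the method `buildUri` are hypothetical names introduced only for illustration; they are not part of Camel, and the sketch assumes option values need no URL encoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Camel: assembles kafka:[topicname]?[options].
final class KafkaUriSketch {

    static String buildUri(String topic, Map<String, String> options) {
        // Options are joined with '&'; values are assumed to need no URL encoding.
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return query.isEmpty() ? "kafka:" + topic : "kafka:" + topic + "?" + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("brokers", "localhost:9092");
        options.put("groupId", "group1");
        System.out.println(buildUri("test123", options));
    }
}
```

In the route from the question, the resulting string would take the place of `toKafka` in `from("file:C:/inbox?noop=true").to(toKafka)`.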

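Your URI has the following problems:
It contains kafka:[topicname] twice, which is invalid.
One of the two is actually kafka:[brokers] (kafka:localhost:9092) rather than kafka:[topicname]; remove it.
It uses semicolons ( ; ) instead of & to separate the options.
The ZooKeeper options belong to older versions of camel-kafka; remove them.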
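
The semicolon problem explains the exact text of the ConfigException. The following is a simplified model, assuming options are split only on & after the first ?; Camel's real URI parsing is more involved, and `QuerySplitSketch` and `parseOptions` are hypothetical names used only for illustration. With semicolons, everything after `brokers=` ends up as a single value, which is the string Kafka rejects as `bootstrap.servers`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of option parsing, not Camel's actual parser.
final class QuerySplitSketch {

    static Map<String, String> parseOptions(String uri) {
        Map<String, String> options = new LinkedHashMap<>();
        int q = uri.indexOf('?');
        if (q < 0) {
            return options; // no query part, so no options
        }
        for (String pair : uri.substring(q + 1).split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            // Split on the first '=' only, so values may contain '='.
            String[] kv = pair.split("=", 2);
            options.put(kv[0], kv.length > 1 ? kv[1] : "");
        }
        return options;
    }

    public static void main(String[] args) {
        String broken = "kafka:localhost:9092;kafka:test123?brokers=localhost:9092;"
                + "zookeeperHost=localhost;zookeeperPort=2181;groupId=group1";
        String fixed = "kafka:test123?brokers=localhost:9092&groupId=group1";
        System.out.println(parseOptions(broken)); // one option: brokers, with a long invalid value
        System.out.println(parseOptions(fixed));  // two options: brokers and groupId
    }
}
```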
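By the way: the line SLF4J: Defaulting to no-operation (NOP) logger implementation at the top of the stack trace means that you use the SLF4J logging API but have not added any implementation to your project.
If you use Maven, you can add the following dependency to bring in the SLF4J API together with Logback as its implementation. Add a <version> element unless your parent POM or a BOM already manages it.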

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
</dependency>
