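This is my route builder. Here I am trying to send data from a file to a Kafka topic. Later, I pass it to the main method and run it with a Camel context. I have tried several variations of the code, but none of them worked. I am working on a POC with Apache Kafka and Camel.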
public class SimpleRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        String topicName = "test120";
        String kafkaServer = "kafka:localhost:9092";
        String zooKeeperHost = "zookeeperHost=localhost&zookeeperPort=2181";
        String serializerClass = "serializerClass=kafka.serializer.StringEncoder";
        String toKafka = "kafka:localhost:9092?topic=test120;zookeeperHost=localhost;zookeeperPort=2181;groupId=group1";
        // toKafka = new StringBuilder().append("&").append(serializerClass).toString();
        /*new StringBuilder().append(kafkaServer).append("?").append(topicName).append("&")
            .append(zooKeeperHost).append("&").append(serializerClass).toString();*/
        from("file:C:/inbox?noop=true").to(toKafka);
    }
}
This is my pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>HelloWorld</groupId>
    <artifactId>Pallavi</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>2.20.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-kafka</artifactId>
            <version>2.20.1</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.0</version>
        </dependency>
    </dependencies>
</project>
This is my main class:
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;

public class MainApp {
    public static void main(String[] args) {
        SimpleRouteBuilder routeBuilder = new SimpleRouteBuilder();
        CamelContext ctx = new DefaultCamelContext();
        try {
            ctx.addRoutes(routeBuilder);
            ctx.start();
            Thread.sleep(5 * 60 * 1000);
            ctx.stop();
            System.out.println("hi i am working");
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
The error is:
java.lang.NullPointerException
at java.util.Hashtable.put(Hashtable.java:459)
at org.apache.camel.component.kafka.KafkaProducer.getProps(KafkaProducer.java:63)
at org.apache.camel.component.kafka.KafkaProducer.doStart(KafkaProducer.java:89)
at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
at org.apache.camel.util.ServiceHelper.startService(ServiceHelper.java:75)
at org.apache.camel.impl.DeferServiceStartupListener.onCamelContextStarted(DeferServiceStartupListener.java:49)
at org.apache.camel.impl.DefaultCamelContext.safelyStartRouteServices(DefaultCamelContext.java:3859)
at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRoutes(DefaultCamelContext.java:3638)
at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:3490)
at org.apache.camel.impl.DefaultCamelContext.access$000(DefaultCamelContext.java:208)
at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3249)
at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3245)
at org.apache.camel.impl.DefaultCamelContext.doWithDefinedClassLoader(DefaultCamelContext.java:3268)
at org.apache.camel.impl.DefaultCamelContext.doStart(DefaultCamelContext.java:3245)
at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:3168)
at demo.MainApp.main(MainApp.java:13)
Picked up _JAVA_OPTIONS: -Xmx512M -Xms512M
4 Answers
zrfyljdw1#
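You probably have not set the producer properties that supply the configuration for the Kafka producer.
Camel's Kafka component requires the mandatory Kafka producer properties to be configured.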
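To illustrate why a missing property shows up as a NullPointerException in Hashtable.put, here is a minimal sketch. It assumes, based only on the stack trace in the question, that the producer copies an unset option into a java.util.Properties object. The class name NullBrokerSketch and the helper putFails are made up for this illustration; only the behavior of Properties.put is standard Java.

```java
import java.util.Properties;

public class NullBrokerSketch {
    // Hypothetical helper: mimics copying an endpoint option into producer properties.
    // Returns true if the put is rejected with a NullPointerException.
    static boolean putFails(String brokers) {
        Properties props = new Properties();
        try {
            // Properties (a Hashtable) does not allow null values.
            props.put("bootstrap.servers", brokers);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(putFails(null));             // unset option
        System.out.println(putFails("localhost:9092")); // option provided
    }
}
```

So an option that was never set on the endpoint is a plausible cause of the trace above, although the sketch does not prove which option was null.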
qmelpv7a2#
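You need to configure the brokers option on the endpoint, or on the Kafka component. I have logged an issue so that Camel reports a better exception in the next release: https://issues.apache.org/jira/browse/camel-12090
There is also a Kafka example here: https://github.com/apache/camel/tree/master/examples/camel-example-kafka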
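As a rough sketch of what setting the brokers option on the endpoint could look like: this assumes the Camel 2.20.x URI form kafka:topic?brokers=host:port, with further options joined by & rather than ;. The class KafkaUriSketch and the helper kafkaUri are hypothetical names used only for illustration; the topic, broker address and group id are taken from the question.

```java
public class KafkaUriSketch {
    // Hypothetical helper: builds a Camel Kafka endpoint URI with the topic in the
    // path and the broker list passed through the brokers option.
    static String kafkaUri(String topic, String brokers, String groupId) {
        return "kafka:" + topic + "?brokers=" + brokers + "&groupId=" + groupId;
    }

    public static void main(String[] args) {
        String toKafka = kafkaUri("test120", "localhost:9092", "group1");
        System.out.println(toKafka);
        // Inside RouteBuilder.configure(), the route from the question would then read:
        // from("file:C:/inbox?noop=true").to(toKafka);
    }
}
```

Note that the original URI put localhost:9092 where this form expects the topic name, which would leave brokers unset.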
z9zf31ra3#
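Make sure the C:\inbox directory actually exists. Also, make sure to change the dependency versions in the pom file to the ones mentioned in the tutorial, because things changed after camel-kafka 2.17.0: it no longer accepts the zookeeperHost and zookeeperPort values in the toKafka string.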
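If you want to verify the directory from code before starting the context, a small check along these lines might help. It is only a sketch: the class InboxCheckSketch and the helper inboxExists are made-up names, and the default path is simply the one from the question.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class InboxCheckSketch {
    // Hypothetical helper: true only if the path exists and is a directory.
    static boolean inboxExists(String dir) {
        Path path = Paths.get(dir);
        return Files.isDirectory(path);
    }

    public static void main(String[] args) {
        // Falls back to the directory used in the question when no argument is given.
        String inbox = args.length > 0 ? args[0] : "C:/inbox";
        System.out.println(inbox + " exists: " + inboxExists(inbox));
    }
}
```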
v8wbuo2f4#
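I followed the same tutorial. Initially I could not get it to run either (I was using the latest version of the Kafka server). Instead of the NullPointerException, I got the following exception:
As Claus Ibsen said, you need to add the brokers option.
Code example
I hope this helps anyone following the tutorial.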