使用camel kafka的producertemplate sendbody()方法时未能为终结点创建生产者

dphi5xsq  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(584)

我正在测试SimpleProducer,以便使用ApacheCamel在kafka(0.8.2.1)上发送消息。我已经在camel中使用javadsl创建了端点。

CamelContext ctx =new DefaultCamelContext();
PropertiesComponent properties=new PropertiesComponent();

properties.setLocation("com/camel/test/props.properties");
ctx.addComponent("properties",properties);

final String uri= "kafka://{{kafka.host}}?topic={{topic}}&zookeeperHost={{zookeeperHost}}&zookeeperPort={{zookeeperPort}}";
String uriParams = "&metadata.broker.list={{metadata.broker.list}";

ctx.addRoutes(new RouteBuilder() {
    public void configure() { //
        from(uri+"&groupId={{groupId}}")
        .process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                System.out.println(exchange.getIn().getBody());
            }
        })
        ;
    }
});

ctx.start();

ProducerTemplate tmp = ctx.createProducerTemplate();
tmp.sendBody(ctx.getEndpoint(uri), "my test is working");// Error occurs here

现在我想用apachecamel提供的producertempalte在kafka上发送消息。但是我在运行程序时遇到以下错误注意:zookeeper和kafka启动,可以使用kafka控制台生成/使用消息。

Exception in thread "main" org.apache.camel.FailedToCreateProducerException: Failed to create Producer for endpoint: Endpoint[kafka://localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181]. Reason: java.lang.NullPointerException
    at org.apache.camel.impl.ProducerCache.doGetProducer(ProducerCache.java:407)
    at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:220)
    at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:343)
    at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:184)
    at org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:124)
    at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:137)
    at com.camel.test.CamelTest.main(CamelTest.java:45)
Caused by: java.lang.NullPointerException
    at java.util.Hashtable.put(Hashtable.java:514)
    at org.apache.camel.component.kafka.KafkaProducer.getProps(KafkaProducer.java:54)
    at org.apache.camel.component.kafka.KafkaProducer.doStart(KafkaProducer.java:61)
    at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
    at org.apache.camel.impl.DefaultCamelContext.startService(DefaultCamelContext.java:2869)
    at org.apache.camel.impl.DefaultCamelContext.doAddService(DefaultCamelContext.java:1097)
    at org.apache.camel.impl.DefaultCamelContext.addService(DefaultCamelContext.java:1058)
    at org.apache.camel.impl.ProducerCache.doGetProducer(ProducerCache.java:405)
    ... 6 more

我猜属性没有为producer设置,但是不知道如何在producer模板中设置。

yizd12fk

yizd12fk1#

我通过调试找到了解决办法。默认情况下,producertemplate需要在创建新对象时未设置的默认参数(这可能是api中的错误)。所以我找到了一种通过uri发送参数的方法。其中以下参数是必需的
metadata.broker.list(作为uri参数)
request.required.acks(已默认设置)
producer.type(本例中不需要,但其他API需要)
serializer.class(已默认设置)
分区器(类)(作为uri参数)
分区密钥(作为标头)
我们没有为分区密钥发送param的选项,所以需要将其添加到头中。所以使用sendbodyandheader方法来发送生产者消息。

ProducerTemplate tmp = ctx.createProducerTemplate();

        tmp.setDefaultEndpoint(ctx.getEndpoint(uri+"&partitioner={{partitioner.class}}"));
        ctx.start();
        tmp.sendBodyAndHeader("my test is working "+(new Random()).nextInt(100), KafkaConstants.PARTITION_KEY, 1);
        tmp.stop();
        ctx.stop();
n3schb8v

n3schb8v2#

uri应该有代理列表作为服务器名(不要因为我没有创建这个组件的语法而责怪我)。

final String uri= "kafka://{{metadata.broker.list}}?topic={{topic}}&zookeeperHost={{zookeeperHost}}&zookeeperPort={{zookeeperPort}}";

相关问题