我们正在使用apache camel到2.17.*版本,以便使用maxpollrecords参数,我正在尝试升级到2.18.0。升级到2.18.0后,代理似乎再也无法识别消费者。下面是我尝试创建的示例消费者。我可以将消息从cli生成到主题,如果在cli中创建使用者,我可以看到在cli中创建的使用者使用消息,而不是通过apachecamel创建的使用者。
另外,使用consumer group descripe cli命令,如果只运行apachecamel消费者示例,我可以看到消费者id为空。当我使用2.17.5运行时,代理用来识别并将其分配给分区。我找不到这个例子请帮忙。
package com.test;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.properties.PropertiesComponent;
import org.apache.camel.impl.DefaultCamelContext;
public class CamelConsumer {
public static void main(String argv[]){
CamelContext camelContext = new DefaultCamelContext();
// Add route to send messages to Kafka
try {
camelContext.addRoutes(new RouteBuilder() {
public void configure() {
PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class);
pc.setLocation("classpath:application.properties");
System.out.println("About to start route: Kafka Server -> Log ");
from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
+ "&maxPollRecords={{consumer.maxPollRecords}}" + "&consumersCount={{consumer.consumersCount}}"
+ "&groupId={{consumer.group}}").routeId("FromKafka")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
Message message = exchange.getIn();
Object data = message.getBody();
System.out.println(data);
}
});
}
});
camelContext.start();
Thread.sleep(5 * 60 * 1000);
camelContext.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
我也没有例外。我找不到任何与此相关的文件。请帮忙。
consumer.topic=test kafka.host=localhost kafka.port=9092 consumer.maxpollrecords=1 consumer.consumerscount=1 consumer.group=test
1条答案
按热度按时间zujrkrfu1#
我可以在下面的存储库中找到2.19中的工作示例代码。https://github.com/talend/apache-camel/branches (分叉的树枝)
https://github.com/apache/camel (实际分支机构)
最后,它在2.21.5版本中工作,我不得不将ApacheKafkaMaven版本从0.9升级到1.0.0