我是新来 Camel ,想改变我的路线动态根据一些逻辑预先制定的手
camelContext.addRoutes(new RouteBuilder() {
public void configure() {
PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class);
pc.setLocation("classpath:application.properties");
log.info("About to start route: Kafka Server -> Log ");
from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
+ "&maxPollRecords={{consumer.maxPollRecords}}"
+ "&consumersCount={{consumer.consumersCount}}"
+ "&seekTo={{consumer.seekTo}}"
+ "&groupId={{consumer.group}}"
+ "&valueDeserializer=" + BytesDeserializer.class.getName())
.routeId("FromKafka")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println(" message: " + exchange.getIn().getBody());
Bytes body = exchange.getIn().getBody(Bytes.class);
HashMap data = (HashMap)SerializationUtils.deserialize(body.get());
// do some work on data;
Map messageBusDetails = new HashMap();
messageBusDetails.put("topicName", "someTopic");
messageBusDetails.put("producerOption", "bla");
exchange.getOut().setHeader("kafka", messageBusDetails);
exchange.getOut().setBody(SerializationUtils.serialize(data));
}
}).choice()
.when(header("kafka"))
.to("kafka:"+**getHeader("kafka").get("topicName")**)
.log("${body}");
}
});
getheader(“kafka”).get(“主题名”)
这就是我想要达到的目标。
但是我不知道如何访问.to()中的headers值(这是一个Map,因为kafka生产者可能有更多配置)
我知道我可能用错了。。。但这就是我一直想明白的。。。
主要目标是有多个消息总线作为.from()和.to()中的多个消息总线选项,这些选项将通过外部源(如配置文件)来决定,这样相同的路由将应用于许多逻辑场景,我认为choice()方法是最好的答案,谢谢!
1条答案
按热度按时间lbsnaicq1#
您可以使用tod()来代替to(),tod()是“dynamic to”,用于查看详细信息
有关用于拉入各种标题等的语法,请参见简单表达式页面