在apache camel的.to()方法中使用exchange消息

bvuwiixz  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(670)

我是新来 Camel ,想改变我的路线动态根据一些逻辑预先制定的手

camelContext.addRoutes(new RouteBuilder() {
        public void configure() {
            PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class);
            pc.setLocation("classpath:application.properties");

            log.info("About to start route: Kafka Server -> Log ");

            from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                    + "&maxPollRecords={{consumer.maxPollRecords}}"
                    + "&consumersCount={{consumer.consumersCount}}"
                    + "&seekTo={{consumer.seekTo}}"
                    + "&groupId={{consumer.group}}"
                    + "&valueDeserializer=" + BytesDeserializer.class.getName())
                    .routeId("FromKafka")
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            System.out.println(" message: " + exchange.getIn().getBody());
                            Bytes body = exchange.getIn().getBody(Bytes.class);
                            HashMap data = (HashMap)SerializationUtils.deserialize(body.get());
                            // do some work on data;
                            Map messageBusDetails = new HashMap();
                            messageBusDetails.put("topicName", "someTopic");
                            messageBusDetails.put("producerOption", "bla");
                            exchange.getOut().setHeader("kafka", messageBusDetails);
                            exchange.getOut().setBody(SerializationUtils.serialize(data));
                        }
                    }).choice()
                        .when(header("kafka"))
                            .to("kafka:"+**getHeader("kafka").get("topicName")**)
                .log("${body}");
        }
    });

getheader(“kafka”).get(“主题名”)
这就是我想要达到的目标。
但是我不知道如何访问.to()中的headers值(这是一个Map,因为kafka生产者可能有更多配置)
我知道我可能用错了。。。但这就是我一直想明白的。。。
主要目标是有多个消息总线作为.from()和.to()中的多个消息总线选项,这些选项将通过外部源(如配置文件)来决定,这样相同的路由将应用于许多逻辑场景,我认为choice()方法是最好的答案,谢谢!

lbsnaicq

lbsnaicq1#

您可以使用tod()来代替to(),tod()是“dynamic to”,用于查看详细信息
有关用于拉入各种标题等的语法,请参见简单表达式页面

相关问题