我是Camel的新手,我的用例如下:
- 我们从AMQ接收消息,我们希望重新Map此消息,并将此消息发送到客户的不同端点
- 每个客户都有包含哪些字段的配置,以及OAuth的URL+发送消息的URL(REST API)+凭据
- 客户被分组在代理下,一个代理可以支配多个客户。我们在Map中有配置,以“agentId”为键,以“customerConfigs”为值。
- 通过消息中的一个字段,我们决定该消息应该发送到哪个代理
- 然后,我们迭代该代理下的所有客户,检查每个客户需要哪些字段,并相应地重新Map消息
- 我们还通过检查消息内容是否符合客户的标准来进行过滤。如果符合,我们将根据客户的OAuth url进行OAuth,并将消息发送给他们。如果不符合,则跳过。
我们正在使用Camel来实现这一点,到目前为止,从接收到Map和检索配置等所有步骤都在bean.(.bean(GeneralBean.class)
)中定义。
但现在,我们希望针对客户端点重试,我决定将步骤分为几个Camel步骤,因为我不想像现在这样重试整个接收/重新Map/检索配置。我只想重试最后一步,即发送。
现在问题来了:我应该使用哪个Camel组件?
1.我觉得recipient list不错,但不确定怎么样。也许“动态路由器”更好?
1.在定义步骤时,当我检索每个客户的配置时,交换主体中的一个对象(我们称之为RemappedMessage
)变为2(RemappedMessage
和CustomerConfig
列表)。它们具有一对多关系。我如何将这两个对象传递给下一个bean?或者我应该在一个bean中一起处理它们?在Exchange
中?在@ExchangeProperties Map<String, Object> properties
中?后一种方法可以用,但是IMO不是很驼。或者定义一个元组类来合并它们?我经常用它,但是觉得很丑。
1.我不认为Camel中有一些语法可以获取Exchange中对象的一些属性,并将其作为url和基本凭据用户名和密码放入to()
中?
总的来说,我想把流程分成几个步骤在一个Camel管道中,但不确定如何处理“一个对象拆分成多个对象并且它们需要齐头并进到下游”的问题。
我用的不是Spring,而是Quarkus。
现在,我与:
from("activemq:queue:" + appConfig.getQueueName())
.bean(IncomingMessageConverter.class) // use class form so that Camel will cache the bean
.bean(UserIdValidator.class) // validate and if wrong, end route here
.bean(CustomerConfigRetrieverBean.class) // retrieve config of customer, by agent id. How to pass down both??
.bean(EndpointFieldsTailor.class) // remove fields if this customer is not interested. Needs CustomerConfig
.recipientList(xxxxxx) // how?
// what's next?
因为RemappedMessage
是步骤.bean(IncomingMessageConverter.class)
的返回类型,所以Camel可以绑定参数,这样我就可以访问Map的消息,但是显然我不能同时返回两个对象。
2条答案
按热度按时间exdqitrt1#
当您要将同一消息发送到多个端点,并且在进入收件人列表之前知道这些端点是什么时,收件人列表是理想的选择。
动态路由器可以将消息路由到许多端点,在输入路由器时不一定知道这些端点的列表和顺序。由于您有一对多的情况,动态路由器可能更适合。
一个可能有效的简单方法是准备一个元组列表。每个元组将包含一个
CustomerConfig
和一个RemappedMessage
。然后,您将split列表,并在拆分器中将消息发送到代理。对于元组,您可以使用ImmutablePair、Map
或仅两个元素List
。至于设置url和用户名/密码,我假设您使用的是Camel的HTTP组件。由于http组件允许这样做,所以最好将这些值作为头提供。可以使用CamelHttpUri头设置URL。HTTP组件通常将消息头作为HTTP头传递,因此您可以通过设置同名的消息头来设置
Authorization
头等内容。当然,它也支持从交换和消息中的值设置头。
实际上,你可能需要做更多的工作来授权。例如,如果是基本授权,你需要计算你想要使用的值。我会在头文件或属性中设置它,然后引用它如下:
cgyqldqp2#
最后,路径看起来非常复杂,但每个bean只做一件事。
对于API配置:
我发现
choice().when().endchoice().otherwise().endchoice().end()
在每个分支中都需要一个to()
,而我不能使用when().setHeader().endchoice()
,所以最后我决定将所有这些逻辑放到一个bean中。并且必须使用
end()
来结束choice()
,不能使用endchoice()
;endchoice()
是针对when()
和otherwise()
的。相当误导。型
我以为
endchoice()
和choice()
是一个级别的,为什么不叫endbranch()
呢?