Camel 使用哪个EIP:通过消息内容、Map、过滤和OAuth检索配置,然后发送,仅重试发送部分

vc6uscn9  于 2022-12-18  发布在  Apache
关注(0)|答案(2)|浏览(151)

我是Camel的新手,我的用例如下:

  • 我们从AMQ接收消息,我们希望重新Map此消息,并将此消息发送到客户的不同端点
  • 每个客户都有包含哪些字段的配置,以及OAuth的URL+发送消息的URL(REST API)+凭据
  • 客户被分组在代理下,一个代理可以支配多个客户。我们在Map中有配置,以“agentId”为键,以“customerConfigs”为值。
  • 通过消息中的一个字段,我们决定该消息应该发送到哪个代理
  • 然后,我们迭代该代理下的所有客户,检查每个客户需要哪些字段,并相应地重新Map消息
  • 我们还通过检查消息内容是否符合客户的标准来进行过滤。如果符合,我们将根据客户的OAuth url进行OAuth,并将消息发送给他们。如果不符合,则跳过。

我们正在使用Camel来实现这一点,到目前为止,从接收到Map和检索配置等所有步骤都在bean.(.bean(GeneralBean.class))中定义。
但现在,我们希望针对客户端点重试,我决定将步骤分为几个Camel步骤,因为我不想像现在这样重试整个接收/重新Map/检索配置。我只想重试最后一步,即发送。
现在问题来了:我应该使用哪个Camel组件?
1.我觉得recipient list不错,但不确定怎么样。也许“动态路由器”更好?
1.在定义步骤时,当我检索每个客户的配置时,交换主体中的一个对象(我们称之为RemappedMessage)变为2(RemappedMessageCustomerConfig列表)。它们具有一对多关系。我如何将这两个对象传递给下一个bean?或者我应该在一个bean中一起处理它们?在Exchange中?在@ExchangeProperties Map<String, Object> properties中?后一种方法可以用,但是IMO不是很驼。或者定义一个元组类来合并它们?我经常用它,但是觉得很丑。
1.我不认为Camel中有一些语法可以获取Exchange中对象的一些属性,并将其作为url和基本凭据用户名和密码放入to()中?
总的来说,我想把流程分成几个步骤在一个Camel管道中,但不确定如何处理“一个对象拆分成多个对象并且它们需要齐头并进到下游”的问题。
我用的不是Spring,而是Quarkus。
现在,我与:

from("activemq:queue:" + appConfig.getQueueName())
                .bean(IncomingMessageConverter.class) // use class form so that Camel will cache the bean
                .bean(UserIdValidator.class) // validate and if wrong, end route here
                .bean(CustomerConfigRetrieverBean.class) // retrieve config of customer, by agent id. How to pass down both??
                .bean(EndpointFieldsTailor.class) // remove fields if this customer is not interested. Needs CustomerConfig
                .recipientList(xxxxxx) // how?
                // what's next?

因为RemappedMessage是步骤.bean(IncomingMessageConverter.class)的返回类型,所以Camel可以绑定参数,这样我就可以访问Map的消息,但是显然我不能同时返回两个对象。

exdqitrt

exdqitrt1#

当您要将同一消息发送到多个端点,并且在进入收件人列表之前知道这些端点是什么时,收件人列表是理想的选择。
动态路由器可以将消息路由到许多端点,在输入路由器时不一定知道这些端点的列表和顺序。由于您有一对多的情况,动态路由器可能更适合。
一个可能有效的简单方法是准备一个元组列表。每个元组将包含一个CustomerConfig和一个RemappedMessage。然后,您将split列表,并在拆分器中将消息发送到代理。对于元组,您可以使用ImmutablePairMap或仅两个元素List
至于设置url和用户名/密码,我假设您使用的是Camel的HTTP组件。由于http组件允许这样做,所以最好将这些值作为头提供。可以使用CamelHttpUri头设置URL。HTTP组件通常将消息头作为HTTP头传递,因此您可以通过设置同名的消息头来设置Authorization头等内容。
当然,它也支持从交换和消息中的值设置头。

// assumes the body is a List in which the first element is a CustomerConfig
.setHeader("Authorization", simple("${body[0].authValue}"))

实际上,你可能需要做更多的工作来授权。例如,如果是基本授权,你需要计算你想要使用的值。我会在头文件或属性中设置它,然后引用它如下:

.setHeader("Authorization", simple("${header.basicAuthValue}"))
// or
.setHeader("Authorization", simple("${exchangeProperty.basicAuthValue}"))
cgyqldqp

cgyqldqp2#

最后,路径看起来非常复杂,但每个bean只做一件事。

// main route, split to direct:split root and return aggregated list of EventResponse
from("activemq:queue:" + getQueue())
        // note here unmarshall is manual but also use Jsonb
        .bean(EventUnmarshaller.class) // use class form so that Camel will cache the bean
            .setProperty("skipAck", simple("${body.skipAck}"))
        .bean(SomeFieldValidator.class)
        .bean(ConfigDetailsEnricher.class)
        .split(body(), (oldExchange, newExchange) -> { // reduce each new EventResponse into the result list
            if (oldExchange == null) {
                // the first time we aggregate we only have the new exchange,
                // so we just return it
                return newExchange;
            }
            List<EventResponse> list = oldExchange.getIn().getBody(List.class);
            EventResponse newElement = newExchange.getIn().getBody(EventResponse.class);
            list.add(newElement);
            oldExchange.getIn().setBody(list);
            return oldExchange;
        })
            .to("direct:individual")
        .end() // end split
.to("direct:replyRoute");

// split route, all beans here only deals with one EventSpecificConfig; after all convert to EventResponse
from("direct:individual")
        .filter().method(CriteriaFilter.class)
        .bean(AnotherFieldsTailor.class)
        .bean(RestOperationEnricher.class)
        .choice()
            .when(simple("${body.restOperation.name} != 'DELETE'"))
            .bean(NonDeleteOperationEventFieldsTailor.class)
        .end()
        .bean(HttpMethodEnricher.class)
        .bean(TokenRetriever.class) // call oauth endpoint; as here a cache is used for token so have to use a bean. Set token to EventSpecificConfig
        // inject url into header here; as when reaching "toD()", the body already has change to type Event, not EventSpecificConfig
        .bean(EventApiConfiguration.class) // after this step, body is already new event(PUT, POST) / null(DELETE)
        .choice()
            .when(simple("${body} != null"))
                .marshal().json(JsonLibrary.Jsonb)
            .endChoice()
        .end()
        .toD("${header.url}", 10) // only cache at most 10 urls
        ;

from("direct:replyRoute") // list of EventResponse
        .bean(EventResponsesProcessor.class)
        .log(LoggingLevel.DEBUG, "Reply: " + simple("${body}"))
        .setHeader("from", constant("MYAPP"));

对于API配置:

@Handler
void configure(EventSpecificConfig eventSpecificConfig, Exchange exchange) {
    ConfigDetail configDetail = eventSpecificConfig.getConfigDetail();
    String httpMethodUpperCase = eventSpecificConfig.getHttpMethod().getMethodName();
    SubscriptionEvent newEvent = eventSpecificConfig.getNewEvent();
    Message message = exchange.getIn();
    message.setHeader("Authorization", "Bearer " + eventSpecificConfig.getToken());
    message.setHeader(Exchange.HTTP_METHOD, httpMethodUpperCase);
    message.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON);
    if (HTTP_METHODS_WITH_PATH.contains(httpMethodUpperCase)) {
        message.setHeader(Exchange.HTTP_PATH, "/" + newEvent.getImsi());
    }
    if (HTTP_METHODS_WITH_BODY.contains(httpMethodUpperCase)) {
        message.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON);
        message.setBody(newEvent);
    } else { // DELETE
        message.setBody(null); // DELETE has no body
        // more logic of setting headers for HTTP query params depending on config values
    }
    message.setHeader("url", configDetail.url()); // toD() has no access to EventSpecificConfig, only header works
}

我发现choice().when().endchoice().otherwise().endchoice().end()在每个分支中都需要一个to(),而我不能使用when().setHeader().endchoice(),所以最后我决定将所有这些逻辑放到一个bean中。
并且必须使用end()来结束choice(),不能使用endchoice(); endchoice()是针对when()otherwise()的。相当误导。

choice()
    .when(simple("${header.foo} == 'bar'"))
        .to("x")
    .endchoice()
    .otherwise()
        .to("y")
    .endchoice()
.end()


我以为endchoice()choice()是一个级别的,为什么不叫endbranch()呢?

相关问题