Apache Camel 路由,要转发值

kmb7vmvb  于 2023-03-18  发布在  Apache
关注(0)|答案(1)|浏览(144)

我有这个apache camel路由,我示例化了一个bean,并对传入的字符串做了一些转换,但是当我把它发送到.to(Endpoints.SEDA_SEND_PRICING_LIFE_CYCLE_MESSAGE)时,它不接受转换后的值。它发送在路由开始时收到的原始值。在我的.bean类中,我返回了一个对象值。(我也尝试过返回字符串值)

from(azureServicebus(AZvalue)
                       .connectionString(connectionString)
                       .receiverAsyncClient(serviceBusReceiverAsyncClient)
                       .serviceBusReceiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
                       .serviceBusType( ServiceBusType.topic)
                       .prefetchCount(100)
                       .consumerOperation ( ServiceBusConsumerOperationDefinition.receiveMessages )

                       //.maxAutoLockRenewDuration(Duration.ofMinutes(10))
                         )
                       .messageHistory()
                       // Route Name
                       .routeId(Endpoints.SEDA_PROCESS_SB_MESSAGE_ENDPOINT )
                       // multicast may not need
                       .multicast()
                       .parallelProcessing() // create parallel threads
                      .bean(PricingLifeCyclebService.class, "paraMap" ) 
                      .log("Final :- ${body}")

                      .to(Endpoints.SEDA_SEND_PRICING_LIFE_CYCLE_MESSAGE)

                       .end();
 }

下面是我的bean类

package com.sams.pricing.lifecycle.processor.services;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.sams.pricing.lifecycle.processor.dtos.LifecycleDTO;
import com.sams.pricing.lifecycle.processor.mapper.LifecycleDtoMapper;
import com.sams.pricing.lifecycle.processor.model.LifeCycleTopicC;
import com.sams.pricing.lifecycle.processor.queueDtos.ParaDTO;
import com.sams.pricing.lifecycle.processor.util.LifeCycleMapper;
import com.sams.pricing.lifecycle.processor.util.LifeCycleUtility;

import lombok.extern.slf4j.Slf4j;

@Slf4j 
@Component
public class PricingLifeCyclebService {

    @Autowired private LifecycleDtoMapper paraToLifecleDTOMapper;
    
    @Autowired private LifeCycleMapper lifecycleTopicmapper ;
    
    public LifeCycleTopicC paraMap(String object) throws  JsonProcessingException  
    { 
        final var paraMapper =   LifeCycleUtility.getObjectMapper();
        final var paraDTO = paraMapper.readValue(object, ParaDTO.class);
        
        LifecycleDTO lifecycleDTO = paraToLifecleDTOMapper.mapToLifecycleDTO(paraDTO);
        
        LifeCycleTopicC obj = lifecycleTopicmapper.toLifeCycleTopicC(lifecycleDTO);
        
        
        log.info("Event={}, Body={}", "ConvertParatoTopic", obj);
        return obj;
        
    }

}
wnavrhmk

wnavrhmk1#

这是因为您在多播块内转换了正文,而您应该在多播块之前转换正文。
在下面的例子中,通过direct:example route接收到相同的消息,同时发送到端点1、端点2和端点3。

from("direct:example")
    .routeId("example")
    .multicast().parallelProcessing()
        .bean(HelloBean.class, "greet" ) // endpoint 1
        .log("Final :- ${body}") // endpoint 2
        .to("direct:printBody") // endpoint 3
    .end();

下面是使用多播的方法:使用bean转换消息,并将转换后的消息同时发送到端点direct:multicastAdirect:multicastBdirect:multicastC

from("direct:example3")
    .routeId("example3")
    .bean(HelloBean.class, "greet" ) 
    .log("Final :- ${body}") 
    .multicast().parallelProcessing()
        .to("direct:multicastA") 
        .to("direct:multicastB") 
        .to("direct:multicastC") 
    .end();

from("direct:multicastA")
    .delay(1000)
    .log("A: ${body}")
;

from("direct:multicastB")
    .delay(1000)
    .log("A: ${body}")
;

from("direct:multicastC")
    .delay(1000)
    .log("A: ${body}")
;

Multicast是默认pipeline行为的替代。区别在于管道按顺序调用端点,多播向所有列出的端点发送相同的消息(.Bean.log都是端点,就像direct、seda和timer一样)。

相关问题