我有一个路由器,使用聚合器->拆分器->重排序器EIP。当重新排序器批量将交换一个接一个地发送到处理器时,在我有一个交换异常之后,我必须将其余的消息发送到不同的队列。有可能吗?
任何示例代码块将是一个很大的帮助在这里。
我正在使用Sping Boot 与Apache Camel和Java DSL。
//在Julian的评论之后编辑
1.需要CLIENT_ACKNOWLEGEMENT,因为在处理聚合批处理期间,路由可能会崩溃。
1.你能解释一下“整批”和“只是一批”的区别吗?
1.这里是路由器实现from("inputQueue?concurrentConsumers=5&acknowledgementModeName=CLIENT_ACKNOWLEDGE") //CLIENT_ACKNOWLEDGE set .routeId("Consumer") .aggregate(header("AGG_ID"), new AggregationStrategy(){ // 1.Aggregating @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { oldExchange.getIn().setBody(oldBody + SEPARATOR + newBody); return oldExchange; } }) .completionTimeout(5000L) .completionSize(5) //2. Aggregate for 5 messages or wait till 5 seconds .split(simple("${body}")).delimiter("|~~|").parallelProcessing(false) // 3. Splitting .resequence(xpath("//*[local-name()='TimeStamp']")) // 4. resequencing based on TimeStamp in Message .batch(new BatchResequencerConfig(5, 5000L)).allowDuplicates() //5. Aggregate and resequence batch size are same .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { //processing logic :: external service call // 6. Exception conneting to external system, I must reject others messages inside the current batch and //send send them to another queue. //There will be onExeption().handle(true) defined inside the router, so that client acknowledgement send and broker removed the message from queue. } }) .to("outputQueue");
所以主要意图是停止处理一批5条消息的下一条消息,当任何消息都有异常时。如果路由崩溃,则所有消息必须在输入队列中可用。
1条答案
按热度按时间cclgggtu1#
如果我很好地理解了您的需求,您希望您的
resequencer
将交换定向到某个Processor
,但如果发生异常,您希望将消息传递到不同的端点。如果不确切地知道您的需求是什么,就很难给予代码示例。
rest of the messages
的定义是什么?你指的是整批还是整批?无论如何,解决这个问题的方法是相同的,但根据您的确切需求会有不同的口味。你可以实现一个singleton来存储路径,通常是
direct:default
,但当发生错误时,将路径切换到direct:error
。在resequencer
的出口处有一个处理器,它将根据存储在单例中的当前路径路由消息。枚举是一个很好的单例实现,所以这里有一种方法:所以,在你的主要路线,你会有这样的东西:
然后在错误处理部分将路径设置为
direct:error
:当然,当一个新的聚合完成时,您可以
swithToDefault
。希望它能帮助即使我不清楚在所有什么
CLIENT_ACKNOWLEDGE
意味着从你的问题描述。