我对 Camel 的SEDA端点有点困惑。显然有些事情我不明白,但到目前为止我还不知道是什么。
下面是我的设置(抽象):
from("direct:some-entry-point")
.setBody {
// Route sets a byte array
myByteArray
}
.to("seda:file-upload") // The message goes on to upload the byte array
.choice()
.`when` { doTheOtherThing } // Under certain circumstances, something needs to be done additionally. The body can already be uploaded, however.
.to("direct:do-the-other-thing")
from("direct:do-the-other-thing")
.setBody {
// This route adds a file handle, because this one is big and won't fit into memory
myFileHandle
}
.to("seda:streaming-upload") //Since this file is big, it is forwarded to another endpoint to be uploaded as a stream.
from("seda:streaming-upload?queue=#FiloBlockingQueue")
.process {
// Opens a stream to the filehandle in the body and uploads from that.
}
from("seda:file-upload?queue=#FiloBlockingQueue")
.process {
// Uploads the byte array in the body
}
总而言之,没有什么太复杂的我想。但我有个问题。出于某种疯狂的原因,当some-entry-point将消息发送到文件上传端点时,它最终会在流上传端点处结束,在那里它会崩溃,因为它需要一个文件句柄并得到一个字节数组。
我已经做了所有我能做的日志记录和断点,我所能告诉你的是,这确实是发生的事情。消息不会在文件上传时到达,也不会在做其他事情时到达,即使已经给出了发生这种情况的条件。我还验证了do-the-other-thing是转发到流媒体上传的唯一途径。我能解释这种行为的唯一方法是,转发到file-upload的消息最终到达了错误的端点。有人能告诉我为什么吗?
编辑:一些新的见解:将消息发送到错误队列的行为不一致。事实上,这似乎完全是随机的。我向任何SEDA队列端点发送消息,这是一个将实际接收消息的cointoss。从行为判断,我怀疑我完全误解了这里的底层架构。在我的应用程序中似乎只有一个SEDA队列示例,并且两个端点都轮询同一个示例。第一个投票的人是得到消息的人。这并不是我从“端点”中直观地期望的行为,但它可以解释很多事情。有谁认识 Camel 的能给我解释一下吗?
1条答案
按热度按时间lqfhib0f1#
事实证明,这两个端点都由同一个队列支持。我对 Camel 的工作原理了解得太少了。当向SEDA端点发送内容时,它会被放在指定的队列中,SEDA端点将以一定的间隔轮询它们的队列。如果两个端点共享同一个队列,您无法控制哪一个端点将首先接收消息。
因此,如果你想要两个不同的端点,* 是 * 实际上独立的端点,你需要给予它们自己的队列,方法是为它们创建一个bean,然后将其注入camel,就像这样:
在路由生成器中:
现在,两个端点都有自己的队列支持,并且不会窃取彼此的消息。