我们已经定义了一个基本订阅者,它通过抛出异常并依赖akka streams的流监视来恢复失败消息(即出于某些业务逻辑原因,我们不打算处理这些消息) Flow
:
someLagomService
.someTopic()
.subscribe
.withGroupId("lagom-service")
.atLeastOnce(
Flow[Int]
.mapAsync(1)(el => {
// Exception may occur here or can map to Done
})
.withAttributes(ActorAttributes.supervisionStrategy({
case t =>
Supervision.Resume
})
)
这对于负载很小的基本用例来说似乎工作得很好,但是对于大量的消息,我们注意到了非常奇怪的事情(例如:非常频繁地重新处理消息等等)。
深入研究代码,我们看到拉格姆的 broker.Subscriber.atLeastOnce
文件说明:
这个 flow
可能会从上游拉动更多元素,但它必须正好发射一个元素 Done
它接收到的每条消息的消息。它还必须以接收消息的相同顺序发出它们。这意味着 flow
不能过滤或收集消息的子集,相反,它必须将消息拆分为单独的流,并Map那些将被丢弃到的流 Done
.
另外,在lagom的 KafkaSubscriberActor
,我们看到 atLeastOnce
从本质上说,解压消息负载和偏移量,然后重新解压,然后在我们的用户流将消息Map到 Done
.
上面的这两个小道消息似乎意味着,通过使用流管理器和跳过元素,我们最终会遇到可提交的偏移量不再与 Done
要根据Kafka的信息生成的。
示例:如果我们流化1、2、3、4并将1、2和4Map到 Done
但是在3上抛出一个异常,我们有3 Done
s和4个可承诺的补偿?
这是正确的/预期的吗?这是否意味着我们应该避免在这里使用流管理器?
不均匀的拉链会导致什么样的行为?
当涉及到通过lagom message broker api使用kafka的消息时,建议采用什么样的错误处理方法?将故障Map/恢复到 Done
?
使用lagom 1.4.10
1条答案
按热度按时间ddhy6vgd1#
这是正确的/预期的吗?这是否意味着我们应该避免在这里使用流管理器?
api官方文件上说
如果正在使用kafka-lagom消息代理模块,则默认情况下,当发生故障时,流将自动重新启动。
所以,没有必要加上你自己的
supervisionStrategy
管理错误处理。默认情况下,流将重新启动,您不应该考虑“跳过”完成的消息。不均匀的拉链会导致什么样的行为?
正因为如此,文件上说:
这意味着流不能过滤或收集消息的子集
它可能会提交错误的偏移量。在重新启动时,您可能会以重播的形式从提交的较低偏移量获得已处理的消息。
当涉及到通过lagom message broker api使用kafka的消息时,建议采用什么样的错误处理方法?Map/恢复故障的正确方法是否正确?
lagom通过删除导致错误的消息并重新启动流来处理异常。Map/恢复失败到完成不会对此有任何更改。
您可以考虑,如果以后需要访问这些消息,也可以使用
Try {}
例如,ie不抛出异常,并通过将错误消息发送到不同的主题来收集错误消息,这将使您有机会监视错误的数量,并在条件正确时重播导致错误的消息,即错误已修复。