根据发送的消息在Akka上请求恢复

fruv7luv  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(169)

我正在通过ask向参与者发送不同的消息。在超时时,我想提供一个默认值,该值与向参与者请求的消息不同。
由于超时异常总是相同的,我不能在recover中使用它来返回不同的默认值,我需要发送的原始消息。
如何才能做到这一点。
程式码范例:

val storageActorProxy = Flow[ByteString]
        .via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
        .map(TCPMessage.decode)
        .ask[OperationResponse](storageActor)
        //TODO: looking for this recover; non-existent AFAIK
        .customRecover { 
            case Op1 => DefaultResponseA()
            case Op2 => DefaultResponseB()
        }
        .map(TCPMessage.encode(_).toByteString)
1mrurvl1

1mrurvl11#

Akka的ask方法实际上很容易重新创建--它只是一个mapAsync,加上一些额外的逻辑,以便在演员死亡时产生更好的错误(参见代码)。因此,只需手动使用mapAsync,就可以恢复ask错误。

val storageActorProxy = Flow[ByteString]
  .via(Framing.lengthField(TCPMessage.sizeFieldLength, TCPMessage.sizeFieldIndex, Int.MaxValue))
  .map(TCPMessage.decode)
  .mapAsync(parallelism = 2) { decodedMessage =>
     (storageActor ? decodedMessage).recover {
       case Op1 => DefaultResponseA()
       case Op2 => DefaultResponseB()
     }
  }
  .map(TCPMessage.encode(_).toByteString)

相关问题