如何解决akka.stream.Graph sink错误akka流使用akka类型

2mbi3lxu  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(190)

我是新的在akka流和akka键入我的代码是工作良好的经典akka这里是我正在做的

class FlowActor extends Actor{
  val log = LoggerFactory.getLogger(this.getClass)

  def receive: Receive = {
    case word: String =>
      val replyTo = word.capitalize
      log.info("Actor {}, Capitalizing {}",context.self.path, replyTo)
      sender() ! replyTo
  }
}

val source = Source(List("hello","from","akka","streams!"))
val flowActor = system.actorOf(RoundRobinPool(10).props(Props[FlowActor]), "flowActor")
val flow = Flow[String].mapAsync(parallelism = 5)(elem => (flowActor ? elem).mapTo[String])
// sent from actor to stream to "ack" processing of given element
    val AckMessage = AckingReceiver.Ack

    // sent from stream to actor to indicate start, end or failure of stream:
    val InitMessage = AckingReceiver.StreamInitialized
    val OnCompleteMessage = AckingReceiver.StreamCompleted
    val onErrorMessage = (ex: Throwable) => AckingReceiver.StreamFailure(ex)

    val receiver = system.actorOf(Props(new SimpleAckingReceiver(AckMessage)), name = "recevier")

    val sink = Sink.actorRefWithBackpressure(
      receiver,
      onInitMessage = InitMessage,
      ackMessage = AckMessage,
      onCompleteMessage = OnCompleteMessage,
      onFailureMessage = onErrorMessage
    )

    val stream = source.via(flow).to(sink)
    stream.run()

在输出中,当我从akka经典转换到akka演员并做了这样的事情时,我现在得到了预期的结果

object FlowActor {
  val log = LoggerFactory.getLogger(this.getClass)
  val FlowServiceKey = ServiceKey[FlowActor.TransformText]("FlowActor")

  sealed trait Command
  final case class TransformText(text: String, replyTo: ActorRef[TextTransformed]) extends Command with CborSerializable
  final case class TextTransformed(text: String) extends CborSerializable

  def apply(): Behavior[Command] =
    Behaviors.setup { ctx =>
      // each worker registers themselves with the receptionist
      ctx.log.info("Registering myself with receptionist")
      ctx.system.receptionist ! Receptionist.Register(FlowServiceKey, ctx.self)

      Behaviors.receiveMessage {
        case TransformText(text, replyTo) =>
          replyTo ! TextTransformed(text.capitalize)
          Behaviors.same
      }
    }
}

object SinkActor {

  sealed trait Event
  case object StartWork extends Event
  trait Ack
  object Ack extends Ack

  trait Protocol
  case class Init(ackTo: ActorRef[Ack]) extends Protocol
  case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
  case object Complete extends Protocol
  case class Fail(ex: Throwable) extends Protocol
def apply(actorMaterializer: ActorMaterializer): Behavior[Event] = Behaviors.setup { ctx =>
    implicit val mat = actorMaterializer
    implicit val timeout=Timeout(5, SECONDS)
  Behaviors.receiveMessage {
    case STartWork=>
  val ref = ctx
        .spawn( (FlowActor()) ,"flowActor"
        )
        import FlowActor._
        val askFlow: Flow[String, TextTransformed, NotUsed] =
          ActorFlow.ask(ref)(TransformText.apply)
def targetActor(): ActorRef[Protocol] = ???

        val actor: ActorRef[Protocol] = targetActor()

        val source = Source(List("hello","from","akka","streams!"))

        val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure(
          ref = actor,
          onCompleteMessage = Complete,
          onFailureMessage = Fail.apply,
          messageAdapter = Message.apply,
          onInitMessage = Init.apply,
          ackMessage = Ack)

        val stream = source.via(askFlow).runWith(sink)
Behaviors.same
    }
  }
  }

在最后一行我得到一个编译时错误

found   : akka.stream.scaladsl.Sink[String,akka.NotUsed]
[error]  
required: akka.stream.Graph[akka.stream.SinkShape[sample.cluster.akkastreams.FlowActor.TextTransformed],?]
[error]         val stream = source.via(askFlow).runWith(sink)


请引导我在这里错过了什么?

7cwmlq89

7cwmlq891#

发生这种情况是因为代码试图将发出FlowActor.TextTransformed元素的Flow连接到需要String元素的Sink。它需要将TextTransformed数据转换为String对象:

val stream = source.via(askFlow)
    .map(element => element.text) // or equivalently: .map(_.text)
    .runWith(sink)

至少还有另外两种可能的方法可以做到这一点,但我认为上面的例子是最好的方法,至少对于更现实和复杂的应用程序来说是这样。

  1. FlowActor可以通过将TransformText中的replyTo字段的类型更改为ActorRef[String],直接使用String而不是TextTransformed进行回复。因为它提高了类型安全性并使协议的发展更容易。
    1.接收器可以期望TextTransformed元素而不是String,方法是让messageAdaptor函数处理从TextTransformed对象展开String,或者通过使Message类在它的msg字段中使用TextTransformed而不是String,来实现。这种方法的缺点是它将接收器与流紧密耦合。

相关问题