我是新的在akka流和akka键入我的代码是工作良好的经典akka这里是我正在做的
class FlowActor extends Actor{
val log = LoggerFactory.getLogger(this.getClass)
def receive: Receive = {
case word: String =>
val replyTo = word.capitalize
log.info("Actor {}, Capitalizing {}",context.self.path, replyTo)
sender() ! replyTo
}
}
val source = Source(List("hello","from","akka","streams!"))
val flowActor = system.actorOf(RoundRobinPool(10).props(Props[FlowActor]), "flowActor")
val flow = Flow[String].mapAsync(parallelism = 5)(elem => (flowActor ? elem).mapTo[String])
// sent from actor to stream to "ack" processing of given element
val AckMessage = AckingReceiver.Ack
// sent from stream to actor to indicate start, end or failure of stream:
val InitMessage = AckingReceiver.StreamInitialized
val OnCompleteMessage = AckingReceiver.StreamCompleted
val onErrorMessage = (ex: Throwable) => AckingReceiver.StreamFailure(ex)
val receiver = system.actorOf(Props(new SimpleAckingReceiver(AckMessage)), name = "recevier")
val sink = Sink.actorRefWithBackpressure(
receiver,
onInitMessage = InitMessage,
ackMessage = AckMessage,
onCompleteMessage = OnCompleteMessage,
onFailureMessage = onErrorMessage
)
val stream = source.via(flow).to(sink)
stream.run()
在输出中,当我从akka经典转换到akka演员并做了这样的事情时,我现在得到了预期的结果
object FlowActor {
val log = LoggerFactory.getLogger(this.getClass)
val FlowServiceKey = ServiceKey[FlowActor.TransformText]("FlowActor")
sealed trait Command
final case class TransformText(text: String, replyTo: ActorRef[TextTransformed]) extends Command with CborSerializable
final case class TextTransformed(text: String) extends CborSerializable
def apply(): Behavior[Command] =
Behaviors.setup { ctx =>
// each worker registers themselves with the receptionist
ctx.log.info("Registering myself with receptionist")
ctx.system.receptionist ! Receptionist.Register(FlowServiceKey, ctx.self)
Behaviors.receiveMessage {
case TransformText(text, replyTo) =>
replyTo ! TextTransformed(text.capitalize)
Behaviors.same
}
}
}
object SinkActor {
sealed trait Event
case object StartWork extends Event
trait Ack
object Ack extends Ack
trait Protocol
case class Init(ackTo: ActorRef[Ack]) extends Protocol
case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Throwable) extends Protocol
def apply(actorMaterializer: ActorMaterializer): Behavior[Event] = Behaviors.setup { ctx =>
implicit val mat = actorMaterializer
implicit val timeout=Timeout(5, SECONDS)
Behaviors.receiveMessage {
case STartWork=>
val ref = ctx
.spawn( (FlowActor()) ,"flowActor"
)
import FlowActor._
val askFlow: Flow[String, TextTransformed, NotUsed] =
ActorFlow.ask(ref)(TransformText.apply)
def targetActor(): ActorRef[Protocol] = ???
val actor: ActorRef[Protocol] = targetActor()
val source = Source(List("hello","from","akka","streams!"))
val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure(
ref = actor,
onCompleteMessage = Complete,
onFailureMessage = Fail.apply,
messageAdapter = Message.apply,
onInitMessage = Init.apply,
ackMessage = Ack)
val stream = source.via(askFlow).runWith(sink)
Behaviors.same
}
}
}
在最后一行我得到一个编译时错误
found : akka.stream.scaladsl.Sink[String,akka.NotUsed]
[error]
required: akka.stream.Graph[akka.stream.SinkShape[sample.cluster.akkastreams.FlowActor.TextTransformed],?]
[error] val stream = source.via(askFlow).runWith(sink)
我
请引导我在这里错过了什么?
1条答案
按热度按时间7cwmlq891#
发生这种情况是因为代码试图将发出
FlowActor.TextTransformed
元素的Flow
连接到需要String
元素的Sink
。它需要将TextTransformed
数据转换为String
对象:至少还有另外两种可能的方法可以做到这一点,但我认为上面的例子是最好的方法,至少对于更现实和复杂的应用程序来说是这样。
FlowActor
可以通过将TransformText
中的replyTo
字段的类型更改为ActorRef[String]
,直接使用String
而不是TextTransformed
进行回复。因为它提高了类型安全性并使协议的发展更容易。1.接收器可以期望
TextTransformed
元素而不是String
,方法是让messageAdaptor
函数处理从TextTransformed
对象展开String
,或者通过使Message
类在它的msg
字段中使用TextTransformed
而不是String
,来实现。这种方法的缺点是它将接收器与流紧密耦合。