akka 将prefixAndTail(1)与由.splitAfter创建的子流的接收器.lazySink组合

xvw2m8pv  于 2022-11-05  发布在  其他
关注(0)|答案(1)|浏览(120)

我目前正在开发一个akka-stream/alpakka应用程序,它具有以下通用逻辑
1.给定一个Flow,使用splitAfter方法将其拆分为一个SubFlow
1.对于点1中的每个SubFlow,使用prefixAndTail(1)根据该Subflow的第一个元素创建一个键
1.将每个Subflow附加到一个Sink,该Sink已将该键 * 作为参数 * 提供。
假设我们有一个如下定义的函数,它给出了Sink

def mySink(key: String): Sink[ByteString, Future[Done]]

基本上我想做的是这样的

val source = ???
val subFlows = source.splitAfter(...)
val logic = subflows.prefixAndTail(1).flatMapConcat { case (head, rest) =>
  val computedFlow: Flow[ByteString, ByteString]  = ???
  computedFlow
}
val finished = logic.prefixAndTail(1).to(
  Sink.lazySink { (head) => 
    val key = computeKey(head)
    mySink(key)
  }
)

这段代码显然不能编译,因为Sink.lazySink的thunk没有参数。我尝试了其他的变体,但是没有任何结果。在Sink.lazySink的文档中,它说您可以将它与prefixAndTail(1)组合以获得流中的第一个元素,这样您就可以将第一个元素用作最终Sink的输入,在我的示例中是mySink(key)(类似于Flow上的prefixAndTail(1).flatMapConcat组合)。问题是.to命令不接受参数(仅原始的Sink),因此您将遇到以下问题

val finished = logic.prefixAndTail(1).to(
  Sink.lazySink { () => 
    // How do I get the first element out of the sink so I can compute
    // the key?
    val key = computeKey(???)
    mySink(key)
  }
)

我试着在logicFlow中计算密钥,但是你必须组成两个不同类型的Sink(一个是密钥,从子流的头部计算,另一个是接受ByteStream的原始Sink),这实际上并不起作用

val logic = subflows.prefixAndTail(1).flatMapConcat { case (head, rest) =>
  ...
  val computedFlow: Flow[ByteString, ByteString] = ???
  val key = computeKey(head)
  computedFlow.map(something => (something, key) )
}
val finished = logic.to(
  Sink.lazySink { () => 
    // Here I have to return a Sink[ByteString, String] but 
    // mySink only returns returns Sink[ByteString].
    // I essentially need to do something along the lines of Sink[String].flatMap(key => mySink(key))
    // but this doesn't work because Sink's don't have an output
  }
)

由于我使用的是SubFlow,所以我的方法有限,似乎找不到一种方法来传递密钥(通过每个SubFlow中的第一个元素计算),并将其作为参数传递给mySink(key: String)
Sink.lazyInit看起来很接近我想要的,我设法得到了下面的编译

val logic = subflows.prefixAndTail(1).flatMapConcat { case (head, rest) =>
  ...
  val computedFlow(head, rest): Flow[ByteString, ByteString] = ???
  val key = computeKey(head)
  computedFlow.map(something => (something, key) )
}
val finished = logic.to(
  Sink.lazyInit( { case (_, key} =>
    Future.successful(
      mySink(key).contramap[(ByteString, String)] { case (content, _) => content }
    )
  ), ???)
)

虽然问题是Sink.lazyInit已经过时了,所以我更喜欢使用prefixAndTail(1).to(Sink.lazySink)组合(如折旧消息中所述)。而且我不确定需要Future的意义是什么(以及Future.successful是否合适,或者我应该使用需要ExecutionContext的标准Future.apply)。

yqyhoc1h

yqyhoc1h1#

我脑海中浮现出这样的画面

def logic: Source[ByteString, NotUsed] = ???

val finished = logic.prefixAndTail(1).to {
  val donePromise = Promise[Done]()
  Sink.foreach {
    case (prefix, tail) =>
      prefix.headOption match {
        case None => donePromise.success(Done)
        case Some(head) =>
          donePromise.completeWith(tail.runWith(mySink(head)))
      }
  }.mapMaterializedValue(_ => donePromise.future)
}

prefixAndTail只发出一个元素(prefixtail的元组),因此我们可以使用Sink.foreach等待,直到发出mySinktail并运行它。我们还有效地将Sink.foreach的物化值替换为mySink的物化值。

val finished = logic.to {
  Sink.fromMaterializer { (mat, _) =>
    // TODO: plumb stream attributes through
    val sinkPromise = Promise[Sink[ByteString, Future[Done]]]()
    Flow[ByteString].prefixAndTail(1)
      .wireTap {
        case (prefix, tail) =>
          prefix.headOption match {
            case None => sinkPromise.success(Sink.ignore) // or fail with an exception signalling nothing materialized
            case Some(head) => sinkPromise.success(mySink(head))
          }
      }
      .flatMapConcat(_._2)
      .toMat(Sink.lazyFutureSink(() => sinkPromise.future))(Keep.right)
  }
}

Sink.fromMaterializer似乎是建立一个每个具体化通道(promise/future组合)的唯一方法,用于将一个从stream元素构建的sink导入lazyFutureSink

相关问题