我目前正在开发一个akka-stream/alpakka应用程序,它具有以下通用逻辑
1.给定一个Flow
,使用splitAfter
方法将其拆分为一个SubFlow
。
1.对于点1中的每个SubFlow
,使用prefixAndTail(1)
根据该Subflow
的第一个元素创建一个键
1.将每个Subflow
附加到一个Sink
,该Sink
已将该键 * 作为参数 * 提供。
假设我们有一个如下定义的函数,它给出了Sink
def mySink(key: String): Sink[ByteString, Future[Done]]
基本上我想做的是这样的
val source = ???
val subFlows = source.splitAfter(...)
val logic = subflows.prefixAndTail(1).flatMapConcat { case (head, rest) =>
val computedFlow: Flow[ByteString, ByteString] = ???
computedFlow
}
val finished = logic.prefixAndTail(1).to(
Sink.lazySink { (head) =>
val key = computeKey(head)
mySink(key)
}
)
这段代码显然不能编译,因为Sink.lazySink
的thunk没有参数。我尝试了其他的变体,但是没有任何结果。在Sink.lazySink
的文档中,它说您可以将它与prefixAndTail(1)
组合以获得流中的第一个元素,这样您就可以将第一个元素用作最终Sink
的输入,在我的示例中是mySink(key)
(类似于Flow
上的prefixAndTail(1).flatMapConcat
组合)。问题是.to
命令不接受参数(仅原始的Sink
),因此您将遇到以下问题
val finished = logic.prefixAndTail(1).to(
Sink.lazySink { () =>
// How do I get the first element out of the sink so I can compute
// the key?
val key = computeKey(???)
mySink(key)
}
)
我试着在logic
Flow
中计算密钥,但是你必须组成两个不同类型的Sink
(一个是密钥,从子流的头部计算,另一个是接受ByteStream
的原始Sink
),这实际上并不起作用
val logic = subflows.prefixAndTail(1).flatMapConcat { case (head, rest) =>
...
val computedFlow: Flow[ByteString, ByteString] = ???
val key = computeKey(head)
computedFlow.map(something => (something, key) )
}
val finished = logic.to(
Sink.lazySink { () =>
// Here I have to return a Sink[ByteString, String] but
// mySink only returns returns Sink[ByteString].
// I essentially need to do something along the lines of Sink[String].flatMap(key => mySink(key))
// but this doesn't work because Sink's don't have an output
}
)
由于我使用的是SubFlow
,所以我的方法有限,似乎找不到一种方法来传递密钥(通过每个SubFlow
中的第一个元素计算),并将其作为参数传递给mySink(key: String)
Sink.lazyInit
看起来很接近我想要的,我设法得到了下面的编译
val logic = subflows.prefixAndTail(1).flatMapConcat { case (head, rest) =>
...
val computedFlow(head, rest): Flow[ByteString, ByteString] = ???
val key = computeKey(head)
computedFlow.map(something => (something, key) )
}
val finished = logic.to(
Sink.lazyInit( { case (_, key} =>
Future.successful(
mySink(key).contramap[(ByteString, String)] { case (content, _) => content }
)
), ???)
)
虽然问题是Sink.lazyInit
已经过时了,所以我更喜欢使用prefixAndTail(1).to(Sink.lazySink)
组合(如折旧消息中所述)。而且我不确定需要Future
的意义是什么(以及Future.successful
是否合适,或者我应该使用需要ExecutionContext
的标准Future.apply
)。
1条答案
按热度按时间yqyhoc1h1#
我脑海中浮现出这样的画面
prefixAndTail
只发出一个元素(prefix
和tail
的元组),因此我们可以使用Sink.foreach
等待,直到发出mySink
到tail
并运行它。我们还有效地将Sink.foreach
的物化值替换为mySink
的物化值。Sink.fromMaterializer
似乎是建立一个每个具体化通道(promise/future组合)的唯一方法,用于将一个从stream元素构建的sink导入lazyFutureSink
。