在Haskell的并行和并发编程中,Simon马洛提供了一个基于以下数据的Stream a
,以及一些生产者和消费者:
data IList a
= Nil
| Cons a (IVar (IList a))
type Stream a = IVar (IList a)
streamFromList :: NFData a => [a] -> Par (Stream a)
streamFromList xs = do
var <- new
fork $ loop xs var
return var
where
loop [] var = put var Nil
loop (x:xs) var = do
tail <- new
put var (Cons x tail)
loop xs tail
后来,他提到了这种方法的缺点,并提出了一个解决方案:
在我们前面的例子中,消费者比生产者快。相反,如果生产者比消费者更快,那么就没有什么可以阻止生产者走在消费者前面很长一段路,并在内存中建立一个很长的IList链。这是不可取的,因为大型堆数据结构会因垃圾收集而产生开销,因此我们可能希望对生产者进行速率限制,以避免它超前太多。有一个技巧可以为流API添加一些自动速率限制。它需要向IList
类型添加另一个构造函数:
data IList a
= Nil
| Cons a (IVar (IList a))
| Fork (Par ()) (IList a)
然而,他没有完成这种方法:
我将把这个想法的其余实现作为练习,让您自己尝试。看看是否可以修改streamFromList
、streamFold
和streamMap
以包含Fork
构造函数。区块大小和分叉距离应该是生产者(streamFromList
和streamMap
)的参数。
同样的问题has been asked on the mailing list,但没有人回答。
那么,如何限制生产者的生产率呢?
3条答案
按热度按时间zf9nrax11#
重要的部分在于
loop
函数:我们需要添加分叉距离
f
和区块大小c
作为参数:分叉距离在每次迭代中减小。当分叉距离为零时,我们需要做什么?我们提供了一个
Fork op t
,其中op
继续生成列表:请注意,如果列表为空,则不使用
Fork
。这是可能的,但有点傻,毕竟,没有什么可生产的了。现在更改streamFromList
很简单:现在,为了使用它,我们需要在
streamFold
中更改case
:请记住,我们不允许
streamFromList
中的Fork
中有空列表,但以防万一我们通过Nil
匹配它(和Nil
)。如果我们遇到一个
Fork
数据,我们需要做什么?首先,我们需要使用fork
来运行Par ()
操作,以便传播t
,然后我们可以开始使用它。我们最后一个案子是streamMap
类似。只有在这种情况下,你才能像在streamFromList
中一样在循环中使用额外的参数。dy2hfwbg2#
我认为以下是一个有效的实现。
在这里,
streamFromList
(生产者)将值分配给流,而streamFold
并行使用它们。在第一个k
值之后,streamFromList
将Fork
放入流中。这个Fork
包含产生下一个n
值的计算,以及可以从中消费这些值的流。在这一点上,消费者有机会赶上,如果它落后于生产者。在到达
Fork
时,它是fork
所包含的生产者。同样,生产者和消费者可以并行进行,直到生产者在另一个n
值之后将另一个Fork
添加到流中,并且循环重复。py49o6xq3#
在这个实现中,fork被放置在生成列表的中间。