haskell ST中可变向量的并行计算

bwleehnv  于 2023-01-31  发布在  其他
关注(0)|答案(3)|浏览(167)

如何使ST中完成的计算并行运行?
我有一个向量需要通过随机访问填充,因此使用了ST,计算可以在单线程下正确运行,但无法弄清楚如何使用多个内核。
由于向量中索引的意义,需要随机访问。有n个事物,在n个事物中进行选择的每种可能方法在向量中都有一个条目,就像选择函数中一样。这些选择中的每一个都对应于一个二进制数(概念上,一个压缩的[Bool]),这些Int值是索引。如果有n个事物,则向量的大小为2^n。算法运行的自然方式是填充对应于“n选择1”的每个条目,然后填充“n选择2”的每个条目,对应于“n选择k”的条目取决于对应于“n选择(k-1)”的条目。不同选择的整数不以数字顺序出现,这就是需要随机访问的原因。
下面是一个无意义的(但是很慢的)计算,它遵循相同的模式。example函数显示了我如何尝试将计算分解,以便在纯世界(没有ST monad)中完成大部分工作。在下面的代码中,bogus是完成大部分工作的地方,目的是并行调用它,但只使用了一个内核。

import qualified Data.Vector as Vb
import qualified Data.Vector.Mutable as Vm
import qualified Data.Vector.Generic.Mutable as Vg
import qualified Data.Vector.Generic as Gg
import Control.Monad.ST as ST ( ST, runST )
import Data.Foldable(forM_)
import Data.Char(digitToInt)

main :: IO ()
main = do
  putStrLn $ show (example 9)
  

example :: Int -> Vb.Vector Int
example n = runST $ do
  m <- Vg.new (2^n) :: ST s (Vm.STVector s Int)
  
  Vg.unsafeWrite m 0 (1)
  
  forM_ [1..n] $ \i -> do
    p <- prev m n (i-1)
    let newEntries = (choiceList n i) :: [Int]
    forM_ newEntries $ \e -> do
      let v = bogus p e
      Vg.unsafeWrite m e v
  
  Gg.unsafeFreeze m

choiceList :: Int -> Int -> [Int]
choiceList _ 0 = [0]
choiceList n 1 = [ 2^k | k <- [0..(n-1) ] ]
choiceList n k 
  | n == k = [2^n - 1]
  | otherwise = (choiceList (n-1) k) ++ (map ((2^(n-1)) +) $ choiceList (n-1) (k-1))

prev :: Vm.STVector s Int -> Int -> Int -> ST s Integer
prev m n 0 = return 1
prev m n i = do
  let chs = choiceList n i
  v <- mapM (\k -> Vg.unsafeRead m k ) chs
  let e = map (\k -> toInteger k ) v
  return (sum e)

bogus :: Integer -> Int -> Int
bogus prior index = do
  let f = fac prior
  let g = (f^index) :: Integer
  let d = (map digitToInt (show g)) :: [Int]
  let a = fromIntegral (head d)^2
  a

fac :: Integer -> Integer
fac 0 = 1
fac n = n * fac (n - 1)

如果有人对此进行测试,那么在show (example 9)中使用超过9或10将花费比您希望等待这样一个毫无意义的数字序列更长的时间。

6uxekuva

6uxekuva1#

只需要在IO中执行即可,如果您需要在纯代码中使用结果,那么unsafePerformIO是可用的。
下面的版本在+RTS -N16上运行的速度比+RTS -N1快3 - 4倍,我的修改包括将ST矢量转换为IO,将forM_转换为forConcurrently_,并在let !v = bogus ...上添加一个bang注解。
完整代码:

import qualified Data.Vector as Vb
import qualified Data.Vector.Mutable as Vm
import qualified Data.Vector.Generic.Mutable as Vg
import qualified Data.Vector.Generic as Gg
import Control.Monad.ST as ST ( ST, runST )
import Data.Foldable(forM_)
import Data.Char(digitToInt)
import Control.Concurrent.Async
import System.IO.Unsafe

main :: IO ()
main = do
  let m = unsafePerformIO (example 9)
  putStrLn $ show m

example :: Int -> IO (Vb.Vector Int)
example n = do
  m <- Vg.new (2^n)

  Vg.unsafeWrite m 0 (1)

  forM_ [1..n] $ \i -> do
    p <- prev m n (i-1)
    let newEntries = (choiceList n i) :: [Int]
    forConcurrently_ newEntries $ \e -> do
      let !v = bogus p e
      Vg.unsafeWrite m e v

  Gg.unsafeFreeze m

choiceList :: Int -> Int -> [Int]
choiceList _ 0 = [0]
choiceList n 1 = [ 2^k | k <- [0..(n-1) ] ]
choiceList n k
  | n == k = [2^n - 1]
  | otherwise = (choiceList (n-1) k) ++ (map ((2^(n-1)) +) $ choiceList (n-1) (k-1))

prev :: Vm.IOVector Int -> Int -> Int -> IO Integer
prev m n 0 = return 1
prev m n i = do
  let chs = choiceList n i
  v <- mapM (\k -> Vg.unsafeRead m k ) chs
  let e = map (\k -> toInteger k ) v
  return (sum e)

bogus :: Integer -> Int -> Int
bogus prior index = do
  let f = fac prior
  let g = (f^index) :: Integer
  let d = (map digitToInt (show g)) :: [Int]
  let a = fromIntegral (head d)^2
  a

fac :: Integer -> Integer
fac 0 = 1
fac n = n * fac (n - 1)
5ssjco0h

5ssjco0h2#

我认为这样做是不安全的,在一般情况下,这似乎会打破Haskell的参照透明性。
如果我们可以在ST s中执行多线程计算,那么我们就可以产生两个线程来竞争同一个STRef s Bool,假设一个线程正在写False,另一个线程正在写True
在我们使用runST进行计算之后,我们得到了一个Bool类型的表达式,有时是False,有时是True,这应该是不可能的。
如果你绝对确定你的并行化不会破坏引用透明性,你可以尝试使用unsafeIOToST这样的不安全原语来产生新线程,使用时要格外小心。
也许有更安全的方法来实现类似的事情。除了ST之外,我们确实在Control.Parallel.Strategies中提供了一些并行性。

pdsfdshx

pdsfdshx3#

在Haskell中有很多种并行化的方法。通常它们都能提供相当的性能提升,但是有些方法比其他方法更好,这主要取决于需要并行化的问题。这个特殊的用例对我来说非常有趣,所以我决定研究几种方法。

方法

∮ ∮ ∮ ∮ ∮一个月一个月
我们使用的是一个装箱向量,因此我们可以利用惰性和内置的Spark池来实现并行化。vector-strategies包提供了一个非常简单的方法,它可以迭代任何不可变的装箱向量,并并行计算所有thunk。也可以将向量分成块,但事实证明1的块大小是最佳的:

exampleParVector :: Int -> Vb.Vector Int
exampleParVector n = example n `using` parVector 1

parallel

parVector在下面使用par,需要对向量进行一次额外的迭代。在这种情况下,我们已经在对向量进行迭代,因此直接使用parallel中的par实际上更有意义。这将允许我们在继续使用ST单子的同时并行执行计算:

import Control.Parallel (par)
...

  forM_ [1..n] $ \i -> do
    p <- prev m n (i-1)
    let newEntries = choiceList n i :: [Int]
    forM_ newEntries $ \e -> do
      let v = bogus p e
      v `par` Vg.unsafeWrite m e v

值得注意的是,与向量中的元素总数相比,向量中每个元素的计算成本都很高。这就是为什么使用par是一个非常好的解决方案。如果情况正好相反,即向量非常大,但元素的计算成本并不太高,则最好使用未装箱的向量,并将其切换到另一种并行化方法。

一米十一秒

@K.A.Buhr.描述了另一种方法,从ST切换到IO并使用async

import Control.Concurrent.Async (forConcurrently_)
...

  forM_ [1..n] $ \i -> do
    p <- prev m n (i-1)
    let newEntries = choiceList n i :: [Int]
    forConcurrently_ newEntries $ \e -> do
      let !v = bogus p e
      Vg.unsafeWrite m e v

@chi提出的问题是有道理的,但是在这个特定的实现中,使用unsafePerformIO而不是runST是安全的,因为并行化不会违反确定性计算的不变量,也就是说,我们可以保证,不管提供给example函数的输入是什么,输出总是完全相同的。

一米十七

绿色线程在Haskell是相当便宜的,但他们不是免费的。上面的async包的解决方案有一个小缺点:每次调用forConcurrently_时,它将启动至少与newEntries列表中的元素一样多的线程。最好启动与功能(-N RTS选项)一样多的线程,并让它们完成所有工作。为此,我们可以使用scheduler包,这是一个工作窃取调度程序:

import Control.Scheduler (Comp(Par), runBatch_, withScheduler_)
...

  withScheduler_ Par $ \scheduler ->
    forM_ [1..n] $ \i -> runBatch_ scheduler $ \_ -> do
      p <- prev m n (i-1)
      let newEntries = choiceList n i :: [Int]
      forM_ newEntries $ \e -> scheduleWork_ scheduler $ do
        let !v = bogus p e
        Vg.unsafeWrite m e v

GHC中的Spark pool也使用了一个工作窃取调度器,它内置在RTS中,与上面的包在任何形状或形式上都无关,但思想非常相似:具有许多计算单元的少数线程。

基准

以下是在16核计算机上针对example 79的值取秒级,这会给criterion带来太多噪声)的所有方法进行的一些基准测试。我们只获得了大约5倍的加速比,因为算法的很大一部分本质上是顺序的,无法并行化。

相关问题