Параллельный Хаскелл. Ограничивающий скорость производитель

В параллельном и параллельном программировании на Хаскелле Саймон Марлоу предлагает Stream a на основе следующих данных вместе с некоторыми производителями и потребителями:

data IList a
  = Nil
  | Cons a (IVar (IList a))

type Stream a = IVar (IList a)

streamFromList :: NFData a => [a] -> Par (Stream a)
streamFromList xs = do
      var <- new
      fork $ loop xs var
      return var
    where
      loop [] var = put var Nil
      loop (x:xs) var = do
        tail <- new
        put var (Cons x tail)
        loop xs tail

Позже он упоминает о недостатках этого подхода и предлагает решение:

В нашем предыдущем примере потребитель был быстрее, чем производитель. Если бы вместо этого производитель был быстрее, чем потребитель, то ничто не помешало бы продюсеру продвинуться далеко впереди потребителя и создать длинную цепочку IList в памяти. Это нежелательно, потому что большие структуры данных кучи несут накладные расходы из-за сбора мусора, поэтому мы можем захотеть ограничить скорость производителя, чтобы избежать его слишком большого продвижения вперед. Есть хитрость, которая добавляет автоматическое ограничение скорости в потоковый API. Это влечет за собой добавление еще одного конструктора IList тип:

data IList a
    = Nil
    | Cons a (IVar (IList a))
    | Fork (Par ()) (IList a)

Тем не менее, он не заканчивает этот подход:

Оставшуюся часть реализации этой идеи я оставлю как упражнение для вас, чтобы попробовать самостоятельно. Смотрите, если вы можете изменить streamFromList, streamFold, а также streamMap включить Fork конструктор. Размер куска и расстояние вилки должны быть параметрами для производителей (streamFromList а также streamMap).

Тот же вопрос был задан в списке рассылки, но никто не дал ответа.

Так как можно ограничить скорость производителя?

3 ответа

Решение

Важная часть заключается в loop функция:

  loop [] var = put var Nil
  loop (x:xs) var = do
    tail <- new
    put var (Cons x tail)
    loop xs tail

Нам нужно добавить расстояние вилки f и размер куска c в качестве параметров:

  loop _ _ [] var = put var Nil
  loop 0 c (x:xs) var = -- see below
  loop f c (x:xs) var = do
    tail <- new
    put var (Cons x tail)
    loop (f-1) c xs tail

Расстояние вилки уменьшается на каждой итерации. Что нам нужно сделать, когда расстояние от вилки равно нулю? Мы предоставляем Fork op t, где op продолжает производить список:

  loop 0 c (x:xs) var = do
    tail <- new
    let op = loop c xs tail
    put var (Fork op (Cons x tail))

Обратите внимание, что мы не используем Fork если список пуст. Это было бы возможно, но немного глупо, в конце концов, ничего не было произведено. изменения streamFromList теперь просто:

streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList f c xs = do
  var <- new                            
  fork $ loop f c xs var                 
  return var 

Теперь, чтобы использовать его, нам нужно изменить case в streamFold:

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = acc `seq` do
  ilst <- get instrm
  case ilst of
    Cons h t          -> streamFold fn (fn acc h) t
    Fork p (Cons h t) -> -- see below
    _                 -> return acc

Помните, мы не разрешили пустой список в Fork в нашем streamFromList, но на всякий случай мы подходим к нему (и Nil) с помощью подстановочного знака.

Что нам нужно делать, если мы сталкиваемся с Fork с данными? Прежде всего, нам нужно использовать fork запустить Par () операция для распространения tи тогда мы сможем начать его использовать. Итак, наш последний случай

    Fork p (Cons h t) -> fork p >> streamFold fn (fn acc h) t

streamMap аналогично. Только в этом случае вы снова используете дополнительные параметры в цикле, как в streamFromList,

Я думаю, что это правильная реализация.

{-# LANGUAGE BangPatterns #-}

import Control.Monad.Par (IVar, Par, fork, get, new, put, put_, runPar)
import Control.DeepSeq   (NFData, rnf)

data IList a
  = Nil
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IVar (IList a))

instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons a b) = rnf a `seq` rnf b
  rnf (Fork a b) = rnf (runPar a) `seq` rnf b

type Stream a = IVar (IList a)

main :: IO ()
main = print $ sum (pipeline [1 .. 10000])

pipeline :: [Int] -> [Int]
pipeline list = runPar $ do
  strm <- streamFromList list 100 200
  xs   <- streamFold (\x y -> (y : x)) [] strm
  return (reverse xs)

streamFromList :: NFData a => [a] -> Int -> Int -> Par (Stream a)
streamFromList xs k n = do
    var <- new
    fork $ loop xs var k
    return var
  where
    loop [] var _ = put var Nil
    loop xs var 0 = do
      var' <- new
      put_ var (Fork (loop xs var' n) var')
    loop (x:xs) var i = do
      tail <- new
      put var (Cons x tail)
      loop xs tail (i - 1)

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn !acc strm = do
  ilst <- get strm
  case ilst of
    Nil      -> return acc
    Cons h t -> streamFold fn (fn acc h) t
    Fork p s -> fork p >> streamFold fn acc s

Вот, streamFromList (производитель) значения для потока, пока streamFoldпотребляет их параллельно. После первогоk ценности, streamFromList ставит Forkв потоке. ЭтаFork содержит вычисление для получения следующего n values ​​и поток, из которого эти значения могут быть получены.

На этом этапе у потребителя есть шанс наверстать упущенное, если он отстает от производителя. По достиженииFork, Это forks вложенный производитель. И снова производитель и потребитель могут действовать параллельно, пока производитель, за другимn значения, добавляет еще Fork к потоку, и цикл повторяется.

В этой реализации вилка помещается в середину созданного списка.

      import Control.DeepSeq
import Control.Monad.Par

data IList a
    = Nil -- need to be NFData
    | Cons a (IVar (IList a))
    | Fork (Par ()) (IList a)

instance NFData a => NFData (IList a) where
    rnf Nil = ()
    rnf (Cons x xs) = rnf x `seq` rnf xs
    rnf (Fork c l) = rnf l

type Stream a = IVar (IList a)

-- >>> runPar $ (streamFromList 3 [1 .. 10]) >>= (streamFold (+) 0)
-- 55

streamFromList :: NFData a => Int -> [a] -> Par (Stream a)
streamFromList chunkSize xs = do
    dt <- new
    dl <- new
    put dl xs
    fork $ next chunkSize dt dl
    return dt
  where
    next :: NFData a => Int -> Stream a -> IVar [a] -> Par ()
    next 1 dt dl = do 
        ilist <- get dl 
        case ilist of 
            [] -> put dt Nil 
            (x:xs) -> do 
                delaytail <- new 
                delaylist <- new 
                put delaylist xs
                put dt (Fork (next 1 delaytail delaylist) (Cons x delaytail))
    next chunkSize dt dl = do
        ilist <- get dl
        case ilist of
            [] -> put dt Nil
            (x : xs) -> do
                delaytail <- new
                delaylist <- new
                tail <- new
                put
                    dt
                    ( Fork
                        (next chunkSize delaytail delaylist)
                        (Cons x tail)
                    )
                loop xs tail delaytail delaylist (chunkSize - 2)
    loop :: NFData a => [a] -> Stream a -> Stream a -> IVar [a] -> Int -> Par ()
    loop [] var _ dl _ = do
        put var Nil
        put dl []
    loop (x : xs) var dt dl count =
        if count /= 0
            then do
                tail <- new
                put var (Cons x tail)
                loop xs tail dt dl (count - 1)
            else do
                put var (Cons x dt)
                put dl xs

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = do
    ilist <- get instrm
    case ilist of
        Nil -> return acc
        Cons h t -> streamFold fn (fn acc h) t
        Fork p Nil -> return acc
        Fork p (Cons h t) -> do
            fork p
            streamFold fn (fn acc h) t

-- >>> runPar $ (streamFromList 3 [1 .. 10]) >>= (streamMap (*2)) >>= (streamFold (+) 0)
-- 110

streamMap :: (NFData a, NFData b) => (a -> b) -> Stream a -> Par (Stream b)
streamMap fn instrm = do
    outstrm <- new
    fork $ init fn instrm outstrm
    return outstrm
  where
    init :: (NFData a, NFData b) => (a -> b) -> Stream a -> Stream b -> Par ()
    init fn instrm outstrm = do
        ilst <- get instrm
        case ilst of
            Nil -> put outstrm Nil
            Cons h t -> do
                newtl <- new
                put outstrm (Cons (fn h) newtl)
                init fn t newtl
            Fork p Nil -> put outstrm Nil
            Fork p (Cons h t) -> do
                fork p
                slist <- get t
                case slist of
                    Nil -> do
                        newtl <- new
                        put newtl Nil
                        put outstrm (Cons (fn h) newtl)
                    Cons h1 t1 -> do
                        newtl <- new
                        delaytail <- new
                        delaystrm <- new
                        put outstrm (Fork (init fn delaystrm delaytail) (Cons (fn h) newtl))
                        loopCons fn h1 t1 newtl delaytail delaystrm
                    Fork p1 Nil -> do
                        delaytail <- new
                        put outstrm (Fork (put delaytail Nil) (Cons (fn h) delaytail))
                    Fork p1 (Cons h1 t1) -> do
                        delaytail <- new
                        delaystrm <- new
                        put outstrm (Fork (init fn delaystrm delaytail) (Cons (fn h) delaytail))
    loopCons :: (NFData a, NFData b) => (a -> b) -> a -> Stream a -> Stream b -> Stream b -> Stream a -> Par ()
    loopCons fn h t var dl ds = do
        tlist <- get t
        case tlist of
            Nil -> do
                newtl <- new
                put newtl Nil
                put var (Cons (fn h) newtl)
                put ds Nil
            Cons h1 t1 -> do
                newtl <- new
                put var (Cons (fn h) newtl)
                loopCons fn h1 t1 newtl dl ds
            Fork p Nil -> do
                newtl <- new
                put newtl Nil
                put var (Cons (fn h) newtl)
            Fork p (Cons h1 t1) -> do
                put ds tlist
                put var (Cons (fn h) dl)
Другие вопросы по тегам