Параллельный Хаскелл. Ограничивающий скорость производитель
В параллельном и параллельном программировании на Хаскелле Саймон Марлоу предлагает Stream a
на основе следующих данных вместе с некоторыми производителями и потребителями:
data IList a
= Nil
| Cons a (IVar (IList a))
type Stream a = IVar (IList a)
streamFromList :: NFData a => [a] -> Par (Stream a)
streamFromList xs = do
var <- new
fork $ loop xs var
return var
where
loop [] var = put var Nil
loop (x:xs) var = do
tail <- new
put var (Cons x tail)
loop xs tail
Позже он упоминает о недостатках этого подхода и предлагает решение:
В нашем предыдущем примере потребитель был быстрее, чем производитель. Если бы вместо этого производитель был быстрее, чем потребитель, то ничто не помешало бы продюсеру продвинуться далеко впереди потребителя и создать длинную цепочку IList в памяти. Это нежелательно, потому что большие структуры данных кучи несут накладные расходы из-за сбора мусора, поэтому мы можем захотеть ограничить скорость производителя, чтобы избежать его слишком большого продвижения вперед. Есть хитрость, которая добавляет автоматическое ограничение скорости в потоковый API. Это влечет за собой добавление еще одного конструктора
IList
тип:data IList a = Nil | Cons a (IVar (IList a)) | Fork (Par ()) (IList a)
Тем не менее, он не заканчивает этот подход:
Оставшуюся часть реализации этой идеи я оставлю как упражнение для вас, чтобы попробовать самостоятельно. Смотрите, если вы можете изменить
streamFromList
,streamFold
, а такжеstreamMap
включитьFork
конструктор. Размер куска и расстояние вилки должны быть параметрами для производителей (streamFromList
а такжеstreamMap
).
Тот же вопрос был задан в списке рассылки, но никто не дал ответа.
Так как можно ограничить скорость производителя?
3 ответа
Важная часть заключается в loop
функция:
loop [] var = put var Nil
loop (x:xs) var = do
tail <- new
put var (Cons x tail)
loop xs tail
Нам нужно добавить расстояние вилки f
и размер куска c
в качестве параметров:
loop _ _ [] var = put var Nil
loop 0 c (x:xs) var = -- see below
loop f c (x:xs) var = do
tail <- new
put var (Cons x tail)
loop (f-1) c xs tail
Расстояние вилки уменьшается на каждой итерации. Что нам нужно сделать, когда расстояние от вилки равно нулю? Мы предоставляем Fork op t
, где op
продолжает производить список:
loop 0 c (x:xs) var = do
tail <- new
let op = loop c xs tail
put var (Fork op (Cons x tail))
Обратите внимание, что мы не используем Fork
если список пуст. Это было бы возможно, но немного глупо, в конце концов, ничего не было произведено. изменения streamFromList
теперь просто:
streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList f c xs = do
var <- new
fork $ loop f c xs var
return var
Теперь, чтобы использовать его, нам нужно изменить case
в streamFold
:
streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = acc `seq` do
ilst <- get instrm
case ilst of
Cons h t -> streamFold fn (fn acc h) t
Fork p (Cons h t) -> -- see below
_ -> return acc
Помните, мы не разрешили пустой список в Fork
в нашем streamFromList
, но на всякий случай мы подходим к нему (и Nil
) с помощью подстановочного знака.
Что нам нужно делать, если мы сталкиваемся с Fork
с данными? Прежде всего, нам нужно использовать fork
запустить Par ()
операция для распространения t
и тогда мы сможем начать его использовать. Итак, наш последний случай
Fork p (Cons h t) -> fork p >> streamFold fn (fn acc h) t
streamMap
аналогично. Только в этом случае вы снова используете дополнительные параметры в цикле, как в streamFromList
,
Я думаю, что это правильная реализация.
{-# LANGUAGE BangPatterns #-}
import Control.Monad.Par (IVar, Par, fork, get, new, put, put_, runPar)
import Control.DeepSeq (NFData, rnf)
data IList a
= Nil
| Cons a (IVar (IList a))
| Fork (Par ()) (IVar (IList a))
instance NFData a => NFData (IList a) where
rnf Nil = ()
rnf (Cons a b) = rnf a `seq` rnf b
rnf (Fork a b) = rnf (runPar a) `seq` rnf b
type Stream a = IVar (IList a)
main :: IO ()
main = print $ sum (pipeline [1 .. 10000])
pipeline :: [Int] -> [Int]
pipeline list = runPar $ do
strm <- streamFromList list 100 200
xs <- streamFold (\x y -> (y : x)) [] strm
return (reverse xs)
streamFromList :: NFData a => [a] -> Int -> Int -> Par (Stream a)
streamFromList xs k n = do
var <- new
fork $ loop xs var k
return var
where
loop [] var _ = put var Nil
loop xs var 0 = do
var' <- new
put_ var (Fork (loop xs var' n) var')
loop (x:xs) var i = do
tail <- new
put var (Cons x tail)
loop xs tail (i - 1)
streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn !acc strm = do
ilst <- get strm
case ilst of
Nil -> return acc
Cons h t -> streamFold fn (fn acc h) t
Fork p s -> fork p >> streamFold fn acc s
Вот, streamFromList
(производитель) значения для потока, пока streamFold
потребляет их параллельно. После первогоk
ценности, streamFromList
ставит Fork
в потоке. ЭтаFork
содержит вычисление для получения следующего n
values и поток, из которого эти значения могут быть получены.
На этом этапе у потребителя есть шанс наверстать упущенное, если он отстает от производителя. По достиженииFork
, Это fork
s вложенный производитель. И снова производитель и потребитель могут действовать параллельно, пока производитель, за другимn
значения, добавляет еще Fork
к потоку, и цикл повторяется.
В этой реализации вилка помещается в середину созданного списка.
import Control.DeepSeq
import Control.Monad.Par
data IList a
= Nil -- need to be NFData
| Cons a (IVar (IList a))
| Fork (Par ()) (IList a)
instance NFData a => NFData (IList a) where
rnf Nil = ()
rnf (Cons x xs) = rnf x `seq` rnf xs
rnf (Fork c l) = rnf l
type Stream a = IVar (IList a)
-- >>> runPar $ (streamFromList 3 [1 .. 10]) >>= (streamFold (+) 0)
-- 55
streamFromList :: NFData a => Int -> [a] -> Par (Stream a)
streamFromList chunkSize xs = do
dt <- new
dl <- new
put dl xs
fork $ next chunkSize dt dl
return dt
where
next :: NFData a => Int -> Stream a -> IVar [a] -> Par ()
next 1 dt dl = do
ilist <- get dl
case ilist of
[] -> put dt Nil
(x:xs) -> do
delaytail <- new
delaylist <- new
put delaylist xs
put dt (Fork (next 1 delaytail delaylist) (Cons x delaytail))
next chunkSize dt dl = do
ilist <- get dl
case ilist of
[] -> put dt Nil
(x : xs) -> do
delaytail <- new
delaylist <- new
tail <- new
put
dt
( Fork
(next chunkSize delaytail delaylist)
(Cons x tail)
)
loop xs tail delaytail delaylist (chunkSize - 2)
loop :: NFData a => [a] -> Stream a -> Stream a -> IVar [a] -> Int -> Par ()
loop [] var _ dl _ = do
put var Nil
put dl []
loop (x : xs) var dt dl count =
if count /= 0
then do
tail <- new
put var (Cons x tail)
loop xs tail dt dl (count - 1)
else do
put var (Cons x dt)
put dl xs
streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = do
ilist <- get instrm
case ilist of
Nil -> return acc
Cons h t -> streamFold fn (fn acc h) t
Fork p Nil -> return acc
Fork p (Cons h t) -> do
fork p
streamFold fn (fn acc h) t
-- >>> runPar $ (streamFromList 3 [1 .. 10]) >>= (streamMap (*2)) >>= (streamFold (+) 0)
-- 110
streamMap :: (NFData a, NFData b) => (a -> b) -> Stream a -> Par (Stream b)
streamMap fn instrm = do
outstrm <- new
fork $ init fn instrm outstrm
return outstrm
where
init :: (NFData a, NFData b) => (a -> b) -> Stream a -> Stream b -> Par ()
init fn instrm outstrm = do
ilst <- get instrm
case ilst of
Nil -> put outstrm Nil
Cons h t -> do
newtl <- new
put outstrm (Cons (fn h) newtl)
init fn t newtl
Fork p Nil -> put outstrm Nil
Fork p (Cons h t) -> do
fork p
slist <- get t
case slist of
Nil -> do
newtl <- new
put newtl Nil
put outstrm (Cons (fn h) newtl)
Cons h1 t1 -> do
newtl <- new
delaytail <- new
delaystrm <- new
put outstrm (Fork (init fn delaystrm delaytail) (Cons (fn h) newtl))
loopCons fn h1 t1 newtl delaytail delaystrm
Fork p1 Nil -> do
delaytail <- new
put outstrm (Fork (put delaytail Nil) (Cons (fn h) delaytail))
Fork p1 (Cons h1 t1) -> do
delaytail <- new
delaystrm <- new
put outstrm (Fork (init fn delaystrm delaytail) (Cons (fn h) delaytail))
loopCons :: (NFData a, NFData b) => (a -> b) -> a -> Stream a -> Stream b -> Stream b -> Stream a -> Par ()
loopCons fn h t var dl ds = do
tlist <- get t
case tlist of
Nil -> do
newtl <- new
put newtl Nil
put var (Cons (fn h) newtl)
put ds Nil
Cons h1 t1 -> do
newtl <- new
put var (Cons (fn h) newtl)
loopCons fn h1 t1 newtl dl ds
Fork p Nil -> do
newtl <- new
put newtl Nil
put var (Cons (fn h) newtl)
Fork p (Cons h1 t1) -> do
put ds tlist
put var (Cons (fn h) dl)