Почему мой параллельный обход программы на Haskell приводит к утечке памяти?

Рассмотрим следующую программу на Haskell (я делаю это в основном для целей обучения):

import qualified Control.Concurrent.MSem as Sem
import System.Environment (getArgs)
import Control.Concurrent (forkIO)
import Control.Monad

-- Traverse with maximum n threads
parallelTraverse :: Foldable a => Int -> (b -> IO()) -> a b -> IO ()
parallelTraverse n action values = do
  sem <- Sem.new n
  forM_ values $ \value -> Sem.with sem (forkIO $ action value)

main :: IO ()
main = do
  args <- getArgs
  let nThreads = read . head $ args :: Int
  parallelTraverse nThreads print [(1::Int)..]

когда я его запускаю, память быстро поднимается до нескольких ГБ. Я пробовал различные комбинации, чтобы убедиться, что я отбрасываю результаты промежуточных вычислений (действия печати). Почему это все еще протекает пространство?

1 ответ

Решение

Прежде всего, у вас есть очевидная ошибка в следующем фрагменте:

Sem.with sem (forkIO $ action value)

Вы обращаетесь к семафору из основного потока вокруг операции "fork" вместо действия там. Ниже приведен правильный способ его реализации:

forkIO (Sem.with sem (action value))

Т.е. обращаться к семафору из контекста разветвленной нити.

Во-вторых, в следующем коде вы вызываете parallelTraverse операция над бесконечным списком:

parallelTraverse nThreads print [(1::Int)..]

Что приводит к бесконечному разветвлению потоков. И так как forkIO операция для вызывающего потока выполняется практически мгновенно, поэтому неудивительно, что у вас заканчиваются ресурсы довольно скоро.


Чтобы использовать семафор для ограничения количества рабочих потоков, with шаблон просто не подойдет в вашем случае. Вместо этого вы должны использовать явную комбинацию wait а также signal и не забудьте правильно обработать исключения (если вы ожидаете их). Например,:

parallelTraverse :: Foldable a => Int -> (b -> IO()) -> a b -> IO ()
parallelTraverse n action values = do
  sem <- Sem.new n
  forM_ values $ \value -> do
    Sem.wait sem
    forkIO $ finally (action value) (Sem.signal sem)
Другие вопросы по тегам