QSem не блокирует потоки

Я пишу простой скрипт для параллельного запуска множества задач с использованием библиотеки Shelly, но я хочу ограничить максимальное количество задач, выполняемых одновременно. Сценарий берет файл с входными данными в каждой строке и запускает задачу для этого ввода. В файле содержится несколько сотен входных данных, и я хочу ограничить до 16 процессов одновременно.

Текущий скрипт на самом деле ограничивается 1 (хорошо пытается), используя QSem с начальным счетом 1. Я, кажется, что-то упускаю, хотя, потому что когда я запускаю тестовый файл с 4 входами, я вижу это:

начало
начало
начало
начало
Готово
Готово
Готово
Готово

Таким образом, потоки не блокируют QSem, как я ожидал, они все работают одновременно. Я даже зашел так далеко, что реализовал свои собственные семафоры как на MVar а также TVar и ни один из них не работал так, как я ожидал. Я явно упускаю что-то фундаментальное, но что? Я также попытался скомпилировать код и запустить его как двоичный файл.

#! / usr / bin / env runhaskell
{- # ЯЗЫК TemplateHaskell, QuasiQuotes, DeriveDataTypeable, OverloadedStrings #-}

импорт Шелли
импорт прелюдии скрывается (FilePath)
импортировать Text.Shakespeare.Text (lt)
импортировать квалифицированные Data.Text.Lazy as LT
импортировать Control.Monad (forM)
импорт System.Environment (getArgs)

импортировать квалифицированный Control.Concurrent.QSem как QSem
импортировать Control.Concurrent (forkIO, MVar, putMVar, newEmptyMVar, takeMVar)

- Определить максимальное количество одновременных процессов
maxProcesses:: IO QSem.QSem
maxProcesses = QSem.newQSem 1

bkGrnd:: ШИО а -> ШИО (МВАР а)
bkGrnd proc = do
  mvar <- liftIO newEmptyMVar
  _ <- liftIO $ forkIO $ do
    - Блокировать, пока не появятся свободные процессы
    sem <- maxProcesses
    QSem.waitQSem sem
    putStrLn "Старт"
    - Запустите команду оболочки
    результат <- шелли $ молча
    результат LiftIO $ putMVar mvar
    putStrLn "Готово"
    - Сигнал о том, что этот процесс завершен и может выполняться другой.
    QSem.signalQSem sem
  возврат мвар

главная:: IO ()
главная = шелли $ молча $ до
    [img, файл] <- liftIO $ getArgs
    содержимое <- файл чтения $ из текста $ файл LT.pack
    - Запустите фоновый процесс для каждой строки ввода.
    результаты <- forM (содержимое LT.lines) $ \ line -> bkGrnd $ do
      runStdin <команда> <аргументы>
    результаты liftIO $ mapM_ takeMVar

3 ответа

Решение

Как я сказал в своем комментарии, каждый звонок bkGrnd создает свой собственный семафон, позволяя каждому потоку продолжать работу без ожидания. Я бы попробовал что-то вроде этого, где семафор создается в main и проходил каждый раз bkGrnd,

bkGrnd :: QSem.QSem -> ShIO a -> ShIO (MVar a)
bkGrnd sem proc = do
  mvar <- liftIO newEmptyMVar
  _ <- liftIO $ forkIO $ do
    -- Block until there are free processes
    QSem.waitQSem sem
    --
    -- code continues as before
    --

main :: IO ()
main = shelly $ silently $ do
    [img, file] <- liftIO $ getArgs
    contents <- readfile $ fromText $ LT.pack file
    sem <- maxProcesses
    -- Run a backgrounded process for each line of input.
    results <- forM (LT.lines contents) $ \line -> bkGrnd sem $ do
      runStdin <command> <arguments>
    liftIO $ mapM_ takeMVar results

У вас есть ответ, но мне нужно добавить: QSem и QSemN не являются потокобезопасными, если возможна killThread или асинхронная смерть потока.

Мой отчет об ошибке и патч - это GHC trac ticket # 3160. Фиксированный код доступен в виде новой библиотеки под названием SafeSemaphore с модулем Control.Concurrent.MSem, MSemN, MSampleVar и бонусом FairRWLock.

Не лучше ли

bkGrnd sem proc = do
  QSem.waitQSem sem
  mvar <- liftIO newEmptyMVar
  _ <- liftIO $ forkIO $ do
  ...

так даже не forkIO пока вы не получите семафор?

Другие вопросы по тегам