QSem не блокирует потоки
Я пишу простой скрипт для параллельного запуска множества задач с использованием библиотеки Shelly, но я хочу ограничить максимальное количество задач, выполняемых одновременно. Сценарий берет файл с входными данными в каждой строке и запускает задачу для этого ввода. В файле содержится несколько сотен входных данных, и я хочу ограничить до 16 процессов одновременно.
Текущий скрипт на самом деле ограничивается 1 (хорошо пытается), используя QSem с начальным счетом 1. Я, кажется, что-то упускаю, хотя, потому что когда я запускаю тестовый файл с 4 входами, я вижу это:
начало начало начало начало Готово Готово Готово Готово
Таким образом, потоки не блокируют QSem, как я ожидал, они все работают одновременно. Я даже зашел так далеко, что реализовал свои собственные семафоры как на MVar
а также TVar
и ни один из них не работал так, как я ожидал. Я явно упускаю что-то фундаментальное, но что? Я также попытался скомпилировать код и запустить его как двоичный файл.
#! / usr / bin / env runhaskell {- # ЯЗЫК TemplateHaskell, QuasiQuotes, DeriveDataTypeable, OverloadedStrings #-} импорт Шелли импорт прелюдии скрывается (FilePath) импортировать Text.Shakespeare.Text (lt) импортировать квалифицированные Data.Text.Lazy as LT импортировать Control.Monad (forM) импорт System.Environment (getArgs) импортировать квалифицированный Control.Concurrent.QSem как QSem импортировать Control.Concurrent (forkIO, MVar, putMVar, newEmptyMVar, takeMVar) - Определить максимальное количество одновременных процессов maxProcesses:: IO QSem.QSem maxProcesses = QSem.newQSem 1 bkGrnd:: ШИО а -> ШИО (МВАР а) bkGrnd proc = do mvar <- liftIO newEmptyMVar _ <- liftIO $ forkIO $ do - Блокировать, пока не появятся свободные процессы sem <- maxProcesses QSem.waitQSem sem putStrLn "Старт" - Запустите команду оболочки результат <- шелли $ молча результат LiftIO $ putMVar mvar putStrLn "Готово" - Сигнал о том, что этот процесс завершен и может выполняться другой. QSem.signalQSem sem возврат мвар главная:: IO () главная = шелли $ молча $ до [img, файл] <- liftIO $ getArgs содержимое <- файл чтения $ из текста $ файл LT.pack - Запустите фоновый процесс для каждой строки ввода. результаты <- forM (содержимое LT.lines) $ \ line -> bkGrnd $ do runStdin <команда> <аргументы> результаты liftIO $ mapM_ takeMVar
3 ответа
Как я сказал в своем комментарии, каждый звонок bkGrnd
создает свой собственный семафон, позволяя каждому потоку продолжать работу без ожидания. Я бы попробовал что-то вроде этого, где семафор создается в main
и проходил каждый раз bkGrnd
,
bkGrnd :: QSem.QSem -> ShIO a -> ShIO (MVar a)
bkGrnd sem proc = do
mvar <- liftIO newEmptyMVar
_ <- liftIO $ forkIO $ do
-- Block until there are free processes
QSem.waitQSem sem
--
-- code continues as before
--
main :: IO ()
main = shelly $ silently $ do
[img, file] <- liftIO $ getArgs
contents <- readfile $ fromText $ LT.pack file
sem <- maxProcesses
-- Run a backgrounded process for each line of input.
results <- forM (LT.lines contents) $ \line -> bkGrnd sem $ do
runStdin <command> <arguments>
liftIO $ mapM_ takeMVar results
У вас есть ответ, но мне нужно добавить: QSem и QSemN не являются потокобезопасными, если возможна killThread или асинхронная смерть потока.
Мой отчет об ошибке и патч - это GHC trac ticket # 3160. Фиксированный код доступен в виде новой библиотеки под названием SafeSemaphore с модулем Control.Concurrent.MSem, MSemN, MSampleVar и бонусом FairRWLock.
Не лучше ли
bkGrnd sem proc = do
QSem.waitQSem sem
mvar <- liftIO newEmptyMVar
_ <- liftIO $ forkIO $ do
...
так даже не forkIO
пока вы не получите семафор?