Использование mapConcurrently для чтения стандартного ввода, выполнения HTTP-вызовов и параллельной записи в стандартный вывод
Я пишу программу, которая считывает несколько URL-адресов (по одному на строку) из стандартного ввода, слегка адаптирует их и выполняет HTTP-запросы для каждого из этих нескольких URL-адресов параллельно. Ответы выводятся на стандартный вывод. Вот код:
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Control.Monad
import Network.Wreq
import Control.Concurrent.MSem
import Control.Concurrent.Async
import Control.Concurrent (threadDelay)
import qualified Data.Traversable as T
main :: IO ()
main = void $ mapPool 4 (const processUrl) [1..]
mapPool :: T.Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
mapPool max f xs = do semaphore <- new max
mapConcurrently (with semaphore . f) xs
processUrl :: IO ()
processUrl = do param <- getLine
response <- get (url ++ param)
print response
url = "http://example.com/resources?param="
Параллелизм здесь жестко закодирован до четырех. Проблема возникает, когда некоторые из операций ввода-вывода (HTTP-запросов) в пакете не выполняются. Согласно Control.Concurrent.Async.mapConcurrently
Если одно действие не выполнено, остальные отменяются. В моем случае кажется, что последний пакет всегда будет неудачным, потому что ввод достигает EOF, происходит исключение, и программа выводит:
my-program-exe: <stdin>: hGetLine: end of file
Есть ли альтернатива для mapConcurrently, которая не отменяет все остальные действия в случае, если одно заканчивается исключением? Если нет, есть ли лучший способ подойти к этому типу задач?
1 ответ
Есть ли альтернатива для mapConcurrently, которая не отменяет все остальные действия в случае, если одно заканчивается исключением?
Здесь исключение вполне предсказуемо, поэтому, возможно, нам следует решить проблему в источнике, например, проверять EOF перед чтением каждой строки. Мы могли бы поместить это в IO (Maybe String)
действие, которое использовало Nothing
чтобы обозначить EOF.
getLineMaybe :: IO (Maybe String)
getLineMaybe =
do isEOF <- hIsEOF stdin
if isEOF then return Nothing
else Just <$> System.IO.getLine
В вашем примере есть проблема: одновременная запись в стандартный вывод может привести к искаженному результату. Процесс записи в stdout должен выполняться только из одного потока, и, возможно, также из stdin.
Возможно, мы могли бы иметь две (закрываемые и ограниченные) параллельные очереди, одну, в которую мы помещаем строки, считанные из stdin, и другую, в которую мы помещаем обработанные результаты, которые будут записаны позже. Соединение одного с другим будет иметь несколько рабочих потоков.
Использование пакетов async, stm и stm-chans
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TVar
import qualified Control.Concurrent.STM.TBMQueue as Q -- closeable, bounded queue
и эта вспомогательная функция
untilNothing :: IO (Maybe a) -> (a -> IO ()) -> IO () -> IO ()
untilNothing action handler finalizer =
let go = do mx <- action
case mx of
Nothing -> finalizer
Just x -> do handler x
go
in go
мы можем написать общую функцию, подобную следующей
data ConcConf = ConcConf {
pendingQueueSize :: Int,
doneQueueSize :: Int,
concurrencyLevel :: Int
} deriving Show
concPipeline :: ConcConf -> IO (Maybe a) -> (a -> IO b) -> (b -> IO ()) -> IO ()
concPipeline conf reader transformer writer =
do src <- atomically $ Q.newTBMQueue (pendingQueueSize conf)
dst <- atomically $ Q.newTBMQueue (doneQueueSize conf)
workersLeft <- atomically $ newTVar (concurrencyLevel conf)
let gang = replicateConcurrently_ (concurrencyLevel conf)
pipeline =
untilNothing reader
(\a -> atomically $ Q.writeTBMQueue src a)
(atomically $ Q.closeTBMQueue src)
`concurrently_`
untilNothing (atomically $ Q.readTBMQueue dst)
writer
(pure ())
`concurrently_`
-- worker threads connecting reader and writer
gang (untilNothing (atomically $ Q.readTBMQueue src)
(\a -> do b <- transformer a
atomically $ Q.writeTBMQueue dst b)
-- last one remaining closes shop
(atomically $ do modifyTVar' workersLeft pred
c <- readTVar workersLeft
if c == 0 then Q.closeTBMQueue dst
else pure ()))
pipeline