Использование mapConcurrently для чтения стандартного ввода, выполнения HTTP-вызовов и параллельной записи в стандартный вывод

Я пишу программу, которая считывает несколько URL-адресов (по одному на строку) из стандартного ввода, слегка адаптирует их и выполняет HTTP-запросы для каждого из этих нескольких URL-адресов параллельно. Ответы выводятся на стандартный вывод. Вот код:

{-# LANGUAGE OverloadedStrings #-}

module Main where

import Control.Monad
import Network.Wreq
import Control.Concurrent.MSem
import Control.Concurrent.Async
import Control.Concurrent (threadDelay)
import qualified Data.Traversable as T

main :: IO ()
main = void $ mapPool 4 (const processUrl) [1..]

mapPool :: T.Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
mapPool max f xs = do semaphore <- new max
                      mapConcurrently (with semaphore . f) xs

processUrl :: IO ()
processUrl = do param <- getLine
                response <- get (url ++ param)
                print response

url = "http://example.com/resources?param="

Параллелизм здесь жестко закодирован до четырех. Проблема возникает, когда некоторые из операций ввода-вывода (HTTP-запросов) в пакете не выполняются. Согласно Control.Concurrent.Async.mapConcurrentlyЕсли одно действие не выполнено, остальные отменяются. В моем случае кажется, что последний пакет всегда будет неудачным, потому что ввод достигает EOF, происходит исключение, и программа выводит:

my-program-exe: <stdin>: hGetLine: end of file

Есть ли альтернатива для mapConcurrently, которая не отменяет все остальные действия в случае, если одно заканчивается исключением? Если нет, есть ли лучший способ подойти к этому типу задач?

1 ответ

Есть ли альтернатива для mapConcurrently, которая не отменяет все остальные действия в случае, если одно заканчивается исключением?

Здесь исключение вполне предсказуемо, поэтому, возможно, нам следует решить проблему в источнике, например, проверять EOF перед чтением каждой строки. Мы могли бы поместить это в IO (Maybe String) действие, которое использовало Nothing чтобы обозначить EOF.

getLineMaybe :: IO (Maybe String)
getLineMaybe =
    do isEOF <- hIsEOF stdin
       if isEOF then return Nothing
                else Just <$> System.IO.getLine

В вашем примере есть проблема: одновременная запись в стандартный вывод может привести к искаженному результату. Процесс записи в stdout должен выполняться только из одного потока, и, возможно, также из stdin.

Возможно, мы могли бы иметь две (закрываемые и ограниченные) параллельные очереди, одну, в которую мы помещаем строки, считанные из stdin, и другую, в которую мы помещаем обработанные результаты, которые будут записаны позже. Соединение одного с другим будет иметь несколько рабочих потоков.

Использование пакетов async, stm и stm-chans

import           Control.Concurrent.Async
import           Control.Concurrent.STM
import           Control.Concurrent.STM.TVar
import qualified Control.Concurrent.STM.TBMQueue as Q -- closeable, bounded queue

и эта вспомогательная функция

untilNothing :: IO (Maybe a) -> (a -> IO ()) -> IO () -> IO ()
untilNothing action handler finalizer =
   let go = do mx <- action
               case mx of
                   Nothing -> finalizer
                   Just x -> do handler x
                                go
    in go     

мы можем написать общую функцию, подобную следующей

data ConcConf = ConcConf {
                           pendingQueueSize :: Int,
                           doneQueueSize :: Int,
                           concurrencyLevel :: Int
                         } deriving Show

concPipeline :: ConcConf -> IO (Maybe a) -> (a -> IO b) -> (b -> IO ()) -> IO ()
concPipeline conf reader transformer writer =
    do src <- atomically $ Q.newTBMQueue (pendingQueueSize conf)
       dst <- atomically $ Q.newTBMQueue (doneQueueSize conf)
       workersLeft <- atomically $ newTVar (concurrencyLevel conf)
       let gang = replicateConcurrently_ (concurrencyLevel conf)
           pipeline =
               untilNothing reader
                            (\a -> atomically $ Q.writeTBMQueue src a)
                            (atomically $ Q.closeTBMQueue src)
               `concurrently_`
               untilNothing (atomically $ Q.readTBMQueue dst)
                            writer
                            (pure ())
               `concurrently_`
               -- worker threads connecting reader and writer
               gang (untilNothing (atomically $ Q.readTBMQueue src)
                                  (\a -> do b <- transformer a
                                            atomically $ Q.writeTBMQueue dst b)
                                  -- last one remaining closes shop
                                  (atomically $ do modifyTVar' workersLeft pred
                                                   c <- readTVar workersLeft
                                                   if c == 0 then Q.closeTBMQueue dst
                                                             else pure ()))
       pipeline
Другие вопросы по тегам