Перехват исключений в трубе без прерывания

Это похоже на длинный путь, но я написал канал для подключения к базе данных, получил список баз данных на сервере, подключился к каждой из них, затем выполнил запрос для каждой (количество пользователей), а затем распечатал эти на счет. К сожалению, это примерно столько, сколько я могу упростить это из моего реального примера. Я использую pipe-4.1.0, pipe-safe-2.0.2 и mysql-simple-0.2.2.4. Вот код:

{-# LANGUAGE RankNTypes, OverloadedStrings #-}

import Pipes
import qualified Pipes.Safe as PS
import qualified Pipes.Prelude as P
import Database.MySQL.Simple

import qualified Data.Text as T

import Control.Monad.Catch as MC

import Control.Monad (forever)

import Database.MySQL.Simple.QueryParams
import Database.MySQL.Simple.QueryResults

data DBName = DBName T.Text deriving Show

-- connect to a database and use a table.
mydb :: T.Text -> ConnectInfo
mydb = undefined

-- Quirk of (mysql|postgresql)-simple libraries
unOnly (Only a) = a

queryProducer :: (MonadIO m, QueryParams params, QueryResults r) => Connection -> Query -> params -> Pipes.Producer' r m ()
queryProducer = undefined

myDBNames :: (PS.MonadSafe m, MonadIO m) => Producer DBName m ()
myDBNames = PS.bracket (liftIO $ connect $ mydb "sometable") (liftIO . close) $ \db ->
  queryProducer db "show databases" () >-> P.map (DBName . unOnly)

-- I realize this is inefficient, one step at a time.
connectToDB :: (PS.MonadSafe m, MonadIO m) => Pipe DBName Connection m ()
connectToDB = forever $ do
      (DBName dbname) <- await
      PS.bracket
        (liftIO . connect . mydb $ dbname)
        (liftIO . close)
        yield

userCount :: (PS.MonadCatch m, MonadIO m) => Pipe Connection Int m ()
userCount = forever $ do
  db <- await
  queryProducer db "select count(*) from user" () >-> P.map unOnly

main :: IO ()
main = PS.runSafeT $ runEffect $ myDBNames >-> P.tee P.print >-> connectToDB >-> userCount >-> P.print

Это отлично работает. Однако, скажем, в одной из этих баз данных пользовательская таблица называется user, а не user, поэтому mysql-simple при выполнении этого запроса выдаст исключение. Я хочу перехватить эту ошибку и просто вернуть 0 пользователей для этих запросов, но продолжайте. Вещи, которые я пробовал:

(queryProducer db "select count(*) from user" () `PS.catchAll` (\e -> (liftIO $ putStrLn "failure") >> yield (Only 0))) >-> P.map unOnly 

Это не работает Иногда он выдаст ошибку и выдаст 0, только чтобы немедленно завершиться при исключении. Я подумал, может быть, это потому, что я исключил queryProducer за исключением, и я должен вызвать его снова, поэтому я попробовал эту рекурсивную версию:

thequery db >-> P.map unOnly
where
  thequery db = queryProducer db "select count(*) from user" () `PS.catchAll` (\e -> (liftIO $ putStrLn "failure") >> yield (Only 0) >> thequery db)

Но это также не удается. Однако иногда он на самом деле выполняет несколько запросов, несколько раз печатая сбой и получая несколько нулей, прежде чем снова завершиться с исключением. Я действительно не понимаю, почему это происходит.

В соответствии с асинхронной библиотекой, исключения должны отправляться в поток, в котором работает канал, поэтому не похоже, что это может быть проблемой потоков.

В случае, если реализация моего queryProducer имеет значение, он моделируется после функции запроса pipe-postgresql, обобщенной на Producer, поэтому я могу встроить ее в другие комбинаторы. Ниже mysql-simple в библиотеке mysql есть бросок, который выдает ошибку ConnectionError, если ваш sql не имеет смысла, который просачивается через эту функцию.

{-# LANGUAGE RankNTypes #-}

import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Database.MySQL.Simple as My
import Database.MySQL.Simple.QueryParams
import Database.MySQL.Simple.QueryResults
import qualified Pipes
import qualified Pipes.Concurrent as Pipes

--------------------------------------------------------------------------------
-- | Convert a query to a 'Producer' of rows.
--
-- For example,
--
-- > pg <- connectToMysql
-- > query pg "SELECT * FROM widgets WHERE ID = ?" (Only widgetId) >-> print
--
-- Will select all widgets for a given @widgetId@, and then print each row to
-- standard output.
queryProducer 
    :: (MonadIO m, QueryResults r, QueryParams params)
    => My.Connection -> My.Query -> params -> Pipes.Producer' r m ()
queryProducer c q p = do
    (o, i, seal) <- liftIO (Pipes.spawn' Pipes.Single)
    worker <- liftIO $ Async.async $ do
        My.fold c q p () (const $ void . STM.atomically . Pipes.send o)
        STM.atomically seal
    liftIO $ Async.link worker
    Pipes.fromInput i

Я также попытался использовать EitherT, чтобы попытаться перехватить исключения, поскольку, похоже, так и было в прошлом в каналах. Но документация для этого в руководстве по трубам исчезла между 3 и 4, что заставляет меня задуматься о том, рекомендуется ли этот метод до сих пор или нет. К сожалению, я не смог заставить его работать, потому что, поскольку я использую queryProducer вместо единственного await/yields, я не уверен, как это структурировать.

1 ответ

Основываясь на комментарии Гейба, я исправил свою функцию queryProducer, убедившись, что запрос не может быть выполнен, пока не сработала функция ссылки.

query :: (MonadIO m, QueryResults r, QueryParams params) => My.Connection -> My.Query -> params -> Pipes.Producer' r m ()
query c q p = do
    (o, i, seal) <- liftIO (Pipes.spawn' Pipes.Single)
    mvar <- liftIO $ newEmptyMVar
    worker <- liftIO $ Async.async $ do
        takeMVar mvar
        My.fold c q p () (const $ void . STM.atomically . Pipes.send o)
        STM.atomically seal
    liftIO $ Async.link worker
    liftIO $ putMVar mvar ()
    Pipes.fromInput i

Я проверил это, и это похоже на работу.

Другие вопросы по тегам