Перехват исключений в трубе без прерывания
Это похоже на длинный путь, но я написал канал для подключения к базе данных, получил список баз данных на сервере, подключился к каждой из них, затем выполнил запрос для каждой (количество пользователей), а затем распечатал эти на счет. К сожалению, это примерно столько, сколько я могу упростить это из моего реального примера. Я использую pipe-4.1.0, pipe-safe-2.0.2 и mysql-simple-0.2.2.4. Вот код:
{-# LANGUAGE RankNTypes, OverloadedStrings #-}
import Pipes
import qualified Pipes.Safe as PS
import qualified Pipes.Prelude as P
import Database.MySQL.Simple
import qualified Data.Text as T
import Control.Monad.Catch as MC
import Control.Monad (forever)
import Database.MySQL.Simple.QueryParams
import Database.MySQL.Simple.QueryResults
data DBName = DBName T.Text deriving Show
-- connect to a database and use a table.
mydb :: T.Text -> ConnectInfo
mydb = undefined
-- Quirk of (mysql|postgresql)-simple libraries
unOnly (Only a) = a
queryProducer :: (MonadIO m, QueryParams params, QueryResults r) => Connection -> Query -> params -> Pipes.Producer' r m ()
queryProducer = undefined
myDBNames :: (PS.MonadSafe m, MonadIO m) => Producer DBName m ()
myDBNames = PS.bracket (liftIO $ connect $ mydb "sometable") (liftIO . close) $ \db ->
queryProducer db "show databases" () >-> P.map (DBName . unOnly)
-- I realize this is inefficient, one step at a time.
connectToDB :: (PS.MonadSafe m, MonadIO m) => Pipe DBName Connection m ()
connectToDB = forever $ do
(DBName dbname) <- await
PS.bracket
(liftIO . connect . mydb $ dbname)
(liftIO . close)
yield
userCount :: (PS.MonadCatch m, MonadIO m) => Pipe Connection Int m ()
userCount = forever $ do
db <- await
queryProducer db "select count(*) from user" () >-> P.map unOnly
main :: IO ()
main = PS.runSafeT $ runEffect $ myDBNames >-> P.tee P.print >-> connectToDB >-> userCount >-> P.print
Это отлично работает. Однако, скажем, в одной из этих баз данных пользовательская таблица называется user, а не user, поэтому mysql-simple при выполнении этого запроса выдаст исключение. Я хочу перехватить эту ошибку и просто вернуть 0 пользователей для этих запросов, но продолжайте. Вещи, которые я пробовал:
(queryProducer db "select count(*) from user" () `PS.catchAll` (\e -> (liftIO $ putStrLn "failure") >> yield (Only 0))) >-> P.map unOnly
Это не работает Иногда он выдаст ошибку и выдаст 0, только чтобы немедленно завершиться при исключении. Я подумал, может быть, это потому, что я исключил queryProducer за исключением, и я должен вызвать его снова, поэтому я попробовал эту рекурсивную версию:
thequery db >-> P.map unOnly
where
thequery db = queryProducer db "select count(*) from user" () `PS.catchAll` (\e -> (liftIO $ putStrLn "failure") >> yield (Only 0) >> thequery db)
Но это также не удается. Однако иногда он на самом деле выполняет несколько запросов, несколько раз печатая сбой и получая несколько нулей, прежде чем снова завершиться с исключением. Я действительно не понимаю, почему это происходит.
В соответствии с асинхронной библиотекой, исключения должны отправляться в поток, в котором работает канал, поэтому не похоже, что это может быть проблемой потоков.
В случае, если реализация моего queryProducer имеет значение, он моделируется после функции запроса pipe-postgresql, обобщенной на Producer, поэтому я могу встроить ее в другие комбинаторы. Ниже mysql-simple в библиотеке mysql есть бросок, который выдает ошибку ConnectionError, если ваш sql не имеет смысла, который просачивается через эту функцию.
{-# LANGUAGE RankNTypes #-}
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Database.MySQL.Simple as My
import Database.MySQL.Simple.QueryParams
import Database.MySQL.Simple.QueryResults
import qualified Pipes
import qualified Pipes.Concurrent as Pipes
--------------------------------------------------------------------------------
-- | Convert a query to a 'Producer' of rows.
--
-- For example,
--
-- > pg <- connectToMysql
-- > query pg "SELECT * FROM widgets WHERE ID = ?" (Only widgetId) >-> print
--
-- Will select all widgets for a given @widgetId@, and then print each row to
-- standard output.
queryProducer
:: (MonadIO m, QueryResults r, QueryParams params)
=> My.Connection -> My.Query -> params -> Pipes.Producer' r m ()
queryProducer c q p = do
(o, i, seal) <- liftIO (Pipes.spawn' Pipes.Single)
worker <- liftIO $ Async.async $ do
My.fold c q p () (const $ void . STM.atomically . Pipes.send o)
STM.atomically seal
liftIO $ Async.link worker
Pipes.fromInput i
Я также попытался использовать EitherT, чтобы попытаться перехватить исключения, поскольку, похоже, так и было в прошлом в каналах. Но документация для этого в руководстве по трубам исчезла между 3 и 4, что заставляет меня задуматься о том, рекомендуется ли этот метод до сих пор или нет. К сожалению, я не смог заставить его работать, потому что, поскольку я использую queryProducer вместо единственного await/yields, я не уверен, как это структурировать.
1 ответ
Основываясь на комментарии Гейба, я исправил свою функцию queryProducer, убедившись, что запрос не может быть выполнен, пока не сработала функция ссылки.
query :: (MonadIO m, QueryResults r, QueryParams params) => My.Connection -> My.Query -> params -> Pipes.Producer' r m ()
query c q p = do
(o, i, seal) <- liftIO (Pipes.spawn' Pipes.Single)
mvar <- liftIO $ newEmptyMVar
worker <- liftIO $ Async.async $ do
takeMVar mvar
My.fold c q p () (const $ void . STM.atomically . Pipes.send o)
STM.atomically seal
liftIO $ Async.link worker
liftIO $ putMVar mvar ()
Pipes.fromInput i
Я проверил это, и это похоже на работу.