Бесконечное чтение из файла

Я пытаюсь прочитать нерегулярный ввод (например, команды, которые могут появляться время от времени) из файла. Например, исходный файл пуст, и моя программа была запущена. Затем к файлу была добавлена ​​некоторая строка, и моя программа должна прочитать эту строку.

Первая наивная реализация:

import System.IO
import Control.Monad

listen :: Handle -> IO ()
listen file = forever $ do
    ineof <- hIsEOF file
    if ineof
        then do
            s <- hGetLine file
            putStrLn s
        else
            return ()

Но это, конечно, не работает должным образом (в первую очередь из-за проблем с производительностью). Как я могу реализовать это правильно (возможно, с использованием каналов)?

1 ответ

Решение

Я собрал пример реализации этого ниже. Основная идея:

  • Отслеживайте изменения файлов с помощью пакета fsnotify.
  • использование sourceFileRange для потоковой передачи ранее не использованных частей файла.
  • Используйте MVar чтобы сигнал обратного вызова fsnotify Source продолжить чтение.

Это предполагает, что исходный файл только добавляется, никогда не удаляется или сокращается.

import           Control.Concurrent        (forkIO, threadDelay)
import           Control.Concurrent.MVar   (MVar, newEmptyMVar, putMVar,
                                            takeMVar)
import           Control.Exception         (IOException, try)
import           Control.Monad             (forever, void, when)
import           Control.Monad.IO.Class    (liftIO)
import           Data.ByteString           (ByteString)
import qualified Data.ByteString           as S
import           Data.Conduit              (MonadResource, Source, bracketP,
                                            runResourceT, ($$), ($=))
import           Data.Conduit.Binary       (sourceFileRange)
import qualified Data.Conduit.List         as CL
import           Data.IORef                (IORef, modifyIORef, newIORef,
                                            readIORef)
import           Data.Time                 (getCurrentTime)
import           Filesystem                (canonicalizePath)
import           Filesystem.Path.CurrentOS (decodeString, directory)
import           System.FSNotify           (Event (..), startManager,
                                            stopManager, watchDir)

tryIO :: IO a -> IO (Either IOException a)
tryIO = try

sourceFileForever :: MonadResource m => FilePath -> Source m ByteString
sourceFileForever fp' = bracketP startManager stopManager $ \manager -> do
    fp <- liftIO $ canonicalizePath $ decodeString fp'
    baton <- liftIO newEmptyMVar
    liftIO $ watchDir manager (directory fp) (const True) $ \event -> void $ tryIO $ do
        fpE <- canonicalizePath $
            case event of
                Added x _ -> x
                Modified x _ -> x
                Removed x _ -> x
        when (fpE == fp) $ putMVar baton ()
    consumedRef <- liftIO $ newIORef 0
    loop baton consumedRef
  where
    loop :: MonadResource m => MVar () -> IORef Integer -> Source m ByteString
    loop baton consumedRef = forever $ do
        consumed <- liftIO $ readIORef consumedRef
        sourceFileRange fp' (Just consumed) Nothing $= CL.iterM counter
        liftIO $ takeMVar baton
      where
        counter bs = liftIO $ modifyIORef consumedRef (+ fromIntegral (S.length bs))

main :: IO ()
main = do
    let fp = "foo.txt"
    writeFile fp "Hello World!"
    _ <- forkIO $ runResourceT $ sourceFileForever fp $$ CL.mapM_ (liftIO . print)
    forever $ do
        now <- getCurrentTime
        appendFile fp $ show now ++ "\n"
        threadDelay 1000000
Другие вопросы по тегам