Бесконечное чтение из файла
Я пытаюсь прочитать нерегулярный ввод (например, команды, которые могут появляться время от времени) из файла. Например, исходный файл пуст, и моя программа была запущена. Затем к файлу была добавлена некоторая строка, и моя программа должна прочитать эту строку.
Первая наивная реализация:
import System.IO
import Control.Monad
listen :: Handle -> IO ()
listen file = forever $ do
ineof <- hIsEOF file
if ineof
then do
s <- hGetLine file
putStrLn s
else
return ()
Но это, конечно, не работает должным образом (в первую очередь из-за проблем с производительностью). Как я могу реализовать это правильно (возможно, с использованием каналов)?
1 ответ
Решение
Я собрал пример реализации этого ниже. Основная идея:
- Отслеживайте изменения файлов с помощью пакета fsnotify.
- использование
sourceFileRange
для потоковой передачи ранее не использованных частей файла. - Используйте
MVar
чтобы сигнал обратного вызова fsnotifySource
продолжить чтение.
Это предполагает, что исходный файл только добавляется, никогда не удаляется или сокращается.
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar,
takeMVar)
import Control.Exception (IOException, try)
import Control.Monad (forever, void, when)
import Control.Monad.IO.Class (liftIO)
import Data.ByteString (ByteString)
import qualified Data.ByteString as S
import Data.Conduit (MonadResource, Source, bracketP,
runResourceT, ($$), ($=))
import Data.Conduit.Binary (sourceFileRange)
import qualified Data.Conduit.List as CL
import Data.IORef (IORef, modifyIORef, newIORef,
readIORef)
import Data.Time (getCurrentTime)
import Filesystem (canonicalizePath)
import Filesystem.Path.CurrentOS (decodeString, directory)
import System.FSNotify (Event (..), startManager,
stopManager, watchDir)
tryIO :: IO a -> IO (Either IOException a)
tryIO = try
sourceFileForever :: MonadResource m => FilePath -> Source m ByteString
sourceFileForever fp' = bracketP startManager stopManager $ \manager -> do
fp <- liftIO $ canonicalizePath $ decodeString fp'
baton <- liftIO newEmptyMVar
liftIO $ watchDir manager (directory fp) (const True) $ \event -> void $ tryIO $ do
fpE <- canonicalizePath $
case event of
Added x _ -> x
Modified x _ -> x
Removed x _ -> x
when (fpE == fp) $ putMVar baton ()
consumedRef <- liftIO $ newIORef 0
loop baton consumedRef
where
loop :: MonadResource m => MVar () -> IORef Integer -> Source m ByteString
loop baton consumedRef = forever $ do
consumed <- liftIO $ readIORef consumedRef
sourceFileRange fp' (Just consumed) Nothing $= CL.iterM counter
liftIO $ takeMVar baton
where
counter bs = liftIO $ modifyIORef consumedRef (+ fromIntegral (S.length bs))
main :: IO ()
main = do
let fp = "foo.txt"
writeFile fp "Hello World!"
_ <- forkIO $ runResourceT $ sourceFileForever fp $$ CL.mapM_ (liftIO . print)
forever $ do
now <- getCurrentTime
appendFile fp $ show now ++ "\n"
threadDelay 1000000