Эффективная потоковая передача и манипулирование потоком байтов в Haskell
При написании десериализатора для большого (<bloblength><blob>)*
закодированный бинарный файл Я застрял с различными библиотеками производства-преобразования-потребления на Haskell. На данный момент мне известны четыре потоковые библиотеки:
- Data.Conduit: широко используется, имеет очень бережное управление ресурсами
- Трубы: похожие на
conduit
( Haskell Cast # 6 приятно раскрывает различия междуconduit
а такжеpipes
) - Data.Binary.Get: предлагает полезные функции, такие как getWord32be, но пример потоковой передачи неудобен
- System.IO.Streams: кажется, самый простой в использовании
Вот урезанный пример того, где что-то идет не так, когда я пытаюсь сделать Word32
потоковое с conduit
, Немного более реалистичный emample сначала прочитал бы Word32
который определяет длину сгустка, а затем дает ленивый ByteString
той длины (которая затем десериализуется далее). Но здесь я просто пытаюсь извлечь Word32 потоковым способом из двоичного файла:
module Main where
-- build-depends: bytestring, conduit, conduit-extra, resourcet, binary
import Control.Monad.Trans.Resource (MonadResource, runResourceT)
import qualified Data.Binary.Get as G
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as C
import qualified Data.ByteString.Lazy as BL
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL
import Data.Word (Word32)
import System.Environment (getArgs)
-- gets a Word32 from a ByteString.
getWord32 :: C.ByteString -> Word32
getWord32 bs = do
G.runGet G.getWord32be $ BL.fromStrict bs
-- should read BytesString and return Word32
transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = do
mbs <- await
case mbs of
Just bs -> do
case C.null bs of
False -> do
yield $ getWord32 bs
leftover $ BS.drop 4 bs
transform
True -> return ()
Nothing -> return ()
main :: IO ()
main = do
filename <- fmap (!!0) getArgs -- should check length getArgs
result <- runResourceT $ (CB.sourceFile filename) $$ transform =$ CL.consume
print $ length result -- is always 8188 for files larger than 32752 bytes
Вывод программы - это просто количество прочитанных Word32. Оказывается, поток завершается после чтения первого фрагмента (около 32 КБ). По какой-то причине mbs
никогда Nothing
так что я должен проверить null bs
который останавливает поток, когда чанк потребляется. Очевидно, мой канал transform
неисправен. Я вижу два пути к решению:
await
не хочет идти на второй кусокByteStream
Так есть ли другая функция, которая тянет следующий кусок? В примерах, которые я видел (например, Conduit 101), это не так, как это делается- Это просто неправильный способ настройки
transform
,
Как это сделано правильно? Это правильный путь? (Производительность имеет значение.)
Обновление: вот плохой способ сделать это с помощью Systems.IO.Streams
:
module Main where
import Data.Word (Word32)
import System.Environment (getArgs)
import System.IO (IOMode (ReadMode), openFile)
import qualified System.IO.Streams as S
import System.IO.Streams.Binary (binaryInputStream)
import System.IO.Streams.List (outputToList)
main :: IO ()
main = do
filename : _ <- getArgs
h <- openFile filename ReadMode
s <- S.handleToInputStream h
i <- binaryInputStream s :: IO (S.InputStream Word32)
r <- outputToList $ S.connect i
print $ last r
"Плохой" означает: очень требовательный во времени и пространстве, не обрабатывает исключения декодирования.
3 ответа
Ваша непосредственная проблема вызвана тем, как вы используете leftover
, Эта функция используется для "Предоставления единственного фрагмента оставшегося ввода для использования следующим компонентом в текущей монадической привязке", и поэтому, когда вы даете ему bs
перед циклом с transform
вы фактически выбрасываете остальную часть строки (то есть, что после bs
).
Правильное решение на основе вашего кода будет использовать интерфейс добавочного ввода Data.Binary.Get
заменить ваш yield
/ leftover
комбинация с чем-то, что потребляет каждый кусок полностью. Более прагматичный подход, тем не менее, использует пакет бинарного канала, который обеспечивает это в форме conduitGet
(его источник дает хорошее представление о том, как будет выглядеть "ручная" реализация):
import Data.Conduit.Serialization.Binary
-- etc.
transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = conduitGet G.getWord32be
Одно предостережение заключается в том, что это приведет к ошибке разбора, если общее число байтов не кратно 4 (т.е. последний Word32
является неполным). В маловероятном случае, если это не то, что вы хотите, ленивый выход будет просто использовать \bs -> C.take (4 * truncate (C.length bs / 4)) bs
на входной строке.
С pipes
(а также pipes-group
а также pipes-bytestring
) демо проблема сводится к комбинаторам. Сначала мы разрешаем входящий недифференцированный поток байтов в маленькие 4-байтовые блоки:
chunksOfStrict :: (Monad m) => Int -> Producer ByteString m r -> Producer ByteString m r
chunksOfStrict n = folds mappend mempty id . view (Bytes.chunksOf n)
Затем мы сопоставляем их с Word32
и (здесь) подсчитать их.
main :: IO ()
main = do
filename:_ <- getArgs
IO.withFile filename IO.ReadMode $ \h -> do
n <- P.length $ chunksOfStrict 4 (Bytes.fromHandle h) >-> P.map getWord32
print n
Это потерпит неудачу, если у нас меньше 4 байтов или иначе не удастся проанализировать, но мы также можем отобразить с
getMaybeWord32 :: ByteString -> Maybe Word32
getMaybeWord32 bs = case G.runGetOrFail G.getWord32be $ BL.fromStrict bs of
Left r -> Nothing
Right (_, off, w32) -> Just w32
Следующая программа затем распечатает парсеты для действительных 4-байтовых последовательностей
main :: IO ()
main = do
filename:_ <- getArgs
IO.withFile filename IO.ReadMode $ \h -> do
runEffect $ chunksOfStrict 4 (Bytes.fromHandle h)
>-> P.map getMaybeWord32
>-> P.concat -- here `concat` eliminates maybes
>-> P.print
Конечно, есть и другие способы обработки неудачных разборов.
Здесь, однако, кое-что ближе к программе, которую вы просили. Требуется четырехбайтовый сегмент из байтового потока (Producer ByteString m r
) и читает это как Word32
если это достаточно долго; затем он берет столько входных байтов и накапливает их в ленивую строку байтов, что приводит к получению. Это просто повторяется до тех пор, пока не закончится байтов. В main
ниже я печатаю каждую полученную ленивую строку, которая получается:
module Main (main) where
import Pipes
import qualified Pipes.Prelude as P
import Pipes.Group (folds)
import qualified Pipes.ByteString as Bytes ( splitAt, fromHandle, chunksOf )
import Control.Lens ( view ) -- or Lens.Simple (view) -- or Lens.Micro ((.^))
import qualified System.IO as IO ( IOMode(ReadMode), withFile )
import qualified Data.Binary.Get as G ( runGet, getWord32be )
import Data.ByteString ( ByteString )
import qualified Data.ByteString.Lazy.Char8 as BL
import System.Environment ( getArgs )
splitLazy :: (Monad m, Integral n) =>
n -> Producer ByteString m r -> m (BL.ByteString, Producer ByteString m r)
splitLazy n bs = do
(bss, rest) <- P.toListM' $ view (Bytes.splitAt n) bs
return (BL.fromChunks bss, rest)
measureChunks :: Monad m => Producer ByteString m r -> Producer BL.ByteString m r
measureChunks bs = do
(lbs, rest) <- lift $ splitLazy 4 bs
if BL.length lbs /= 4
then rest >-> P.drain -- in fact it will be empty
else do
let w32 = G.runGet G.getWord32be lbs
(lbs', rest') <- lift $ splitLazy w32 bs
yield lbs
measureChunks rest
main :: IO ()
main = do
filename:_ <- getArgs
IO.withFile filename IO.ReadMode $ \h -> do
runEffect $ measureChunks (Bytes.fromHandle h) >-> P.print
Это снова грубо в том смысле, что он использует runGet
не runGetOrFail
, но это легко исправить. Стандартная процедура конвейера состояла бы в том, чтобы остановить преобразование потока при неудачном разборе и вернуть непарсированный тестовый поток.
Если вы ожидали, что Word32s
были для больших чисел, так что вы не хотели накапливать соответствующий поток байтов в виде отложенной строки байтов, но, скажем, записывать их в разные файлы без накопления, мы могли бы довольно легко изменить программу, чтобы сделать это. Это потребует сложного использования канала, но является предпочтительным подходом с pipes
а также streaming
,
Вот относительно простое решение, которое я хочу бросить на ринг. Это повторное использование splitAt
завернутый в State
монада, которая дает интерфейс, идентичный (подмножество) Data.Binary.Get
, Результирующий [ByteString]
получается в main
с whileJust
над getBlob
,
module Main (main) where
import Control.Monad.Loops
import Control.Monad.State
import qualified Data.Binary.Get as G (getWord32be, runGet)
import qualified Data.ByteString.Lazy as BL
import Data.Int (Int64)
import Data.Word (Word32)
import System.Environment (getArgs)
-- this is going to mimic the Data.Binary.Get.Get Monad
type Get = State BL.ByteString
getWord32be :: Get (Maybe Word32)
getWord32be = state $ \bs -> do
let (w, rest) = BL.splitAt 4 bs
case BL.length w of
4 -> (Just w', rest) where
w' = G.runGet G.getWord32be w
_ -> (Nothing, BL.empty)
getLazyByteString :: Int64 -> Get BL.ByteString
getLazyByteString n = state $ \bs -> BL.splitAt n bs
getBlob :: Get (Maybe BL.ByteString)
getBlob = do
ml <- getWord32be
case ml of
Nothing -> return Nothing
Just l -> do
blob <- getLazyByteString (fromIntegral l :: Int64)
return $ Just blob
runGet :: Get a -> BL.ByteString -> a
runGet g bs = fst $ runState g bs
main :: IO ()
main = do
fname <- head <$> getArgs
bs <- BL.readFile fname
let ls = runGet loop bs where
loop = whileJust getBlob return
print $ length ls
Там нет обработки ошибок в getBlob
, но это легко расширить. Пространственно-временная сложность достаточно хороша, если только полученный список используется осторожно. (Сценарий Python, который создает некоторые случайные данные для потребления вышеупомянутым здесь).