Эффективная потоковая передача и манипулирование потоком байтов в Haskell

При написании десериализатора для большого (<bloblength><blob>)* закодированный бинарный файл Я застрял с различными библиотеками производства-преобразования-потребления на Haskell. На данный момент мне известны четыре потоковые библиотеки:

  • Data.Conduit: широко используется, имеет очень бережное управление ресурсами
  • Трубы: похожие на conduit ( Haskell Cast # 6 приятно раскрывает различия между conduit а также pipes)
  • Data.Binary.Get: предлагает полезные функции, такие как getWord32be, но пример потоковой передачи неудобен
  • System.IO.Streams: кажется, самый простой в использовании

Вот урезанный пример того, где что-то идет не так, когда я пытаюсь сделать Word32 потоковое с conduit, Немного более реалистичный emample сначала прочитал бы Word32 который определяет длину сгустка, а затем дает ленивый ByteString той длины (которая затем десериализуется далее). Но здесь я просто пытаюсь извлечь Word32 потоковым способом из двоичного файла:

module Main where

-- build-depends: bytestring, conduit, conduit-extra, resourcet, binary

import           Control.Monad.Trans.Resource (MonadResource, runResourceT)
import qualified Data.Binary.Get              as G
import qualified Data.ByteString              as BS
import qualified Data.ByteString.Char8        as C
import qualified Data.ByteString.Lazy         as BL
import           Data.Conduit
import qualified Data.Conduit.Binary          as CB
import qualified Data.Conduit.List            as CL
import           Data.Word                    (Word32)
import           System.Environment           (getArgs)

-- gets a Word32 from a ByteString.
getWord32 :: C.ByteString -> Word32
getWord32 bs = do
    G.runGet G.getWord32be $ BL.fromStrict bs

-- should read BytesString and return Word32
transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = do
    mbs <- await
    case mbs of
        Just bs -> do
            case C.null bs of
                False -> do
                    yield $ getWord32 bs
                    leftover $ BS.drop 4 bs
                    transform
                True -> return ()
        Nothing -> return ()

main :: IO ()
main = do
    filename <- fmap (!!0) getArgs  -- should check length getArgs
    result <- runResourceT $ (CB.sourceFile filename) $$ transform =$ CL.consume
    print $ length result   -- is always 8188 for files larger than 32752 bytes

Вывод программы - это просто количество прочитанных Word32. Оказывается, поток завершается после чтения первого фрагмента (около 32 КБ). По какой-то причине mbs никогда Nothingтак что я должен проверить null bs который останавливает поток, когда чанк потребляется. Очевидно, мой канал transform неисправен. Я вижу два пути к решению:

  1. await не хочет идти на второй кусок ByteStreamТак есть ли другая функция, которая тянет следующий кусок? В примерах, которые я видел (например, Conduit 101), это не так, как это делается
  2. Это просто неправильный способ настройки transform,

Как это сделано правильно? Это правильный путь? (Производительность имеет значение.)

Обновление: вот плохой способ сделать это с помощью Systems.IO.Streams:

module Main where

import           Data.Word                (Word32)
import           System.Environment       (getArgs)
import           System.IO                (IOMode (ReadMode), openFile)
import qualified System.IO.Streams        as S
import           System.IO.Streams.Binary (binaryInputStream)
import           System.IO.Streams.List   (outputToList)

main :: IO ()
main = do
    filename : _ <- getArgs
    h <- openFile filename ReadMode
    s <- S.handleToInputStream h
    i <- binaryInputStream s :: IO (S.InputStream Word32)
    r <- outputToList $ S.connect i
    print $ last r

"Плохой" означает: очень требовательный во времени и пространстве, не обрабатывает исключения декодирования.

3 ответа

Ваша непосредственная проблема вызвана тем, как вы используете leftover, Эта функция используется для "Предоставления единственного фрагмента оставшегося ввода для использования следующим компонентом в текущей монадической привязке", и поэтому, когда вы даете ему bs перед циклом с transform вы фактически выбрасываете остальную часть строки (то есть, что после bs).

Правильное решение на основе вашего кода будет использовать интерфейс добавочного ввода Data.Binary.Get заменить ваш yield / leftover комбинация с чем-то, что потребляет каждый кусок полностью. Более прагматичный подход, тем не менее, использует пакет бинарного канала, который обеспечивает это в форме conduitGet (его источник дает хорошее представление о том, как будет выглядеть "ручная" реализация):

import           Data.Conduit.Serialization.Binary

-- etc.

transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = conduitGet G.getWord32be

Одно предостережение заключается в том, что это приведет к ошибке разбора, если общее число байтов не кратно 4 (т.е. последний Word32 является неполным). В маловероятном случае, если это не то, что вы хотите, ленивый выход будет просто использовать \bs -> C.take (4 * truncate (C.length bs / 4)) bs на входной строке.

С pipes (а также pipes-group а также pipes-bytestring) демо проблема сводится к комбинаторам. Сначала мы разрешаем входящий недифференцированный поток байтов в маленькие 4-байтовые блоки:

chunksOfStrict :: (Monad m) => Int -> Producer ByteString m r -> Producer ByteString m r
chunksOfStrict n = folds mappend mempty id . view (Bytes.chunksOf n) 

Затем мы сопоставляем их с Word32и (здесь) подсчитать их.

main :: IO ()
main = do
   filename:_ <- getArgs
   IO.withFile filename IO.ReadMode $ \h -> do
     n <- P.length $ chunksOfStrict 4 (Bytes.fromHandle h) >-> P.map getWord32
     print n

Это потерпит неудачу, если у нас меньше 4 байтов или иначе не удастся проанализировать, но мы также можем отобразить с

getMaybeWord32 :: ByteString -> Maybe Word32
getMaybeWord32 bs = case  G.runGetOrFail G.getWord32be $ BL.fromStrict bs of
  Left r -> Nothing
  Right (_, off, w32) -> Just w32

Следующая программа затем распечатает парсеты для действительных 4-байтовых последовательностей

main :: IO ()
main = do
   filename:_ <- getArgs
   IO.withFile filename IO.ReadMode $ \h -> do
     runEffect $ chunksOfStrict 4 (Bytes.fromHandle h) 
                 >-> P.map getMaybeWord32
                 >-> P.concat  -- here `concat` eliminates maybes
                 >-> P.print 

Конечно, есть и другие способы обработки неудачных разборов.

Здесь, однако, кое-что ближе к программе, которую вы просили. Требуется четырехбайтовый сегмент из байтового потока (Producer ByteString m r) и читает это как Word32 если это достаточно долго; затем он берет столько входных байтов и накапливает их в ленивую строку байтов, что приводит к получению. Это просто повторяется до тех пор, пока не закончится байтов. В main ниже я печатаю каждую полученную ленивую строку, которая получается:

module Main (main) where 
import Pipes 
import qualified Pipes.Prelude as P
import Pipes.Group (folds) 
import qualified Pipes.ByteString as Bytes ( splitAt, fromHandle, chunksOf )
import Control.Lens ( view ) -- or Lens.Simple (view) -- or Lens.Micro ((.^))
import qualified System.IO as IO ( IOMode(ReadMode), withFile )
import qualified Data.Binary.Get as G ( runGet, getWord32be )
import Data.ByteString ( ByteString )
import qualified Data.ByteString.Lazy.Char8 as BL 
import System.Environment ( getArgs )

splitLazy :: (Monad m, Integral n) =>
   n -> Producer ByteString m r -> m (BL.ByteString, Producer ByteString m r)
splitLazy n bs = do
  (bss, rest) <- P.toListM' $ view (Bytes.splitAt n) bs
  return (BL.fromChunks bss, rest)

measureChunks :: Monad m => Producer ByteString m r -> Producer BL.ByteString m r
measureChunks bs = do
 (lbs, rest) <- lift $ splitLazy 4 bs
 if BL.length lbs /= 4
   then rest >-> P.drain -- in fact it will be empty
   else do
     let w32 = G.runGet G.getWord32be lbs
     (lbs', rest') <- lift $ splitLazy w32 bs
     yield lbs
     measureChunks rest

main :: IO ()
main = do
  filename:_ <- getArgs
  IO.withFile filename IO.ReadMode $ \h -> do
     runEffect $ measureChunks (Bytes.fromHandle h) >-> P.print

Это снова грубо в том смысле, что он использует runGet не runGetOrFail, но это легко исправить. Стандартная процедура конвейера состояла бы в том, чтобы остановить преобразование потока при неудачном разборе и вернуть непарсированный тестовый поток.

Если вы ожидали, что Word32s были для больших чисел, так что вы не хотели накапливать соответствующий поток байтов в виде отложенной строки байтов, но, скажем, записывать их в разные файлы без накопления, мы могли бы довольно легко изменить программу, чтобы сделать это. Это потребует сложного использования канала, но является предпочтительным подходом с pipes а также streaming,

Вот относительно простое решение, которое я хочу бросить на ринг. Это повторное использование splitAt завернутый в State монада, которая дает интерфейс, идентичный (подмножество) Data.Binary.Get, Результирующий [ByteString] получается в main с whileJust над getBlob,

module Main (main) where

import           Control.Monad.Loops
import           Control.Monad.State
import qualified Data.Binary.Get      as G (getWord32be, runGet)
import qualified Data.ByteString.Lazy as BL
import           Data.Int             (Int64)
import           Data.Word            (Word32)
import           System.Environment   (getArgs)

-- this is going to mimic the Data.Binary.Get.Get Monad
type Get = State BL.ByteString

getWord32be :: Get (Maybe Word32)
getWord32be = state $ \bs -> do
    let (w, rest) = BL.splitAt 4 bs
    case BL.length w of
        4 -> (Just w', rest) where
            w' = G.runGet G.getWord32be w
        _ -> (Nothing, BL.empty)

getLazyByteString :: Int64 -> Get BL.ByteString
getLazyByteString n = state $ \bs -> BL.splitAt n bs

getBlob :: Get (Maybe BL.ByteString)
getBlob = do
    ml <- getWord32be
    case ml of
        Nothing -> return Nothing
        Just l -> do
            blob <- getLazyByteString (fromIntegral l :: Int64)
            return $ Just blob

runGet :: Get a -> BL.ByteString -> a
runGet g bs = fst $ runState g bs

main :: IO ()
main = do
    fname <- head <$> getArgs
    bs <- BL.readFile fname
    let ls = runGet loop bs where
        loop = whileJust getBlob return
    print $ length ls

Там нет обработки ошибок в getBlob, но это легко расширить. Пространственно-временная сложность достаточно хороша, если только полученный список используется осторожно. (Сценарий Python, который создает некоторые случайные данные для потребления вышеупомянутым здесь).

Другие вопросы по тегам