Эффективные битовые потоки в Haskell

В постоянном стремлении эффективно работать с битами (например, см. Этот вопрос SO) новейшей задачей является эффективная потоковая передача и потребление битов.

В качестве первой простой задачи я выбираю поиск самой длинной последовательности идентичных битов в потоке битов, генерируемом /dev/urandom, Типичное заклинание будет head -c 1000000 </dev/urandom | my-exe, Фактическая цель состоит в том, чтобы передавать биты и декодировать, например, гамма-код Elias, то есть коды, которые не являются кусками байтов или их кратными числами.

Для таких кодов переменной длины хорошо иметь take, takeWhile, group и т. д. язык для манипулирования списком. Так как BitStream.take будет фактически поглощать часть бистрима, возможно, в игру вступит какая-то монада.

Очевидной отправной точкой является ленивая строка из Data.ByteString.Lazy,

А. Подсчет байтов

Эта очень простая программа на Haskell работает наравне с программой на C, как и следовало ожидать.

import qualified Data.ByteString.Lazy as BSL

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ BSL.length bs

Б. Добавление байтов

Как только я начну использовать unpack все должно стать хуже.

main = do
    bs <- BSL.getContents
    print $ sum $ BSL.unpack bs

Удивительно, но Haskell и C показывают почти одинаковую производительность.

C. Самая длинная последовательность идентичных битов

В качестве первой нетривиальной задачи можно найти самую длинную последовательность идентичных битов, например:

module Main where

import           Data.Bits            (shiftR, (.&.))
import qualified Data.ByteString.Lazy as BSL
import           Data.List            (group)
import           Data.Word8           (Word8)

splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (\i-> (w `shiftR` i) .&. 1 == 1) [0..7]

bitStream :: BSL.ByteString -> [Bool]
bitStream bs = concat $ map splitByte (BSL.unpack bs)

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ maximum $ length <$> (group $ bitStream bs)

Ленивая строка байтов преобразуется в список [Word8] а затем, используя смены, каждый Word делится на биты, в результате получается список [Bool], Этот список списков затем сводится с concat, Получив (ленивый) список Bool использовать group разбить список на последовательности идентичных битов и затем отобразить length над ним. в заключение maximum дает желаемый результат. Довольно просто, но не очень быстро:

# C
real    0m0.606s

# Haskell
real    0m6.062s

Эта наивная реализация ровно на порядок медленнее.

Профилирование показывает, что выделяется довольно много памяти (около 3 ГБ для анализа 1 МБ входных данных). Тем не менее, нет большой утечки в космосе.

Отсюда я начинаю ковыряться:

  • E сть bitstream пакет, который обещает: " Быстрые, упакованные, строгие потоки битов (т.е. список Bools) с полуавтоматическим слиянием потоков ". К сожалению, он не в курсе текущих vector пакет, смотрите здесь для деталей.
  • Далее я расследую streaming, Я не совсем понимаю, зачем мне нужна "эффективная" потоковая передача, которая привносит в игру некоторую монаду - по крайней мере, пока я не начну с обратной задачи, то есть кодирования и записи битовых потоков в файл.
  • Как насчет просто fold над ByteString? Я должен был бы ввести состояние, чтобы отслеживать потребляемые биты. Это не совсем приятно take, takeWhile, group и т. д. язык, который желательно.

И теперь я не совсем уверен, куда идти.

Обновление:

Я понял, как это сделать с streaming а также streaming-bytestring, Я, вероятно, не делаю это правильно, потому что результат катастрофически плох.

import           Data.Bits                 (shiftR, (.&.))
import qualified Data.ByteString.Streaming as BSS
import           Data.Word8                (Word8)
import qualified Streaming                 as S
import           Streaming.Prelude         (Of, Stream)
import qualified Streaming.Prelude         as S

splitByte :: Word8 -> [Bool]
splitByte w = (\i-> (w `shiftR` i) .&. 1 == 1) <$> [0..7]

bitStream :: Monad m => Stream (Of Word8) m () -> Stream (Of Bool) m ()
bitStream s = S.concat $ S.map splitByte s

main :: IO ()
main = do
    let bs = BSS.unpack BSS.getContents :: Stream (Of Word8) IO ()
        gs = S.group $ bitStream bs ::  Stream (Stream (Of Bool) IO) IO ()
    maxLen <- S.maximum $ S.mapped S.length gs
    print $ S.fst' maxLen

Это проверит ваше терпение с чем-либо, кроме нескольких тысяч байтов ввода от стандартного ввода. Профилировщик говорит, что тратит безумное количество времени (квадратичное по размеру ввода) в Streaming.Internal.>>=.loop а также Data.Functor.Of.fmap, Я не совсем уверен, что первый, но fmap указывает (?), что жонглирование этих Of a b не приносит нам никакой пользы, и поскольку мы находимся в монаде IO, ее нельзя оптимизировать.

У меня также есть потоковый эквивалент байтового сумматора здесь: SumBytesStream.hs, что немного медленнее, чем простой ленивый ByteString реализация, но все же приличная. поскольку streaming-bytestring провозглашается, что " bytestring io сделано правильно ", я ожидал лучшего. Я, вероятно, не делаю это правильно, тогда.

В любом случае, все эти битовые вычисления не должны происходить в монаде ввода-вывода. Но BSS.getContents заставляет меня в IO монаду, потому что getContents :: MonadIO m => ByteString m () и нет выхода.

Обновление 2

Следуя совету @dfeuer, я использовал streaming пакет в мастер @HEAD. Вот результат.

longest-seq-c       0m0.747s    (C)
longest-seq         0m8.190s    (Haskell ByteString)
longest-seq-stream  0m13.946s   (Haskell streaming-bytestring)

Проблема O(n^2) Streaming.concat решена, но мы все еще не приближаемся к тесту C.

Обновление 3

Решение Cirdec обеспечивает производительность наравне с C. Используемая конструкция называется "Списки, закодированные Церковью", см. Этот ответ SO или Haskell Wiki для типов ранга N.

Исходные файлы:

Все исходные файлы можно найти на github. Makefile имеет все различные цели для запуска экспериментов и профилирования. По умолчанию make будет просто строить все (создать bin/ сначала каталог!) а потом make time будет делать выбор времени на longest-seq исполняемые файлы. Исполняемые файлы C получают -c добавлен, чтобы отличить их.

2 ответа

Решение

Промежуточные распределения и их соответствующие издержки могут быть удалены, когда операции над потоками сливаются воедино. Прелюдия GHC обеспечивает объединение / складывание сборок для ленивых потоков в форме правил перезаписи. Общая идея заключается в том, что если одна функция выдает результат, похожий на свёртку (она имеет тип (a -> b -> b) -> b -> b применительно к (:) а также []), а другая функция потребляет список, который выглядит как сворачивание, построение промежуточного списка можно удалить.

Для вашей проблемы я собираюсь построить нечто подобное, но используя строгие левые сгибы (foldl'вместо фолд. Вместо использования правил перезаписи, которые пытаются определить, когда что-то выглядит как foldlЯ буду использовать тип данных, который заставляет списки выглядеть как левые сгибы.

-- A list encoded as a strict left fold.
newtype ListS a = ListS {build :: forall b. (b -> a -> b) -> b -> b}

Поскольку я начал с того, что отказался от списков, мы заново реализуем часть прелюдии для списков.

Строгие левые складки могут быть созданы из foldl' функции как списков, так и байтов.

{-# INLINE fromList #-}
fromList :: [a] -> ListS a
fromList l = ListS (\c z -> foldl' c z l)

{-# INLINE fromBS #-}
fromBS :: BSL.ByteString -> ListS Word8
fromBS l = ListS (\c z -> BSL.foldl' c z l)

Простейший пример его использования - найти длину списка.

{-# INLINE length' #-}
length' :: ListS a -> Int
length' l = build l (\z a -> z+1) 0

Мы также можем отобразить и объединить левые сгибы.

{-# INLINE map' #-}
-- fmap renamed so it can be inlined
map' f l = ListS (\c z -> build l (\z a -> c z (f a)) z)

{-# INLINE concat' #-}
concat' :: ListS (ListS a) -> ListS a
concat' ll = ListS (\c z -> build ll (\z l -> build l c z) z)

Для вашей проблемы нам нужно разбить слово на части.

{-# INLINE splitByte #-}
splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (\i-> (w `shiftR` i) .&. 1 == 1) [0..7]

{-# INLINE splitByte' #-}
splitByte' :: Word8 -> ListS Bool
splitByte' = fromList . splitByte

И ByteString на кусочки

{-# INLINE bitStream' #-}
bitStream' :: BSL.ByteString -> ListS Bool
bitStream' = concat' . map' splitByte' . fromBS

Чтобы найти самый длинный прогон, мы будем отслеживать предыдущее значение, длину текущего прогона и длину самого длинного прогона. Мы делаем поля строгими, чтобы строгость сгиба предотвращала накопление цепочек громких звёзд в памяти. Создание строгого типа данных для состояния - это простой способ получить контроль как над представлением в памяти, так и при оценке его полей.

data LongestRun = LongestRun !Bool !Int !Int

{-# INLINE extendRun #-}
extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest)
  where
    current = if x == previous then run + 1 else 1

{-# INLINE longestRun #-}
longestRun :: ListS Bool -> Int
longestRun l = longest
 where
   (LongestRun _ _ longest) = build l extendRun (LongestRun False 0 0)

И мы сделали

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ longestRun $ bitStream' bs

Это намного быстрее, но не совсем производительность c.

longest-seq-c       0m00.12s    (C)
longest-seq         0m08.65s    (Haskell ByteString)
longest-seq-fuse    0m00.81s    (Haskell ByteString fused)

Программа выделяет около 1 Мб для чтения 1000000 байт из ввода.

total alloc =   1,173,104 bytes  (excludes profiling overheads)

Обновленный код GitHub

Я нашел другое решение наравне с С. Data.Vector.Fusion.Stream.Monadic имеет потоковую реализацию, основанную на этой статье Coutts, Leshchinskiy, Stewart 2007. Идея, стоящая за этим, состоит в том, чтобы использовать слияние потоков destroy / unoldr.

Вспомните этот список :: (b -> Maybe (a, b)) -> b -> [a] создает список путем многократного применения (разворачивания) функции шага вперед, начиная с начального значения. Stream это просто unfoldr функция с начальным состоянием. (The Data.Vector.Fusion.Stream.Monadic библиотека использует GADT для создания конструкторов для Step это может быть сопоставлено с шаблоном удобно. Думаю, это можно было бы сделать и без ГАДЦ.)

Центральным элементом решения является mkBitstream :: BSL.ByteString -> Stream Bool функция, которая превращает BytesString в поток Bool, В основном, мы отслеживаем текущий ByteString, текущий байт, и сколько текущего байта все еще не используется. Всякий раз, когда байт используется, другой байт отрубается ByteString, когда Nothing осталось, поток Done,

longestRun функция берется прямо из решения @Cirdec.

Вот этюд:

{-# LANGUAGE CPP #-}
#define PHASE_FUSED [1]
#define PHASE_INNER [0]
#define INLINE_FUSED INLINE PHASE_FUSED
#define INLINE_INNER INLINE PHASE_INNER
module Main where

import           Control.Monad.Identity            (Identity)
import           Data.Bits                         (shiftR, (.&.))
import qualified Data.ByteString.Lazy              as BSL
import           Data.Functor.Identity             (runIdentity)
import qualified Data.Vector.Fusion.Stream.Monadic as S
import           Data.Word8                        (Word8)

type Stream a = S.Stream Identity a   -- no need for any monad, really

data Step = Step BSL.ByteString !Word8 !Word8   -- could use tuples, but this is faster

mkBitstream :: BSL.ByteString -> Stream Bool
mkBitstream bs' = S.Stream step (Step bs' 0 0) where
    {-# INLINE_INNER step #-}
    step (Step bs w n) | n==0 = case (BSL.uncons bs) of
                            Nothing        -> return S.Done
                            Just (w', bs') -> return $ 
                                S.Yield (w' .&. 1 == 1) (Step bs' (w' `shiftR` 1) 7)
                       | otherwise = return $ 
                                S.Yield (w .&. 1 == 1) (Step bs (w `shiftR` 1) (n-1))


data LongestRun = LongestRun !Bool !Int !Int

{-# INLINE extendRun #-}
extendRun :: LongestRun -> Bool -> LongestRun
extendRun (LongestRun previous run longest) x  = LongestRun x current (max current longest)
    where current = if x == previous then run + 1 else 1

{-# INLINE longestRun #-}
longestRun :: Stream Bool -> Int
longestRun s = runIdentity $ do
    (LongestRun _ _ longest) <- S.foldl' extendRun (LongestRun False 0 0) s
    return longest

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ longestRun (mkBitstream bs)
Другие вопросы по тегам