Эффективные битовые потоки в Haskell
В постоянном стремлении эффективно работать с битами (например, см. Этот вопрос SO) новейшей задачей является эффективная потоковая передача и потребление битов.
В качестве первой простой задачи я выбираю поиск самой длинной последовательности идентичных битов в потоке битов, генерируемом /dev/urandom
, Типичное заклинание будет head -c 1000000 </dev/urandom | my-exe
, Фактическая цель состоит в том, чтобы передавать биты и декодировать, например, гамма-код Elias, то есть коды, которые не являются кусками байтов или их кратными числами.
Для таких кодов переменной длины хорошо иметь take
, takeWhile
, group
и т. д. язык для манипулирования списком. Так как BitStream.take
будет фактически поглощать часть бистрима, возможно, в игру вступит какая-то монада.
Очевидной отправной точкой является ленивая строка из Data.ByteString.Lazy
,
А. Подсчет байтов
Эта очень простая программа на Haskell работает наравне с программой на C, как и следовало ожидать.
import qualified Data.ByteString.Lazy as BSL
main :: IO ()
main = do
bs <- BSL.getContents
print $ BSL.length bs
Б. Добавление байтов
Как только я начну использовать unpack
все должно стать хуже.
main = do
bs <- BSL.getContents
print $ sum $ BSL.unpack bs
Удивительно, но Haskell и C показывают почти одинаковую производительность.
C. Самая длинная последовательность идентичных битов
В качестве первой нетривиальной задачи можно найти самую длинную последовательность идентичных битов, например:
module Main where
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Lazy as BSL
import Data.List (group)
import Data.Word8 (Word8)
splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (\i-> (w `shiftR` i) .&. 1 == 1) [0..7]
bitStream :: BSL.ByteString -> [Bool]
bitStream bs = concat $ map splitByte (BSL.unpack bs)
main :: IO ()
main = do
bs <- BSL.getContents
print $ maximum $ length <$> (group $ bitStream bs)
Ленивая строка байтов преобразуется в список [Word8]
а затем, используя смены, каждый Word
делится на биты, в результате получается список [Bool]
, Этот список списков затем сводится с concat
, Получив (ленивый) список Bool
использовать group
разбить список на последовательности идентичных битов и затем отобразить length
над ним. в заключение maximum
дает желаемый результат. Довольно просто, но не очень быстро:
# C
real 0m0.606s
# Haskell
real 0m6.062s
Эта наивная реализация ровно на порядок медленнее.
Профилирование показывает, что выделяется довольно много памяти (около 3 ГБ для анализа 1 МБ входных данных). Тем не менее, нет большой утечки в космосе.
Отсюда я начинаю ковыряться:
- E сть
bitstream
пакет, который обещает: " Быстрые, упакованные, строгие потоки битов (т.е. список Bools) с полуавтоматическим слиянием потоков ". К сожалению, он не в курсе текущихvector
пакет, смотрите здесь для деталей. - Далее я расследую
streaming
, Я не совсем понимаю, зачем мне нужна "эффективная" потоковая передача, которая привносит в игру некоторую монаду - по крайней мере, пока я не начну с обратной задачи, то есть кодирования и записи битовых потоков в файл. - Как насчет просто
fold
надByteString
? Я должен был бы ввести состояние, чтобы отслеживать потребляемые биты. Это не совсем приятноtake
,takeWhile
,group
и т. д. язык, который желательно.
И теперь я не совсем уверен, куда идти.
Обновление:
Я понял, как это сделать с streaming
а также streaming-bytestring
, Я, вероятно, не делаю это правильно, потому что результат катастрофически плох.
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Streaming as BSS
import Data.Word8 (Word8)
import qualified Streaming as S
import Streaming.Prelude (Of, Stream)
import qualified Streaming.Prelude as S
splitByte :: Word8 -> [Bool]
splitByte w = (\i-> (w `shiftR` i) .&. 1 == 1) <$> [0..7]
bitStream :: Monad m => Stream (Of Word8) m () -> Stream (Of Bool) m ()
bitStream s = S.concat $ S.map splitByte s
main :: IO ()
main = do
let bs = BSS.unpack BSS.getContents :: Stream (Of Word8) IO ()
gs = S.group $ bitStream bs :: Stream (Stream (Of Bool) IO) IO ()
maxLen <- S.maximum $ S.mapped S.length gs
print $ S.fst' maxLen
Это проверит ваше терпение с чем-либо, кроме нескольких тысяч байтов ввода от стандартного ввода. Профилировщик говорит, что тратит безумное количество времени (квадратичное по размеру ввода) в Streaming.Internal.>>=.loop
а также Data.Functor.Of.fmap
, Я не совсем уверен, что первый, но fmap
указывает (?), что жонглирование этих Of a b
не приносит нам никакой пользы, и поскольку мы находимся в монаде IO, ее нельзя оптимизировать.
У меня также есть потоковый эквивалент байтового сумматора здесь: SumBytesStream.hs
, что немного медленнее, чем простой ленивый ByteString
реализация, но все же приличная. поскольку streaming-bytestring
провозглашается, что " bytestring io сделано правильно ", я ожидал лучшего. Я, вероятно, не делаю это правильно, тогда.
В любом случае, все эти битовые вычисления не должны происходить в монаде ввода-вывода. Но BSS.getContents
заставляет меня в IO монаду, потому что getContents :: MonadIO m => ByteString m ()
и нет выхода.
Обновление 2
Следуя совету @dfeuer, я использовал streaming
пакет в мастер @HEAD. Вот результат.
longest-seq-c 0m0.747s (C)
longest-seq 0m8.190s (Haskell ByteString)
longest-seq-stream 0m13.946s (Haskell streaming-bytestring)
Проблема O(n^2) Streaming.concat
решена, но мы все еще не приближаемся к тесту C.
Обновление 3
Решение Cirdec обеспечивает производительность наравне с C. Используемая конструкция называется "Списки, закодированные Церковью", см. Этот ответ SO или Haskell Wiki для типов ранга N.
Исходные файлы:
Все исходные файлы можно найти на github. Makefile
имеет все различные цели для запуска экспериментов и профилирования. По умолчанию make
будет просто строить все (создать bin/
сначала каталог!) а потом make time
будет делать выбор времени на longest-seq
исполняемые файлы. Исполняемые файлы C получают -c
добавлен, чтобы отличить их.
2 ответа
Промежуточные распределения и их соответствующие издержки могут быть удалены, когда операции над потоками сливаются воедино. Прелюдия GHC обеспечивает объединение / складывание сборок для ленивых потоков в форме правил перезаписи. Общая идея заключается в том, что если одна функция выдает результат, похожий на свёртку (она имеет тип (a -> b -> b) -> b -> b
применительно к (:)
а также []
), а другая функция потребляет список, который выглядит как сворачивание, построение промежуточного списка можно удалить.
Для вашей проблемы я собираюсь построить нечто подобное, но используя строгие левые сгибы (foldl'
вместо фолд. Вместо использования правил перезаписи, которые пытаются определить, когда что-то выглядит как foldl
Я буду использовать тип данных, который заставляет списки выглядеть как левые сгибы.
-- A list encoded as a strict left fold.
newtype ListS a = ListS {build :: forall b. (b -> a -> b) -> b -> b}
Поскольку я начал с того, что отказался от списков, мы заново реализуем часть прелюдии для списков.
Строгие левые складки могут быть созданы из foldl'
функции как списков, так и байтов.
{-# INLINE fromList #-}
fromList :: [a] -> ListS a
fromList l = ListS (\c z -> foldl' c z l)
{-# INLINE fromBS #-}
fromBS :: BSL.ByteString -> ListS Word8
fromBS l = ListS (\c z -> BSL.foldl' c z l)
Простейший пример его использования - найти длину списка.
{-# INLINE length' #-}
length' :: ListS a -> Int
length' l = build l (\z a -> z+1) 0
Мы также можем отобразить и объединить левые сгибы.
{-# INLINE map' #-}
-- fmap renamed so it can be inlined
map' f l = ListS (\c z -> build l (\z a -> c z (f a)) z)
{-# INLINE concat' #-}
concat' :: ListS (ListS a) -> ListS a
concat' ll = ListS (\c z -> build ll (\z l -> build l c z) z)
Для вашей проблемы нам нужно разбить слово на части.
{-# INLINE splitByte #-}
splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (\i-> (w `shiftR` i) .&. 1 == 1) [0..7]
{-# INLINE splitByte' #-}
splitByte' :: Word8 -> ListS Bool
splitByte' = fromList . splitByte
И ByteString
на кусочки
{-# INLINE bitStream' #-}
bitStream' :: BSL.ByteString -> ListS Bool
bitStream' = concat' . map' splitByte' . fromBS
Чтобы найти самый длинный прогон, мы будем отслеживать предыдущее значение, длину текущего прогона и длину самого длинного прогона. Мы делаем поля строгими, чтобы строгость сгиба предотвращала накопление цепочек громких звёзд в памяти. Создание строгого типа данных для состояния - это простой способ получить контроль как над представлением в памяти, так и при оценке его полей.
data LongestRun = LongestRun !Bool !Int !Int
{-# INLINE extendRun #-}
extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest)
where
current = if x == previous then run + 1 else 1
{-# INLINE longestRun #-}
longestRun :: ListS Bool -> Int
longestRun l = longest
where
(LongestRun _ _ longest) = build l extendRun (LongestRun False 0 0)
И мы сделали
main :: IO ()
main = do
bs <- BSL.getContents
print $ longestRun $ bitStream' bs
Это намного быстрее, но не совсем производительность c.
longest-seq-c 0m00.12s (C)
longest-seq 0m08.65s (Haskell ByteString)
longest-seq-fuse 0m00.81s (Haskell ByteString fused)
Программа выделяет около 1 Мб для чтения 1000000 байт из ввода.
total alloc = 1,173,104 bytes (excludes profiling overheads)
Обновленный код GitHub
Я нашел другое решение наравне с С. Data.Vector.Fusion.Stream.Monadic
имеет потоковую реализацию, основанную на этой статье Coutts, Leshchinskiy, Stewart 2007. Идея, стоящая за этим, состоит в том, чтобы использовать слияние потоков destroy / unoldr.
Вспомните этот список :: (b -> Maybe (a, b)) -> b -> [a]
создает список путем многократного применения (разворачивания) функции шага вперед, начиная с начального значения. Stream
это просто unfoldr
функция с начальным состоянием. (The Data.Vector.Fusion.Stream.Monadic
библиотека использует GADT для создания конструкторов для Step
это может быть сопоставлено с шаблоном удобно. Думаю, это можно было бы сделать и без ГАДЦ.)
Центральным элементом решения является mkBitstream :: BSL.ByteString -> Stream Bool
функция, которая превращает BytesString
в поток Bool
, В основном, мы отслеживаем текущий ByteString
, текущий байт, и сколько текущего байта все еще не используется. Всякий раз, когда байт используется, другой байт отрубается ByteString
, когда Nothing
осталось, поток Done
,
longestRun
функция берется прямо из решения @Cirdec.
Вот этюд:
{-# LANGUAGE CPP #-}
#define PHASE_FUSED [1]
#define PHASE_INNER [0]
#define INLINE_FUSED INLINE PHASE_FUSED
#define INLINE_INNER INLINE PHASE_INNER
module Main where
import Control.Monad.Identity (Identity)
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Lazy as BSL
import Data.Functor.Identity (runIdentity)
import qualified Data.Vector.Fusion.Stream.Monadic as S
import Data.Word8 (Word8)
type Stream a = S.Stream Identity a -- no need for any monad, really
data Step = Step BSL.ByteString !Word8 !Word8 -- could use tuples, but this is faster
mkBitstream :: BSL.ByteString -> Stream Bool
mkBitstream bs' = S.Stream step (Step bs' 0 0) where
{-# INLINE_INNER step #-}
step (Step bs w n) | n==0 = case (BSL.uncons bs) of
Nothing -> return S.Done
Just (w', bs') -> return $
S.Yield (w' .&. 1 == 1) (Step bs' (w' `shiftR` 1) 7)
| otherwise = return $
S.Yield (w .&. 1 == 1) (Step bs (w `shiftR` 1) (n-1))
data LongestRun = LongestRun !Bool !Int !Int
{-# INLINE extendRun #-}
extendRun :: LongestRun -> Bool -> LongestRun
extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest)
where current = if x == previous then run + 1 else 1
{-# INLINE longestRun #-}
longestRun :: Stream Bool -> Int
longestRun s = runIdentity $ do
(LongestRun _ _ longest) <- S.foldl' extendRun (LongestRun False 0 0) s
return longest
main :: IO ()
main = do
bs <- BSL.getContents
print $ longestRun (mkBitstream bs)