Haskell быстрая одновременная очередь

Эта проблема

Здравствуйте! Я пишу библиотеку журналов, и я хотел бы создать регистратор, который будет работать в отдельном потоке, в то время как все потоки приложений будут просто отправлять ему сообщения. Я хочу найти наиболее эффективное решение этой проблемы. Мне нужна простая unboud очередь здесь.

подходы

Я создал несколько тестов, чтобы увидеть, как работают доступные решения, и здесь я получаю очень странные результаты. Я протестировал 4 реализации (исходный код предоставлен ниже) на основе:

  1. Трубы-параллелизм
  2. Control.Concurrent.Chan
  3. Control.Concurrent.Chan.Unagi
  4. MVar основан, как описано в книге "Параллельное и параллельное программирование в Haskell". Обратите внимание, что этот метод дает нам ограниченные очереди емкостью 1 - он используется только для тестов.

тесты

Вот исходный код, использованный для тестирования:

{-# LANGUAGE NoMonomorphismRestriction #-}

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes
import qualified Pipes.Concurrent as Pipes
import Control.Applicative
import Control.Monad (replicateM_)
import System.Environment (getArgs)

import Control.Concurrent.Chan
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Chan.Unagi as U
import Control.Concurrent.MVar
import Criterion.Main

data Event = Msg String | Status | Quit deriving (Show)

----------------------------------------------------------------------
-- Pipes
----------------------------------------------------------------------

pipesLogMsg = yield (Msg "hello")
pipesManyLogs num = replicateM_ num pipesLogMsg

pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o
                                           Pipes.performGC

pipesHandler max = loop 0
  where
    loop mnum = do
        if mnum == max
            then lift $ pure ()
            else do event <- await
                    case event of
                        Msg _  -> loop (mnum + 1)
                        Status -> (lift $ putStrLn (show mnum)) *> loop mnum
                        Quit   -> return ()

----------------------------------------------------------------------
-- Chan
----------------------------------------------------------------------

chanAddProducer num ch = forkIO $ chanManyLogs num ch
chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
chanHandler ch max = handlerIO (readChan ch) max

----------------------------------------------------------------------
-- Unagi-Chan
----------------------------------------------------------------------

uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
uchanHandler ch max = handlerIO (U.readChan ch) max

----------------------------------------------------------------------
-- MVars
----------------------------------------------------------------------

mvarAddProducer num m = forkIO $ mvarManyLogs num m
mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
mvarHandler m max = handlerIO (takeMVar m) max

----------------------------------------------------------------------
-- Utils
----------------------------------------------------------------------

handlerIO f max = loop 0 where
    loop mnum = do
        if mnum == max 
            then pure ()
            else do event <- f
                    case event of
                         Msg _  -> loop (mnum + 1)
                         Status -> putStrLn (show mnum) *> loop mnum
                         Quit   -> return ()

----------------------------------------------------------------------
-- Main
----------------------------------------------------------------------

main = defaultMain [
      bench "pipes" $ nfIO $ do
        (output, input) <- Pipes.spawn Pipes.Unbounded
        replicateM_ prodNum (pipesAddProducer msgNum output)
        runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
    , bench "Chan" $ nfIO $ do
        ch <- newChan
        replicateM_ prodNum (chanAddProducer msgNum ch)
        chanHandler ch totalMsg
    , bench "Unagi-Chan" $ nfIO $ do
        (inCh, outCh) <- U.newChan
        replicateM_ prodNum (uchanAddProducer msgNum inCh)
        uchanHandler outCh totalMsg
    , bench "MVar" $ nfIO $ do
        m <- newEmptyMVar
        replicateM_ prodNum (mvarAddProducer msgNum m)
        mvarHandler m totalMsg
    ]
  where
    prodNum  = 20
    msgNum   = 1000
    totalMsg = msgNum * prodNum

Вы можете скомпилировать его с ghc -O2 Main.hs и просто запустите его. Тесты создают 20 производителей сообщений, каждый из которых производит 1000000 сообщений.

Результаты

benchmarking pipes
time                 46.68 ms   (46.19 ms .. 47.31 ms)
                     0.999 R²   (0.999 R² .. 1.000 R²)
mean                 47.59 ms   (47.20 ms .. 47.95 ms)
std dev              708.3 μs   (558.4 μs .. 906.1 μs)

benchmarking Chan
time                 4.252 ms   (4.171 ms .. 4.351 ms)
                     0.995 R²   (0.991 R² .. 0.998 R²)
mean                 4.233 ms   (4.154 ms .. 4.314 ms)
std dev              244.8 μs   (186.3 μs .. 333.5 μs)
variance introduced by outliers: 35% (moderately inflated)

benchmarking Unagi-Chan
time                 1.209 ms   (1.198 ms .. 1.224 ms)
                     0.996 R²   (0.993 R² .. 0.999 R²)
mean                 1.267 ms   (1.244 ms .. 1.308 ms)
std dev              102.4 μs   (61.70 μs .. 169.3 μs)
variance introduced by outliers: 62% (severely inflated)

benchmarking MVar
time                 1.746 ms   (1.714 ms .. 1.774 ms)
                     0.997 R²   (0.995 R² .. 0.998 R²)
mean                 1.716 ms   (1.694 ms .. 1.739 ms)
std dev              73.99 μs   (65.32 μs .. 85.48 μs)
variance introduced by outliers: 29% (moderately inflated)

Вопрос

Я хотел бы спросить вас, почему версия с параллельной передачей данных работает так медленно и почему она намного медленнее, чем даже основанная на Чане. Я очень удивлен, что MVar один является самым быстрым из всех версий - кто-нибудь может сказать больше, почему мы получаем такие результаты и можем ли мы добиться большего успеха в любом случае?

2 ответа

Решение

Поэтому я могу дать вам небольшой обзор некоторых из анализа Chan а также TQueue (который pipes-concurrency использует внутренне здесь), который мотивировал некоторые дизайнерские решения, которые вошли в unagi-chan, Я не уверен, что ответит на ваш вопрос. Я рекомендую разветвляться в разных очередях и играть с вариациями во время бенчмаркинга, чтобы по-настоящему понять происходящее.

Chan

Chan похоже:

data Chan a
 = Chan (MVar (Stream a)) -- pointer to "head", where we read from
        (MVar (Stream a)) -- pointer to "tail", where values written to

type Stream a = MVar (ChItem a)
data ChItem a = ChItem a (Stream a)

Это связанный список MVar s. Два MVar в Chan type действует как указатель на текущий заголовок и конец списка соответственно. Вот как выглядит запись:

writeChan :: Chan a -> a -> IO () 
writeChan (Chan _ writeVar) val = do 
    new_hole <- newEmptyMVar   mask_ $ do
    old_hole <- takeMVar writeVar           -- [1]
    putMVar old_hole (ChItem val new_hole)  -- [2]
    putMVar writeVar new_hole               -- [3]

На 1 писатель берет блокировку на конце записи, на 2 наш предмет a становится доступным для читателя, а в 3 конец записи разблокируется для других авторов.

Это на самом деле работает довольно хорошо в сценарии с одним потребителем / одним производителем (см. График здесь), потому что чтение и запись не конкурируют. Но если у вас есть несколько одновременно работающих писателей, у вас могут возникнуть проблемы:

  • писатель, который нажимает 1, в то время как другой писатель находится на уровне 2, блокируется и будет отключен (самое быстрое, что я смог измерить, - 150 нс (довольно чертовски быстро); возможно, есть ситуации, когда он намного медленнее). Поэтому, когда многие писатели утверждают, что вы в основном совершаете большое круговое путешествие через планировщик, в очередь ожидания для MVar и, наконец, запись может завершиться.

  • Когда записывающее устройство отключается по расписанию (из-за превышения времени ожидания) в то время, как оно равно 2, оно удерживает блокировку, и никакие записи не будут допущены до тех пор, пока они не могут быть перепланированы снова; это становится более серьезной проблемой, когда мы переподписаны, то есть когда соотношение потоков / ядра высокое.

Наконец, используя MVar -per-item требует некоторых накладных расходов с точки зрения распределения, и что более важно, когда мы накапливаем много изменяемых объектов, мы можем вызвать большое давление GC.

TQUEUE

TQueue это здорово, потому что STM делает это очень просто рассуждать о его правильности. Это функциональная очередь в стиле dequeue, и write состоит из простого чтения стека записи, обработки нашего элемента и его обратной записи:

data TQueue a = TQueue (TVar [a])
                       (TVar [a])

writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _ write) a = do  
  listend <- readTVar write   -- a transaction with a consistent 
  writeTVar write (a:listend) -- view of memory

Если после writeTQueue записывает свой новый стек обратно, другая перемеженная запись делает то же самое, одна из записей будет повторена. Как более writeTQueue s чередуются, эффект раздора ухудшается. Однако производительность ухудшается гораздо медленнее, чем в Chan потому что есть только один writeTVar операция, которая может аннулировать конкуренцию writeTQueueс, и транзакция очень мала (просто чтение и (:)).

Чтение работает путем "вытеснения" стека со стороны записи, обращения к нему и сохранения обращенного стека в его собственной переменной для легкого "выталкивания" (в целом это дает нам амортизированные O(1) push и pop)

readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
  xs <- readTVar read
  case xs of
    (x:xs') -> do writeTVar read xs'
                  return x
    [] -> do ys <- readTVar write
             case ys of
               [] -> retry
               _  -> case reverse ys of
                       [] -> error "readTQueue"
                       (z:zs) -> do writeTVar write []
                                    writeTVar read zs
                                    return z

Читатели имеют симметричный умеренный спор с писателями. В общем случае читатели и писатели не борются, однако, когда стек читателей исчерпан, читатели соревнуются с другими читателями и писателями. Я подозреваю, если вы предварительно загрузили TQueue Имея достаточное количество значений, а затем запустив 4 считывателя и 4 записывающих устройства, вы можете вызвать активную блокировку, так как обратная сторона пытается завершиться до следующей записи. Также интересно отметить, что в отличие от MVar написать в TVar то, что многие читатели ждут, пробуждает их всех одновременно (это может быть более или менее эффективным, в зависимости от сценария).

Я подозреваю, что вы не видите много слабых мест TQueue в вашем тесте; в первую очередь вы наблюдаете умеренные последствия конфликтов при записи и накладных расходов, связанных с выделением и сборкой большого количества изменяемых объектов.

унаги-чан

unagi-chan был спроектирован во-первых, чтобы хорошо справляться с раздорами. Это концептуально очень просто, но реализация имеет некоторые сложности

data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a))

data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a)))

data Cell a = Empty | Written a | Blocking (MVar a)

Чтение и запись сторон очереди разделяют Stream на котором они координируют передаваемые значения (от писателя к читателю) и указания на блокировку (от читателя к писателю), а также стороны чтения и записи имеют независимый атомный счетчик. Запись работает как:

  1. писатель называет атомное incrCounter на счетчике записи, чтобы получить его уникальный индекс, чтобы координировать его с (одиночным) читателем

  2. писатель находит свою камеру и выполняет CAS Written a

  3. в случае успеха он выходит, иначе он видит, что читатель победил его и блокирует (или продолжает блокировать), поэтому он делает (\Blocking v)-> putMVar v a) и выходит.

Чтение работает аналогичным и очевидным образом.

Первое нововведение заключается в том, чтобы сделать точку разногласия атомарной операцией, которая не ухудшается в условиях разногласий (как это делает цикл CAS/ повтор или блокировка типа Chan). Основанный на простом бенчмаркинге и экспериментах, извлечение и добавление примопа, atomic-primops Библиотека работает лучше всего.

Затем в 2 и читатель, и писатель должны выполнить только одно сравнение и обмен (быстрый путь для читателя - простое неатомарное чтение), чтобы завершить координацию.

Итак, чтобы попытаться сделать unagi-chan хорошо мы

  • используйте fetch-and-add для обработки спора

  • использовать методы без блокировки, чтобы при переподписке потока, запланированного в неподходящее время, поток не блокировался для других потоков (заблокированный модуль записи может блокировать не более того "читателя", "назначенного" ему счетчиком; чтение предостерегает относительно асинхронных исключений) в unagi-chan документы, и обратите внимание, что Chan здесь более приятная семантика)

  • использовать массив для хранения наших элементов, который имеет лучшую локальность (но см. ниже), снижает накладные расходы на элемент и оказывает меньшее давление на GC

Последнее замечание использование массива: параллельная запись в массив, как правило, плохая идея для масштабирования, потому что вы вызываете большой трафик когерентности кэша, так как линии кэширования недействительны по всем потокам записи. Общий термин "ложный обмен". Но есть и минусы кеша и минусы альтернативных конструкций, о которых я могу подумать, что это будет чередование записей или что-то в этом роде; Я немного экспериментировал с этим, но в данный момент у меня нет ничего убедительного.

Одно место, где мы законно занимаемся ложным разделением, - это наш счетчик, который мы выравниваем и дополняем до 64 байтов; это действительно проявилось в тестах, и единственным недостатком является увеличение использования памяти.

Если бы я должен был догадаться, почему pipes-concurrency выполнять хуже, это потому, что каждое чтение и запись обернуты в STM транзакции, тогда как другие библиотеки используют более эффективные низкоуровневые примитивы параллелизма.

Другие вопросы по тегам