Haskell быстрая одновременная очередь
Эта проблема
Здравствуйте! Я пишу библиотеку журналов, и я хотел бы создать регистратор, который будет работать в отдельном потоке, в то время как все потоки приложений будут просто отправлять ему сообщения. Я хочу найти наиболее эффективное решение этой проблемы. Мне нужна простая unboud очередь здесь.
подходы
Я создал несколько тестов, чтобы увидеть, как работают доступные решения, и здесь я получаю очень странные результаты. Я протестировал 4 реализации (исходный код предоставлен ниже) на основе:
- Трубы-параллелизм
- Control.Concurrent.Chan
- Control.Concurrent.Chan.Unagi
- MVar основан, как описано в книге "Параллельное и параллельное программирование в Haskell". Обратите внимание, что этот метод дает нам ограниченные очереди емкостью 1 - он используется только для тестов.
тесты
Вот исходный код, использованный для тестирования:
{-# LANGUAGE NoMonomorphismRestriction #-}
import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes
import qualified Pipes.Concurrent as Pipes
import Control.Applicative
import Control.Monad (replicateM_)
import System.Environment (getArgs)
import Control.Concurrent.Chan
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Chan.Unagi as U
import Control.Concurrent.MVar
import Criterion.Main
data Event = Msg String | Status | Quit deriving (Show)
----------------------------------------------------------------------
-- Pipes
----------------------------------------------------------------------
pipesLogMsg = yield (Msg "hello")
pipesManyLogs num = replicateM_ num pipesLogMsg
pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o
Pipes.performGC
pipesHandler max = loop 0
where
loop mnum = do
if mnum == max
then lift $ pure ()
else do event <- await
case event of
Msg _ -> loop (mnum + 1)
Status -> (lift $ putStrLn (show mnum)) *> loop mnum
Quit -> return ()
----------------------------------------------------------------------
-- Chan
----------------------------------------------------------------------
chanAddProducer num ch = forkIO $ chanManyLogs num ch
chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
chanHandler ch max = handlerIO (readChan ch) max
----------------------------------------------------------------------
-- Unagi-Chan
----------------------------------------------------------------------
uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
uchanHandler ch max = handlerIO (U.readChan ch) max
----------------------------------------------------------------------
-- MVars
----------------------------------------------------------------------
mvarAddProducer num m = forkIO $ mvarManyLogs num m
mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
mvarHandler m max = handlerIO (takeMVar m) max
----------------------------------------------------------------------
-- Utils
----------------------------------------------------------------------
handlerIO f max = loop 0 where
loop mnum = do
if mnum == max
then pure ()
else do event <- f
case event of
Msg _ -> loop (mnum + 1)
Status -> putStrLn (show mnum) *> loop mnum
Quit -> return ()
----------------------------------------------------------------------
-- Main
----------------------------------------------------------------------
main = defaultMain [
bench "pipes" $ nfIO $ do
(output, input) <- Pipes.spawn Pipes.Unbounded
replicateM_ prodNum (pipesAddProducer msgNum output)
runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
, bench "Chan" $ nfIO $ do
ch <- newChan
replicateM_ prodNum (chanAddProducer msgNum ch)
chanHandler ch totalMsg
, bench "Unagi-Chan" $ nfIO $ do
(inCh, outCh) <- U.newChan
replicateM_ prodNum (uchanAddProducer msgNum inCh)
uchanHandler outCh totalMsg
, bench "MVar" $ nfIO $ do
m <- newEmptyMVar
replicateM_ prodNum (mvarAddProducer msgNum m)
mvarHandler m totalMsg
]
where
prodNum = 20
msgNum = 1000
totalMsg = msgNum * prodNum
Вы можете скомпилировать его с ghc -O2 Main.hs
и просто запустите его. Тесты создают 20 производителей сообщений, каждый из которых производит 1000000 сообщений.
Результаты
benchmarking pipes
time 46.68 ms (46.19 ms .. 47.31 ms)
0.999 R² (0.999 R² .. 1.000 R²)
mean 47.59 ms (47.20 ms .. 47.95 ms)
std dev 708.3 μs (558.4 μs .. 906.1 μs)
benchmarking Chan
time 4.252 ms (4.171 ms .. 4.351 ms)
0.995 R² (0.991 R² .. 0.998 R²)
mean 4.233 ms (4.154 ms .. 4.314 ms)
std dev 244.8 μs (186.3 μs .. 333.5 μs)
variance introduced by outliers: 35% (moderately inflated)
benchmarking Unagi-Chan
time 1.209 ms (1.198 ms .. 1.224 ms)
0.996 R² (0.993 R² .. 0.999 R²)
mean 1.267 ms (1.244 ms .. 1.308 ms)
std dev 102.4 μs (61.70 μs .. 169.3 μs)
variance introduced by outliers: 62% (severely inflated)
benchmarking MVar
time 1.746 ms (1.714 ms .. 1.774 ms)
0.997 R² (0.995 R² .. 0.998 R²)
mean 1.716 ms (1.694 ms .. 1.739 ms)
std dev 73.99 μs (65.32 μs .. 85.48 μs)
variance introduced by outliers: 29% (moderately inflated)
Вопрос
Я хотел бы спросить вас, почему версия с параллельной передачей данных работает так медленно и почему она намного медленнее, чем даже основанная на Чане. Я очень удивлен, что MVar один является самым быстрым из всех версий - кто-нибудь может сказать больше, почему мы получаем такие результаты и можем ли мы добиться большего успеха в любом случае?
2 ответа
Поэтому я могу дать вам небольшой обзор некоторых из анализа Chan
а также TQueue
(который pipes-concurrency
использует внутренне здесь), который мотивировал некоторые дизайнерские решения, которые вошли в unagi-chan
, Я не уверен, что ответит на ваш вопрос. Я рекомендую разветвляться в разных очередях и играть с вариациями во время бенчмаркинга, чтобы по-настоящему понять происходящее.
Chan
Chan
похоже:
data Chan a
= Chan (MVar (Stream a)) -- pointer to "head", where we read from
(MVar (Stream a)) -- pointer to "tail", where values written to
type Stream a = MVar (ChItem a)
data ChItem a = ChItem a (Stream a)
Это связанный список MVar
s. Два MVar
в Chan
type действует как указатель на текущий заголовок и конец списка соответственно. Вот как выглядит запись:
writeChan :: Chan a -> a -> IO ()
writeChan (Chan _ writeVar) val = do
new_hole <- newEmptyMVar mask_ $ do
old_hole <- takeMVar writeVar -- [1]
putMVar old_hole (ChItem val new_hole) -- [2]
putMVar writeVar new_hole -- [3]
На 1 писатель берет блокировку на конце записи, на 2 наш предмет a
становится доступным для читателя, а в 3 конец записи разблокируется для других авторов.
Это на самом деле работает довольно хорошо в сценарии с одним потребителем / одним производителем (см. График здесь), потому что чтение и запись не конкурируют. Но если у вас есть несколько одновременно работающих писателей, у вас могут возникнуть проблемы:
писатель, который нажимает 1, в то время как другой писатель находится на уровне 2, блокируется и будет отключен (самое быстрое, что я смог измерить, - 150 нс (довольно чертовски быстро); возможно, есть ситуации, когда он намного медленнее). Поэтому, когда многие писатели утверждают, что вы в основном совершаете большое круговое путешествие через планировщик, в очередь ожидания для
MVar
и, наконец, запись может завершиться.Когда записывающее устройство отключается по расписанию (из-за превышения времени ожидания) в то время, как оно равно 2, оно удерживает блокировку, и никакие записи не будут допущены до тех пор, пока они не могут быть перепланированы снова; это становится более серьезной проблемой, когда мы переподписаны, то есть когда соотношение потоков / ядра высокое.
Наконец, используя MVar
-per-item требует некоторых накладных расходов с точки зрения распределения, и что более важно, когда мы накапливаем много изменяемых объектов, мы можем вызвать большое давление GC.
TQUEUE
TQueue
это здорово, потому что STM
делает это очень просто рассуждать о его правильности. Это функциональная очередь в стиле dequeue, и write
состоит из простого чтения стека записи, обработки нашего элемента и его обратной записи:
data TQueue a = TQueue (TVar [a])
(TVar [a])
writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _ write) a = do
listend <- readTVar write -- a transaction with a consistent
writeTVar write (a:listend) -- view of memory
Если после writeTQueue
записывает свой новый стек обратно, другая перемеженная запись делает то же самое, одна из записей будет повторена. Как более writeTQueue
s чередуются, эффект раздора ухудшается. Однако производительность ухудшается гораздо медленнее, чем в Chan
потому что есть только один writeTVar
операция, которая может аннулировать конкуренцию writeTQueue
с, и транзакция очень мала (просто чтение и (:)
).
Чтение работает путем "вытеснения" стека со стороны записи, обращения к нему и сохранения обращенного стека в его собственной переменной для легкого "выталкивания" (в целом это дает нам амортизированные O(1) push и pop)
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(x:xs') -> do writeTVar read xs'
return x
[] -> do ys <- readTVar write
case ys of
[] -> retry
_ -> case reverse ys of
[] -> error "readTQueue"
(z:zs) -> do writeTVar write []
writeTVar read zs
return z
Читатели имеют симметричный умеренный спор с писателями. В общем случае читатели и писатели не борются, однако, когда стек читателей исчерпан, читатели соревнуются с другими читателями и писателями. Я подозреваю, если вы предварительно загрузили TQueue
Имея достаточное количество значений, а затем запустив 4 считывателя и 4 записывающих устройства, вы можете вызвать активную блокировку, так как обратная сторона пытается завершиться до следующей записи. Также интересно отметить, что в отличие от MVar
написать в TVar
то, что многие читатели ждут, пробуждает их всех одновременно (это может быть более или менее эффективным, в зависимости от сценария).
Я подозреваю, что вы не видите много слабых мест TQueue
в вашем тесте; в первую очередь вы наблюдаете умеренные последствия конфликтов при записи и накладных расходов, связанных с выделением и сборкой большого количества изменяемых объектов.
унаги-чан
unagi-chan
был спроектирован во-первых, чтобы хорошо справляться с раздорами. Это концептуально очень просто, но реализация имеет некоторые сложности
data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a))
data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a)))
data Cell a = Empty | Written a | Blocking (MVar a)
Чтение и запись сторон очереди разделяют Stream
на котором они координируют передаваемые значения (от писателя к читателю) и указания на блокировку (от читателя к писателю), а также стороны чтения и записи имеют независимый атомный счетчик. Запись работает как:
писатель называет атомное
incrCounter
на счетчике записи, чтобы получить его уникальный индекс, чтобы координировать его с (одиночным) читателемписатель находит свою камеру и выполняет CAS
Written a
в случае успеха он выходит, иначе он видит, что читатель победил его и блокирует (или продолжает блокировать), поэтому он делает
(\Blocking v)-> putMVar v a)
и выходит.
Чтение работает аналогичным и очевидным образом.
Первое нововведение заключается в том, чтобы сделать точку разногласия атомарной операцией, которая не ухудшается в условиях разногласий (как это делает цикл CAS/ повтор или блокировка типа Chan). Основанный на простом бенчмаркинге и экспериментах, извлечение и добавление примопа, atomic-primops
Библиотека работает лучше всего.
Затем в 2 и читатель, и писатель должны выполнить только одно сравнение и обмен (быстрый путь для читателя - простое неатомарное чтение), чтобы завершить координацию.
Итак, чтобы попытаться сделать unagi-chan
хорошо мы
используйте fetch-and-add для обработки спора
использовать методы без блокировки, чтобы при переподписке потока, запланированного в неподходящее время, поток не блокировался для других потоков (заблокированный модуль записи может блокировать не более того "читателя", "назначенного" ему счетчиком; чтение предостерегает относительно асинхронных исключений) в
unagi-chan
документы, и обратите внимание, чтоChan
здесь более приятная семантика)использовать массив для хранения наших элементов, который имеет лучшую локальность (но см. ниже), снижает накладные расходы на элемент и оказывает меньшее давление на GC
Последнее замечание использование массива: параллельная запись в массив, как правило, плохая идея для масштабирования, потому что вы вызываете большой трафик когерентности кэша, так как линии кэширования недействительны по всем потокам записи. Общий термин "ложный обмен". Но есть и минусы кеша и минусы альтернативных конструкций, о которых я могу подумать, что это будет чередование записей или что-то в этом роде; Я немного экспериментировал с этим, но в данный момент у меня нет ничего убедительного.
Одно место, где мы законно занимаемся ложным разделением, - это наш счетчик, который мы выравниваем и дополняем до 64 байтов; это действительно проявилось в тестах, и единственным недостатком является увеличение использования памяти.
Если бы я должен был догадаться, почему pipes-concurrency
выполнять хуже, это потому, что каждое чтение и запись обернуты в STM
транзакции, тогда как другие библиотеки используют более эффективные низкоуровневые примитивы параллелизма.