Почему этот основанный на Lwt и, казалось бы, параллельный код настолько несовместим
Я пытаюсь создать параллельные примеры Lwt и придумал этот маленький образец
let () =
Lwt_main.run (
let start = Unix.time () in
Lwt_io.open_file Lwt_io.Input "/dev/urandom" >>= fun data_source ->
Lwt_unix.mkdir "serial" 0o777 >>= fun () ->
Lwt_list.iter_p
(fun count ->
let count = string_of_int count in
Lwt_io.open_file
~flags:[Unix.O_RDWR; Unix.O_CREAT]
~perm:0o777
~mode:Lwt_io.Output ("serial/file"^ count ^ ".txt") >>= fun h ->
Lwt_io.read ~count:52428800
data_source >>= Lwt_io.write_line h)
[0;1;2;3;4;5;6;7;8;9] >>= fun () ->
let finished = Unix.time () in
Lwt_io.printlf "Execution time took %f seconds" (finished -. start))
РЕДАКТИРОВАТЬ: с просьбой о 50 ГБ это было: "Однако это невероятно медленно и в основном бесполезно. Нужно ли каким-то образом принудительно устанавливать внутреннюю привязку?
РЕДАКТИРОВАТЬ: Первоначально я написал с просьбой о 50 ГБ, и это никогда не заканчивалось, теперь у меня другая проблема с запросом о 50 МБ, Выполнение почти мгновенно, и du -sh сообщает только о размере каталога 80 КБ.
РЕДАКТИРОВАТЬ: я также пробовал код с явным закрытием файловых дескрипторов с тем же плохим результатом.
Я на OS X
последнюю версию и скомпилируйте с
ocamlfind ocamlopt -package lwt.unix main.ml -linkpkg -o Test
(Я тоже пробовал /dev/random
Да, я использую часы на стене.)
2 ответа
Итак, у вашего кода есть некоторые проблемы.
Выпуск 1
Основная проблема в том, что вы поняли Lwt_io.read
функционировать неправильно (и никто не может обвинить вас!).
val read : ?count : int -> input_channel -> string Lwt.t
(** [read ?count ic] reads at most [len] characters from [ic]. It
returns [""] if the end of input is reached. If [count] is not
specified, it reads all bytes until the end of input. *)
когда ~count:len
указан он будет читать не более len
персонажи. Максимум, значит, что он может читать меньше. Но если count
опция опущена, тогда она будет читать все данные. Лично я считаю это поведение неинтуитивным, если не странным. Таким образом, это самое большее означает до len
или меньше, т. е. нет никакой гарантии, что он будет читать точно len
байт. И действительно, если вы добавите проверку в вашу программу:
Lwt_io.read ~count:52428800 data_source >>= fun data ->
Lwt_io.printlf "Read %d bytes" (String.length data) >>= fun () ->
Lwt_io.write h data >>= fun () ->
Вы увидите, что это будет только для чтения 4096
байт за попытку:
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Зачем 4096
? Потому что это размер буфера по умолчанию. Но это на самом деле не имеет значения.
Выпуск 2
Lwt_io
Модуль реализует буферизованный ввод-вывод. Это означает, что все ваши записи и чтения не идут непосредственно в файл, а помещаются в буфер. Это означает, что вы должны помнить flush
а также close
, Ваш код не закрывает дескрипторы по окончании, поэтому вы можете столкнуться с ситуацией, когда некоторые буферы остаются незаполненными после завершения программы. Lwt_io
в частности, очищает все буферы перед выходом из программы. Но вы не должны полагаться на эту недокументированную функцию (она может ударить вас в будущем, когда вы попробуете другие буферизованные операции ввода-вывода, такие как fstream из стандартной библиотеки C). Поэтому всегда закрывайте свои файлы (другая проблема заключается в том, что на сегодняшний день файловые дескрипторы являются наиболее ценным ресурсом, и их утечка очень трудно найти).
Выпуск 3
Не использовать /dev/urandom
или же /dev/random
измерить IO. Для первого вы будете измерять производительность генератора случайных чисел, для второго вы будете измерять поток энтропии в вашей машине. Оба довольно медленные. В зависимости от скорости вашего процессора вы редко получаете более 16 Мбит / с, а это намного меньше, чем Lwt
может пропускная способность. Чтение из /dev/zero
и писать в /dev/null
фактически выполнит все передачи в памяти и покажет фактическую скорость, которая может быть достигнута вашей программой. Хорошо написанная программа будет по-прежнему ограничена скоростью ядра. В приведенном ниже примере программы средняя скорость будет равна 700 МБ / с.
Выпуск 4
Не используйте буферизованный ввод, если вы действительно стремитесь к производительности. Вы никогда не получите максимум. Например, Lwt_io.read
сначала будет читать в буфере, затем он создаст string
и скопировать данные в эту строку. Если вам действительно нужна производительность, тогда вам нужно обеспечить собственную буферизацию. В большинстве случаев в этом нет необходимости, так как Lwt_io
довольно производительный. Но если вам нужно обрабатывать десятки мегабайт в секунду или вам нужна какая-то особая политика буферизации (что-то нелинейное), вам, возможно, придется подумать о предоставлении собственной буферизации. Хорошая новость в том, что Lwt_io
позволяет вам сделать это. Вы можете взглянуть на пример программы, которая будет измерять производительность Lwt
ввод, вывод. Это имитирует хорошо известный pv
программа.
Выпуск 5
Вы ожидаете получить некоторую производительность при параллельном запуске потоков. Проблема в том, что в вашем тесте нет места для параллелизма. /dev/random
(так же как /dev/zero
) - это одно устройство, которое ограничено только процессором. Это то же самое, что просто позвонить random
функция. Это всегда будет available
так что никакой системный вызов не будет блокироваться на нем. Запись в обычный файл также не является хорошим местом для параллелизма. Прежде всего, обычно это только один жесткий диск с одной пишущей головкой. Даже если системный вызов заблокирует и выдаст элемент управления другому потоку, это приведет к снижению производительности, поскольку теперь два потока будут бороться за позицию заголовка. Если у вас SSD, не будет никакой конкуренции за заголовок, но производительность будет еще хуже, так как вы испортите свой кеш. Но, к счастью, обычно запись на обычных файлах не блокируется. Таким образом, ваши потоки будут работать последовательно, то есть они будут сериализованы.
Если вы посмотрите на свои файлы, то увидите, что каждый из них составляет 4097 КБ - это 4096 КБ, которые были прочитаны из /dev/urandom, плюс один байт для новой строки. Вы достигаете максимума буфера с Lwt_io.read, поэтому, даже если вы скажете ~count:awholelot, он даст вам только ~count:4096.
Я не знаю, каков канонический способ сделать это, но вот одна альтернатива:
open Lwt
let stream_a_little source n =
let left = ref n in
Lwt_stream.from (fun () ->
if !left <= 0 then return None
else Lwt_io.read ~count:!left source >>= (fun s ->
left:=!left - (Bytes.length s);
return (Some s)
))
let main () =
Lwt_io.open_file ~buffer_size:(4096*8) ~mode:Lwt_io.Input "/dev/urandom" >>= fun data_source ->
Lwt_unix.mkdir "serial" 0o777 >>= fun () ->
Lwt_list.iter_p
(fun count ->
let count = string_of_int count in
Lwt_io.open_file
~flags:[Unix.O_RDWR; Unix.O_CREAT]
~perm:0o777
~mode:Lwt_io.Output ("serial/file"^ count ^ ".txt") >>= (fun h ->
Lwt_stream.iter_s (Lwt_io.write h)
(stream_a_little data_source 52428800)))
[0;1;2;3;4;5;6;7;8;9]
let timeit f =
let start = Unix.time () in
f () >>= fun () ->
let finished = Unix.time () in
Lwt_io.printlf "Execution time took %f seconds" (finished -. start)
let () =
Lwt_main.run (timeit main)
РЕДАКТИРОВАТЬ: Обратите внимание, что lwt является кооперативной библиотекой потоков; когда два потока идут "одновременно", они на самом деле не делают ничего в вашем процессе OCaml одновременно. OCaml (пока что) одноядерный, поэтому, когда один поток движется, другие приятно ждут, пока этот поток не скажет: "Хорошо, я проделал некоторую работу, вы, другие, уходите". Поэтому при попытке одновременной потоковой передачи до 8 файлов вы в основном выдаваете небольшую случайность в файл1, затем немного в файл2,... немного в файл8, а затем (если еще есть над чем работать) немного в файл1, затем немного в файл2 и т. д. Это имеет смысл, если вы все равно ожидаете много ввода (скажем, ваш ввод поступает по сети), и у вашего основного процесса есть много времени, чтобы просмотреть каждый поток и проверить "есть ли какие-либо входные данные?", но когда все ваши потоки просто читают из /dev/random, гораздо быстрее будет просто сначала заполнить один файл, затем второй и т. д. И при условии, что несколько процессоров смогут чтение /dev/(u) в параллельном режиме (и ваш накопитель может работать), конечно, было бы намного быстрее одновременно загружать чтение ncpu, но тогда вам понадобится многоядерный (или просто сделать это в сценарии оболочки),
EDIT2: показал, как увеличить размер буфера на считывателе, немного увеличил скорость;) Обратите внимание, что вы также можете просто установить buffer_size настолько высоким, насколько вы хотите на своем старом примере, который будет читать все сразу, но вы можете получит больше, чем ваш buffer_size, если вы не прочитали несколько раз.