Как совместить фильтры Lwt?
В настоящее время я учу Lwt. Я заинтересован в использовании асинхронных процессов для замены некоторых подпрограмм оболочки на подпрограммы OCaml.
Давайте посмотрим на упрощенную первую попытку, где фильтр создается путем объединения двух работающих потоков. cat
:
let filter_cat ()=
Lwt_process.pmap_lines ("cat", [| "cat" |])
let filter_t () =
Lwt_io.stdin
|> Lwt_io.read_lines
|> filter_cat ()
|> filter_cat ()
|> Lwt_io.write_lines Lwt_io.stdout
let () =
filter_t ()
|> Lwt_main.run
Этот фильтр как-то работает, но зависает, когда его стандартный ввод закрывается вместо выхода. Если я удалю один из filter_cat
работает как положено.
Я предполагаю, что я не составляю эти фильтры должным образом и поэтому не могу присоединиться к двум потокам, которые я запускаю. Как правильно составить эти фильтры, чтобы программа завершала работу после чтения EOF
на stdin
?
Вы можете найти эту программу вместе с Makefile для BSD Owl в Github.
1 ответ
Ответ заключается в том, что в Lwt есть небольшая ошибка. Есть внутренняя функция, контролирующая ту, которая выполняет трубопровод:
(* Monitor the thread [sender] in the stream [st] so write errors are
reported. *)
let monitor sender st =
let sender = sender >|= fun () -> None in
let state = ref Init in
Lwt_stream.from
(fun () ->
match !state with
| Init ->
let getter = Lwt.apply Lwt_stream.get st in
let result _ =
match Lwt.state sender with
| Lwt.Sleep ->
(* The sender is still sleeping, behave as the
getter. *)
getter
| Lwt.Return _ ->
(* The sender terminated successfully, we are
done monitoring it. *)
state := Done;
getter
| Lwt.Fail _ ->
(* The sender failed, behave as the sender for
this element and save current getter. *)
state := Save getter;
sender
in
Lwt.try_bind (fun () -> Lwt.choose [sender; getter]) result result
| Save t ->
state := Done;
t
| Done ->
Lwt_stream.get st)
Проблема в определении
let getter = Lwt.apply Lwt_stream.get st
Когда getter
процесс встречает конец потока, затем он сохраняется, но sender
теряется, что, кажется, мешает завершению. Это можно исправить, улучшив определение getter
сказав ему вести себя как sender
когда конец потока был достигнут.