Как совместить фильтры Lwt?

В настоящее время я учу Lwt. Я заинтересован в использовании асинхронных процессов для замены некоторых подпрограмм оболочки на подпрограммы OCaml.

Давайте посмотрим на упрощенную первую попытку, где фильтр создается путем объединения двух работающих потоков. cat:

let filter_cat ()=
  Lwt_process.pmap_lines ("cat", [| "cat" |])

let filter_t () =
  Lwt_io.stdin
  |> Lwt_io.read_lines
  |> filter_cat ()
  |> filter_cat ()
  |> Lwt_io.write_lines Lwt_io.stdout

let () =
  filter_t ()
  |> Lwt_main.run

Этот фильтр как-то работает, но зависает, когда его стандартный ввод закрывается вместо выхода. Если я удалю один из filter_catработает как положено.

Я предполагаю, что я не составляю эти фильтры должным образом и поэтому не могу присоединиться к двум потокам, которые я запускаю. Как правильно составить эти фильтры, чтобы программа завершала работу после чтения EOF на stdin?


Вы можете найти эту программу вместе с Makefile для BSD Owl в Github.

1 ответ

Решение

Ответ заключается в том, что в Lwt есть небольшая ошибка. Есть внутренняя функция, контролирующая ту, которая выполняет трубопровод:

(* Monitor the thread [sender] in the stream [st] so write errors are
   reported. *)
let monitor sender st =
  let sender = sender >|= fun () -> None in
  let state = ref Init in
  Lwt_stream.from
    (fun () ->
       match !state with
         | Init ->
             let getter = Lwt.apply Lwt_stream.get st in
             let result _ =
               match Lwt.state sender with
                 | Lwt.Sleep ->
                     (* The sender is still sleeping, behave as the
                        getter. *)
                     getter
                 | Lwt.Return _ ->
                     (* The sender terminated successfully, we are
                        done monitoring it. *)
                     state := Done;
                     getter
                 | Lwt.Fail _ ->
                     (* The sender failed, behave as the sender for
                        this element and save current getter. *)
                     state := Save getter;
                     sender
             in
             Lwt.try_bind (fun () -> Lwt.choose [sender; getter]) result result
         | Save t ->
             state := Done;
             t
         | Done ->
             Lwt_stream.get st)

Проблема в определении

let getter = Lwt.apply Lwt_stream.get st

Когда getter процесс встречает конец потока, затем он сохраняется, но sender теряется, что, кажется, мешает завершению. Это можно исправить, улучшив определение getter сказав ему вести себя как sender когда конец потока был достигнут.

Другие вопросы по тегам