Попытка создать асинхронный пример с использованием Core.Async.Pipe, но, как представляется, Pipe.write блокирует ожидание Pipe.read

Я создал следующий игрушечный пример, который считает в цикле и записывает значение в Async.Pipe:

open Sys
open Unix
open Async.Std
let (r,w) = Pipe.create ()

let rec readloop r = 
  Pipe.read r >>=
  function
  | `Eof -> return ()
  | `Ok v -> return (printf "Got %d\n" v) >>=
  fun () -> after (Core.Time.Span.of_sec 0.5) >>=
  fun () -> readloop r 

let countup hi w =
  let rec loop i = 
    printf "i=%d\n" i ;
    if (i < hi &&( not (Pipe.is_closed w))) then 
       Pipe.write w i >>>
       fun () -> loop (i+1)
     else Pipe.close w
  in 
  loop 0 

let () =
  countup 10 w;
  ignore(readloop r);;
  Core.Never_returns.never_returns (Scheduler.go ()) 

Обратите внимание, что функция readloop является рекурсивной - она ​​просто непрерывно считывает значения из канала, когда они доступны. Тем не менее, я добавил задержку 0,5 с между каждым чтением. Функция подсчета похожа, но она зацикливается и выполняет запись в тот же канал.

Когда я запускаю это, я получаю:

i=0
i=1
Got 0
i=2
Got 1
i=3
Got 2
i=4
Got 3
i=5
Got 4
i=6
Got 5
i=7
Got 6
i=8
Got 7
i=9
Got 8   
i=10
Got 9

Помимо первых трех строк вывода, представленных выше, все остальные строки вывода, похоже, должны ждать полсекунды. Таким образом, кажется, что канал заблокирован после записи, пока не будет прочитано из канала. (Данные Pipe.write w, кажется, блокируют ожидание Pipe.read r) То, что я думал, должно произойти (так как это какой-то асинхронный канал), так это то, что значения будут помещены в очередь в Pipe, пока не произойдет чтение, что-то лайк:

i=0
Got 0 (* now reader side waits for 1/2 second before reading again *) 
i=1   (* meanwhile writer side keeps running *)
i=2
i=3
i=4
i=5
i=6
i=7
i=8  
i=9 (* up till here, all output happens pretty much simultaneously *)
Got 1 (* 1/2 second between these messages *)
Got 2
Got 3
Got 4
Got 5
Got 6
Got 7
Got 8
Got 9

Мне интересно, есть ли способ получить поведение с помощью Async?

Мой реальный сценарий использования: у меня открыт сокет Tcp (как клиент), и если бы я использовал потоки после некоторой установки между клиентом и сервером, я бы запустил поток, который просто сидел и считывал данные, поступающие из сокета из сервер и поместить эти данные в очередь сообщений, которые можно будет просмотреть в главном потоке программы, когда он будет готов. Однако вместо того, чтобы использовать потоки, я хочу использовать Core.Async для достижения той же цели: считывать данные из сокета, когда он поступает с сервера, и когда данные доступны, проверять сообщение и делать что-то на основе его содержимого. Могут происходить и другие вещи, так что это моделируется "полсекунды ожидания" в приведенном выше коде. Я думал, что Pipe поставит сообщения в очередь, чтобы они могли быть прочитаны, когда читатель был готов, но это не так.

1 ответ

Решение

Действительно, труба - это очередь, но по умолчанию ее длина равна 0. Так что, когда вы отодвигаетесь, производитель немедленно остановится и будет ждать. Вы можете контролировать размер с set_size_budget функция.

Другие вопросы по тегам