Можно ли использовать IO.pipe для связи между потоками в Ruby?

В рубине IO.pipe пример из документации, одно сообщение передается через процессы.

Я хотел сделать что-то похожее, с двумя отличиями:

  1. использовать потоки вместо процессов
  2. использовать канал для постоянного обмена сообщениями, а не для разовых сообщений

Это очевидный, но нерабочий код:

rd, wr = IO.pipe

reader_thread = Thread.new(rd) do |rd|
  data_received = rd.read
  puts "Read: #{data_received.inspect}"
end

write_thread = Thread.new(wr) do |wr|
  wr.write "Message"
  wr.flush
end

write_thread.join
reader_thread.join

что приводит к reader_thread подождать rd.read,

Я мог бы сделать это с помощью IO#read_nonblock:

reader_thread = Thread.new(rd) do |rd|
  data_received = \
    begin
      rd.read_nonblock(100)
    rescue IO::WaitReadable, IO::EAGAINWaitReadable
      IO.select([rd])
      retry
    end

  puts "Read: #{data_received.inspect}"
end

Это правильный шаблон? Или использует IO.pipe не тот инструмент для обмена сообщениями?

2 ответа

Ваша читательская нить зависает, потому что без аргументов <tcode id="164829"></tcode>будет читать - и блокировать - пока не встретит EOF. (Если вы пройдете length, он будет читать, пока не прочитает такое количество байтов или EOF, в зависимости от того, что произойдет раньше, поэтому он все равно будет блокироваться, пока не получит хотя бы такой объем ввода.) Это подробно объясняется в <tcode id="164831"></tcode>документы .

Если вы позвоните wd.close перед reader_thread.join, получит этот EOF, и вы получите результат - все сразу, когда read разблокирует.

В реалистичном сценарии вы, вероятно, не просто захотите прочитать один раз, вы, вероятно, захотите зациклить, rdвстречает EOF, что-то делает с данными по пути. Самый простой способ - читать по одному байту за раз, используя read(1). (Я опускаю отдельный поток записи, чтобы упростить задачу - и вам тоже следует, если вам действительно не нужны три отдельных потока инструкций; часто вам понадобится поток фонового чтения или поток фонового записи, при этом основной поток обрабатывает другой конец - но поведение в основном такое же.

      text = <<~TEXT.strip
  Lorem ipsum dolor sit amet, consectetur adipiscing elit, 
  sed do eiusmod tempor incididunt ut labore et dolore magna 
  aliqua.
TEXT

read_io, write_io = IO.pipe

reader_thread = Thread.new(read_io) do |io|
  puts('Reading:')
  while (c = io.read(1)) # block till we read one byte
    $stdout.write(c)
  end
  puts('...Done.')
end

# Write 50 chars/second, so we can see them get read one at a time
text.chars.each { |c| write_io.write(c); sleep(0.02) } 

reader_thread.join

# => Reading:
#    Lorem ipsum dolor sit amet, consectetur adipiscing elit, 
#    sed do eiusmod tempor incididunt ut labore et dolore magna 
#    aliqua.

Однако это все еще висит, потому что IO.read(1) все еще ждет этого EOF, поэтому вам снова нужно закрыть write_io.

Кроме того, обычно не очень эффективно читать побайтно. На самом деле вам, вероятно, понадобится буфер 8 КБ или даже больше , в зависимости от вашего варианта использования.

      reader_thread = Thread.new(read_io) do |io|
  puts('Reading:')
  while (c = io.read(8192))
    $stdout.write(c)
  end
  puts('...Done.')
end

# We're writing 50 chars/second, but we won't see them print out
# till `read_io` has read 8192 bytes, or hit an EOF
text.chars.each { |c| write_io.write(c); sleep(0.02) }

write_io.close      # we have to close `write_io` *sometime* --
reader_thread.join  # -- or this will hang.

# => Reading:
#    Lorem ipsum dolor sit amet, consectetur adipiscing elit, 
#    sed do eiusmod tempor incididunt ut labore et dolore magna 
#    aliqua....Done.

Вы также можете использовать Queue для безопасного обмена информацией между несколькими потоками:

q = Queue.new

reader_thread = Thread.new(q) do |q|
  data_received = q.pop
  puts "Read: #{data_received.inspect}"
end

write_thread = Thread.new(q) do |q|
  q.push "Message"
end

write_thread.join
reader_thread.join
Другие вопросы по тегам