Можно ли использовать IO.pipe для связи между потоками в Ruby?
В рубине IO.pipe
пример из документации, одно сообщение передается через процессы.
Я хотел сделать что-то похожее, с двумя отличиями:
- использовать потоки вместо процессов
- использовать канал для постоянного обмена сообщениями, а не для разовых сообщений
Это очевидный, но нерабочий код:
rd, wr = IO.pipe
reader_thread = Thread.new(rd) do |rd|
data_received = rd.read
puts "Read: #{data_received.inspect}"
end
write_thread = Thread.new(wr) do |wr|
wr.write "Message"
wr.flush
end
write_thread.join
reader_thread.join
что приводит к reader_thread
подождать rd.read
,
Я мог бы сделать это с помощью IO#read_nonblock
:
reader_thread = Thread.new(rd) do |rd|
data_received = \
begin
rd.read_nonblock(100)
rescue IO::WaitReadable, IO::EAGAINWaitReadable
IO.select([rd])
retry
end
puts "Read: #{data_received.inspect}"
end
Это правильный шаблон? Или использует IO.pipe
не тот инструмент для обмена сообщениями?
2 ответа
Ваша читательская нить зависает, потому что без аргументов
<tcode id="164829"></tcode>будет читать - и блокировать - пока не встретит EOF. (Если вы пройдете
length
, он будет читать, пока не прочитает такое количество байтов или EOF, в зависимости от того, что произойдет раньше, поэтому он все равно будет блокироваться, пока не получит хотя бы такой объем ввода.) Это подробно объясняется в
<tcode id="164831"></tcode>документы .
Если вы позвоните
wd.close
перед
reader_thread.join
, получит этот EOF, и вы получите результат - все сразу, когда
read
разблокирует.
В реалистичном сценарии вы, вероятно, не просто захотите прочитать один раз, вы, вероятно, захотите зациклить,
rd
встречает EOF, что-то делает с данными по пути. Самый простой способ - читать по одному байту за раз, используя
read(1)
. (Я опускаю отдельный поток записи, чтобы упростить задачу - и вам тоже следует, если вам действительно не нужны три отдельных потока инструкций; часто вам понадобится поток фонового чтения или поток фонового записи, при этом основной поток обрабатывает другой конец - но поведение в основном такое же.
text = <<~TEXT.strip
Lorem ipsum dolor sit amet, consectetur adipiscing elit,
sed do eiusmod tempor incididunt ut labore et dolore magna
aliqua.
TEXT
read_io, write_io = IO.pipe
reader_thread = Thread.new(read_io) do |io|
puts('Reading:')
while (c = io.read(1)) # block till we read one byte
$stdout.write(c)
end
puts('...Done.')
end
# Write 50 chars/second, so we can see them get read one at a time
text.chars.each { |c| write_io.write(c); sleep(0.02) }
reader_thread.join
# => Reading:
# Lorem ipsum dolor sit amet, consectetur adipiscing elit,
# sed do eiusmod tempor incididunt ut labore et dolore magna
# aliqua.
Однако это все еще висит, потому что
IO.read(1)
все еще ждет этого EOF, поэтому вам снова нужно закрыть
write_io
.
Кроме того, обычно не очень эффективно читать побайтно. На самом деле вам, вероятно, понадобится буфер 8 КБ или даже больше , в зависимости от вашего варианта использования.
reader_thread = Thread.new(read_io) do |io|
puts('Reading:')
while (c = io.read(8192))
$stdout.write(c)
end
puts('...Done.')
end
# We're writing 50 chars/second, but we won't see them print out
# till `read_io` has read 8192 bytes, or hit an EOF
text.chars.each { |c| write_io.write(c); sleep(0.02) }
write_io.close # we have to close `write_io` *sometime* --
reader_thread.join # -- or this will hang.
# => Reading:
# Lorem ipsum dolor sit amet, consectetur adipiscing elit,
# sed do eiusmod tempor incididunt ut labore et dolore magna
# aliqua....Done.
Вы также можете использовать Queue
для безопасного обмена информацией между несколькими потоками:
q = Queue.new
reader_thread = Thread.new(q) do |q|
data_received = q.pop
puts "Read: #{data_received.inspect}"
end
write_thread = Thread.new(q) do |q|
q.push "Message"
end
write_thread.join
reader_thread.join