Поток и очередь
Мне интересно знать, что было бы лучшим способом реализовать очередь на основе потоков.
Например:
У меня есть 10 действий, которые я хочу выполнить только с 4 потоками. Я хотел бы создать очередь со всеми 10-ю действиями, расположенными линейно, и начать первые 4 действия с 4-мя потоками, как только один из потоков будет выполнен, следующий запустится и т. Д. - Таким образом, за раз количество потоков либо 4, либо меньше 4.
6 ответов
E сть Queue
класс в thread
в стандартной библиотеке. Используя это, вы можете сделать что-то вроде этого:
require 'thread'
queue = Queue.new
threads = []
# add work to the queue
queue << work_unit
4.times do
threads << Thread.new do
# loop until there are no more things to do
until queue.empty?
# pop with the non-blocking flag set, this raises
# an exception if the queue is empty, in which case
# work_unit will be set to nil
work_unit = queue.pop(true) rescue nil
if work_unit
# do work
end
end
# when there is no more work, the thread will stop
end
end
# wait until all threads have completed processing
threads.each { |t| t.join }
Причина, по которой я показываю неблокирующий флаг, заключается в том, что между until queue.empty?
и всплывающий другой поток мог вытолкнуть очередь, поэтому, если не установлен неблокирующий флаг, мы можем застрять в этой строке навсегда.
Если вы используете MRI, интерпретатор Ruby по умолчанию, имейте в виду, что потоки не будут абсолютно параллельными. Если ваша работа связана с процессором, вы можете запустить однопоточный. Если у вас есть какая-то операция, которая блокирует IO, вы можете получить некоторый параллелизм, но YMMV. Кроме того, вы можете использовать интерпретатор, который обеспечивает полный параллелизм, такой как jRuby или Rubinius.
Есть несколько драгоценных камней, которые реализуют этот шаблон для вас; параллель, персик, а мой называется threach
(или же jruby_threach
под юроби). Это заменяющая замена для #each, но она позволяет вам указать, сколько потоков следует запускать, используя SizedQueue, чтобы не допустить выхода из-под контроля.
Так...
(1..10).threach(4) {|i| do_my_work(i) }
Не выдвигать мои собственные вещи; Есть много хороших реализаций, чтобы сделать вещи проще.
Если вы используете JRuby, jruby_threach
это намного лучшая реализация - Java просто предлагает гораздо более богатый набор потоковых приматов и структур данных для использования.
Исполняемый описательный пример:
require 'thread'
p tasks = [
{:file => 'task1'},
{:file => 'task2'},
{:file => 'task3'},
{:file => 'task4'},
{:file => 'task5'}
]
tasks_queue = Queue.new
tasks.each {|task| tasks_queue << task}
# run workers
workers_count = 3
workers = []
workers_count.times do |n|
workers << Thread.new(n+1) do |my_n|
while (task = tasks_queue.shift(true) rescue nil) do
delay = rand(0)
sleep delay
task[:result] = "done by worker ##{my_n} (in #{delay})"
p task
end
end
end
# wait for all threads
workers.each(&:join)
# output results
puts "all done"
p tasks
Вы можете использовать пул потоков. Это довольно распространенная модель для этого типа проблемы.
http://en.wikipedia.org/wiki/Thread_pool_pattern
Github, кажется, имеет несколько реализаций, которые вы можете попробовать:
https://github.com/search?type=Everything&language=Ruby&q=thread+pool
Я использую драгоценный камень под названием work_queue. Это действительно практично.
Пример:
require 'work_queue'
wq = WorkQueue.new 4, 10
(1..10).each do |number|
wq.enqueue_b("Thread#{number}") do |thread_name|
puts "Hello from the #{thread_name}"
end
end
wq.join