Поток и очередь

Мне интересно знать, что было бы лучшим способом реализовать очередь на основе потоков.

Например:

У меня есть 10 действий, которые я хочу выполнить только с 4 потоками. Я хотел бы создать очередь со всеми 10-ю действиями, расположенными линейно, и начать первые 4 действия с 4-мя потоками, как только один из потоков будет выполнен, следующий запустится и т. Д. - Таким образом, за раз количество потоков либо 4, либо меньше 4.

6 ответов

E сть Queue класс в thread в стандартной библиотеке. Используя это, вы можете сделать что-то вроде этого:

require 'thread'

queue = Queue.new
threads = []

# add work to the queue
queue << work_unit

4.times do
  threads << Thread.new do
    # loop until there are no more things to do
    until queue.empty?
      # pop with the non-blocking flag set, this raises
      # an exception if the queue is empty, in which case
      # work_unit will be set to nil
      work_unit = queue.pop(true) rescue nil
      if work_unit
        # do work
      end
    end
    # when there is no more work, the thread will stop
  end
end

# wait until all threads have completed processing
threads.each { |t| t.join }

Причина, по которой я показываю неблокирующий флаг, заключается в том, что между until queue.empty? и всплывающий другой поток мог вытолкнуть очередь, поэтому, если не установлен неблокирующий флаг, мы можем застрять в этой строке навсегда.

Если вы используете MRI, интерпретатор Ruby по умолчанию, имейте в виду, что потоки не будут абсолютно параллельными. Если ваша работа связана с процессором, вы можете запустить однопоточный. Если у вас есть какая-то операция, которая блокирует IO, вы можете получить некоторый параллелизм, но YMMV. Кроме того, вы можете использовать интерпретатор, который обеспечивает полный параллелизм, такой как jRuby или Rubinius.

Есть несколько драгоценных камней, которые реализуют этот шаблон для вас; параллель, персик, а мой называется threach (или же jruby_threach под юроби). Это заменяющая замена для #each, но она позволяет вам указать, сколько потоков следует запускать, используя SizedQueue, чтобы не допустить выхода из-под контроля.

Так...

(1..10).threach(4) {|i| do_my_work(i) }

Не выдвигать мои собственные вещи; Есть много хороших реализаций, чтобы сделать вещи проще.

Если вы используете JRuby, jruby_threach это намного лучшая реализация - Java просто предлагает гораздо более богатый набор потоковых приматов и структур данных для использования.

Исполняемый описательный пример:

require 'thread'

p tasks = [
    {:file => 'task1'},
    {:file => 'task2'},
    {:file => 'task3'},
    {:file => 'task4'},
    {:file => 'task5'}
]

tasks_queue = Queue.new
tasks.each {|task| tasks_queue << task}

# run workers
workers_count = 3
workers = []
workers_count.times do |n|
    workers << Thread.new(n+1) do |my_n|
        while (task = tasks_queue.shift(true) rescue nil) do
            delay = rand(0)
            sleep delay
            task[:result] = "done by worker ##{my_n} (in #{delay})"
            p task
        end
    end
end

# wait for all threads
workers.each(&:join)

# output results
puts "all done"
p tasks

Вы можете использовать пул потоков. Это довольно распространенная модель для этого типа проблемы.
http://en.wikipedia.org/wiki/Thread_pool_pattern

Github, кажется, имеет несколько реализаций, которые вы можете попробовать:
https://github.com/search?type=Everything&language=Ruby&q=thread+pool

У целлулоида есть пример рабочего пула, который делает это.

Я использую драгоценный камень под названием work_queue. Это действительно практично.

Пример:

require 'work_queue'
wq = WorkQueue.new 4, 10
(1..10).each do |number|
    wq.enqueue_b("Thread#{number}") do |thread_name|  
        puts "Hello from the #{thread_name}"
    end
end
wq.join
Другие вопросы по тегам