Построение асинхронной очереди в Ruby

Мне нужно обрабатывать задания вне очереди внутри процесса, при этом ввод-вывод выполняется асинхронно. Это довольно просто. Суть в том, что эти задания могут добавлять дополнительные элементы в очередь.

Я думаю, что слишком долго возился с этой проблемой, поэтому мой мозг мутен - это не должно быть слишком сложно. Я продолжаю придумывать сценарий "или-или":

  1. Очередь может выполнять задания асинхронно, а результаты могут быть объединены впоследствии.
  2. Очередь может синхронно выполнять задания до тех пор, пока не завершится последний и очередь не станет пустой.

Я возился со всем от EventMachine и Goliath (оба из которых могут использовать EM::HttpRequest) в целлулоид (хотя на самом деле никогда не удосужился создать что-то с ним) и написать счетчики с использованием волокон. Мой мозг жарен, хотя.

Я просто хотел бы иметь возможность сделать это:

items = [1,2,3]
items.each do |item|
  if item.has_particular_condition? 
    items << item.process_one_way
  elsif item.other_condition?
    items << item.process_another_way
  # ...
  end
end

#=> [1,2,3,4,5,6]

... где 4, 5 и 6 - результаты обработки исходных элементов в наборе, а 7, 8 и 9 - результаты обработки 4, 5 и 6. Мне не нужно беспокоиться о бесконечной обработке очередь, потому что данные, которые я обрабатываю, закончатся через пару итераций.

Приветствуются руководство высокого уровня, комментарии, ссылки на другие библиотеки и т. Д., А также примеры кода реализации более низкого уровня.

2 ответа

Решение

В итоге я реализовал что-то чуть менее идеальное - по сути, просто оборачивая итератор EM Fiber в цикл, который завершается, когда новые результаты не ставятся в очередь.

require 'set'

class SetRunner
  def initialize(seed_queue)
    @results = seed_queue.to_set
  end

  def run
    begin
      yield last_loop_results, result_bucket
    end until new_loop_results.empty?

    return @results
  end

  def last_loop_results
    result_bucket.shift(result_bucket.count)
  end

  def result_bucket
    @result_bucket ||= @results.to_a
  end

  def new_loop_results
    # .add? returns nil if already in the set
    result_bucket.each { |item| @results.add? item }.compact
  end
end

Затем, чтобы использовать его с EventMachine:

queue = [1,2,3]
results = SetRunner.new(queue).run do |set, output|
  EM::Synchrony::FiberIterator.new(set, 3).each do |item|
    output.push(item + 3) if item <= 6
  end
end
# => [1,2,3,4,5,6,7,8,9]

Затем каждый набор запускается с уровнем параллелизма, переданным FiberIterator, но результаты каждого набора будут запускаться на следующей итерации внешнего цикла SetRunner.

У меня были подобные требования в прошлом, и что вам нужно, так это надежная, высокопроизводительная рабочая очередь из ее звуков. Я рекомендую вам ознакомиться с beanstalkd, который я обнаружил более года назад и с тех пор использую для надежной обработки тысяч и тысяч заданий в ruby.

В частности, я начал разрабатывать надежные библиотеки ruby ​​вокруг beanstalkd. В частности, не забудьте проверить backburner, который является рабочей готовой рабочей очередью в ruby ​​с использованием beanstalkd. Синтаксис и настройка просты, определяют скорость выполнения заданий, встроенную обработку сбоев заданий и повторных попыток, а также планирование заданий и многое другое.

Дайте мне знать, если у вас есть какие-либо вопросы, но я думаю, что beanstalkd и backburner вполне соответствуют вашим требованиям.

Другие вопросы по тегам