Расставьте приоритеты выполнения некоторых рабочих процессов над другими

Я использую инфраструктуру потока для amazon swf, и я хочу иметь возможность запускать приоритетные выполнения рабочих процессов и обычные рабочие процессы. Если есть приоритетные задачи, то действия должны выбирать приоритетные задачи раньше, чем обычные приоритетные задачи. Каков наилучший способ сделать это?

Я думаю, что следующее может работать, но мне интересно, есть ли лучший / рекомендуемый подход.

  1. Я определю двух рабочих и два списка действий для этого действия. Один список приоритетов и один обычный список. Каждый работник будет использовать один и тот же класс активности.
  2. Оба работника будут работать на одном хосте (экземпляр ec2).
  3. В рабочем процессе я определю два метода: startNormalWorkflow и startHighWorkflow. В методе startHighWorkflow я могу использовать ActivitySchedulingOptions, чтобы поместить задачу в список с высоким приоритетом.

Проблема с этим подходом состоит в том, что нет никакой гарантии, что задача с высоким приоритетом запланирована перед обычными задачами.

2 ответа

Решение

Это хороший вопрос, я немного почесал голову.

Конечно, у этой кошки есть несколько способов избавиться от нее, и существует множество правильных решений. Здесь я сосредоточился на самом простом из возможных вариантов, а именно на выполнении задач в порядке приоритета в рамках одного рабочего процесса.

Сценарий выглядит следующим образом: я определяю одного работника, обслуживающего два списка задач, default_tasks а также urgent_tasks с тривиальной логикой:

  1. Если на urgent_tasks список, затем выберите один оттуда,
  2. В противном случае выберите задачу из default_tasks
  3. Выполните любую выбранную задачу.

Вопрос в том, как проверить, ожидают ли какие-либо высокоприоритетные задачи? Выручает API CountPendingActivityTasks!

Я знаю, что вы используете Flow для разработки. Мой пример написан с использованием boto.swf.layer2 так как Python намного проще для создания прототипов - но идея остается той же и может быть расширена до более сложного сценария с выполнением рабочих процессов с высоким и низким приоритетом.

Итак, чтобы выполнить вышеизложенное с помощью boto.swf, выполните следующие действия:

Экспорт учетных данных в среду

$ export AWS_ACCESS_KEY_ID=your access key
$ export AWS_SECRET_ACCESS_KEY= your secret key 

Получить фрагменты кода

Для удобства вы можете раскошелиться на github:

$ git clone git@github.com:oozie/stackru.git
$ cd stackru/amazon-swf/priority_tasks/

Чтобы загрузить домен и рабочий процесс:

# domain_setup.py 
import boto.swf.layer2 as swf

DOMAIN = 'stackru'
VERSION = '1.0'

swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name='SomeActivity', version=VERSION, task_list='default_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()

Реализация решения:

# decider.py
import boto.swf.layer2 as swf

DOMAIN = 'stackru'
ACTIVITY = 'SomeActivity'
VERSION = '1.0'

class MyWorkflowDecider(swf.Decider):

    domain = DOMAIN
    task_list = 'default_tasks'
    version = VERSION

    def run(self):
        history = self.poll()
        print history
        if 'events' in history:
            # Get a list of non-decision events to see what event came in last.
            workflow_events = [e for e in history['events']
                               if not e['eventType'].startswith('Decision')]

            decisions = swf.Layer1Decisions()

            last_event = workflow_events[-1]
            last_event_type = last_event['eventType']

            if last_event_type == 'WorkflowExecutionStarted':
                # At the start, get the worker to fetch the first assignment.
                decisions.schedule_activity_task(ACTIVITY+'1', ACTIVITY, VERSION, task_list='default_tasks')
                decisions.schedule_activity_task(ACTIVITY+'2', ACTIVITY, VERSION, task_list='urgent_tasks')
                decisions.schedule_activity_task(ACTIVITY+'3', ACTIVITY, VERSION, task_list='default_tasks')
                decisions.schedule_activity_task(ACTIVITY+'4', ACTIVITY, VERSION, task_list='urgent_tasks')
                decisions.schedule_activity_task(ACTIVITY+'5', ACTIVITY, VERSION, task_list='default_tasks')
            elif last_event_type == 'ActivityTaskCompleted':
                # Complete workflow execution after 5 completed activities.
                closed_activity_count = sum(1 for wf_event in workflow_events if wf_event.get('eventType') == 'ActivityTaskCompleted')
                if closed_activity_count == 5:
                    decisions.complete_workflow_execution()

            self.complete(decisions=decisions)
            return True

Приоритетность реализации работника:

# worker.py
import boto.swf.layer2 as swf

DOMAIN = 'stackru'
VERSION = '1.0'

class PrioritizingWorker(swf.ActivityWorker):

    domain = DOMAIN
    version = VERSION

    def run(self):

        urgent_task_count = swf.Domain(name=DOMAIN).count_pending_activity_tasks('urgent_tasks').get('count', 0)
        if urgent_task_count > 0:
            self.task_list = 'urgent_tasks'
        else:
            self.task_list = 'default_tasks'
        activity_task = self.poll()

        if 'activityId' in activity_task:
            print urgent_task_count, 'urgent tasks in the queue. Executing ' + activity_task.get('activityId')
            self.complete()
            return True

Запустите рабочий процесс из трех экземпляров интерактивной оболочки Python

Запустите решатель:

$ python -i decider.py
>>> while MyWorkflowDecider().run(): pass
... 

Начать выполнение:

$ python -i decider.py 
>>> swf.WorkflowType(domain='stackru', name='MyWorkflow', version='1.0', task_list='default_tasks').start()

Наконец, начните работу и наблюдайте за выполнением задач:

$ python -i worker.py 
>>> while PrioritizingWorker().run(): pass
... 
2 urgent tasks in the queue. Executing SomeActivity2
1 urgent tasks in the queue. Executing SomeActivity4
0 urgent tasks in the queue. Executing SomeActivity5
0 urgent tasks in the queue. Executing SomeActivity1
0 urgent tasks in the queue. Executing SomeActivity3

Оказывается, использование отдельного списка задач, который вы должны сначала проверить, не работает должным образом.

Есть пара проблем.

Во-первых, API подсчета не обновляется надежно. Таким образом, вы можете получить 0 заданий, даже если в очереди есть срочные задания.

Во-вторых, вызов, который опрашивает задачи, зависает, если нет доступных задач. Поэтому, когда вы будете опрашивать несрочные задачи, они будут "зависать" либо на 2 минуты, либо до тех пор, пока у вас не будет срочной задачи.

Так что это может вызвать все виды проблем в вашем рабочем процессе.

Чтобы это работало, SWF должен был реализовать API опроса, который мог бы вернуть первую задачу из списка списков задач. Тогда было бы намного проще.

Другие вопросы по тегам