Расставьте приоритеты выполнения некоторых рабочих процессов над другими
Я использую инфраструктуру потока для amazon swf, и я хочу иметь возможность запускать приоритетные выполнения рабочих процессов и обычные рабочие процессы. Если есть приоритетные задачи, то действия должны выбирать приоритетные задачи раньше, чем обычные приоритетные задачи. Каков наилучший способ сделать это?
Я думаю, что следующее может работать, но мне интересно, есть ли лучший / рекомендуемый подход.
- Я определю двух рабочих и два списка действий для этого действия. Один список приоритетов и один обычный список. Каждый работник будет использовать один и тот же класс активности.
- Оба работника будут работать на одном хосте (экземпляр ec2).
- В рабочем процессе я определю два метода: startNormalWorkflow и startHighWorkflow. В методе startHighWorkflow я могу использовать ActivitySchedulingOptions, чтобы поместить задачу в список с высоким приоритетом.
Проблема с этим подходом состоит в том, что нет никакой гарантии, что задача с высоким приоритетом запланирована перед обычными задачами.
2 ответа
Это хороший вопрос, я немного почесал голову.
Конечно, у этой кошки есть несколько способов избавиться от нее, и существует множество правильных решений. Здесь я сосредоточился на самом простом из возможных вариантов, а именно на выполнении задач в порядке приоритета в рамках одного рабочего процесса.
Сценарий выглядит следующим образом: я определяю одного работника, обслуживающего два списка задач, default_tasks
а также urgent_tasks
с тривиальной логикой:
- Если на
urgent_tasks
список, затем выберите один оттуда, - В противном случае выберите задачу из
default_tasks
- Выполните любую выбранную задачу.
Вопрос в том, как проверить, ожидают ли какие-либо высокоприоритетные задачи? Выручает API CountPendingActivityTasks!
Я знаю, что вы используете Flow для разработки. Мой пример написан с использованием boto.swf.layer2
так как Python намного проще для создания прототипов - но идея остается той же и может быть расширена до более сложного сценария с выполнением рабочих процессов с высоким и низким приоритетом.
Итак, чтобы выполнить вышеизложенное с помощью boto.swf, выполните следующие действия:
Экспорт учетных данных в среду
$ export AWS_ACCESS_KEY_ID=your access key
$ export AWS_SECRET_ACCESS_KEY= your secret key
Получить фрагменты кода
Для удобства вы можете раскошелиться на github:
$ git clone git@github.com:oozie/stackru.git
$ cd stackru/amazon-swf/priority_tasks/
Чтобы загрузить домен и рабочий процесс:
# domain_setup.py
import boto.swf.layer2 as swf
DOMAIN = 'stackru'
VERSION = '1.0'
swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name='SomeActivity', version=VERSION, task_list='default_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()
Реализация решения:
# decider.py
import boto.swf.layer2 as swf
DOMAIN = 'stackru'
ACTIVITY = 'SomeActivity'
VERSION = '1.0'
class MyWorkflowDecider(swf.Decider):
domain = DOMAIN
task_list = 'default_tasks'
version = VERSION
def run(self):
history = self.poll()
print history
if 'events' in history:
# Get a list of non-decision events to see what event came in last.
workflow_events = [e for e in history['events']
if not e['eventType'].startswith('Decision')]
decisions = swf.Layer1Decisions()
last_event = workflow_events[-1]
last_event_type = last_event['eventType']
if last_event_type == 'WorkflowExecutionStarted':
# At the start, get the worker to fetch the first assignment.
decisions.schedule_activity_task(ACTIVITY+'1', ACTIVITY, VERSION, task_list='default_tasks')
decisions.schedule_activity_task(ACTIVITY+'2', ACTIVITY, VERSION, task_list='urgent_tasks')
decisions.schedule_activity_task(ACTIVITY+'3', ACTIVITY, VERSION, task_list='default_tasks')
decisions.schedule_activity_task(ACTIVITY+'4', ACTIVITY, VERSION, task_list='urgent_tasks')
decisions.schedule_activity_task(ACTIVITY+'5', ACTIVITY, VERSION, task_list='default_tasks')
elif last_event_type == 'ActivityTaskCompleted':
# Complete workflow execution after 5 completed activities.
closed_activity_count = sum(1 for wf_event in workflow_events if wf_event.get('eventType') == 'ActivityTaskCompleted')
if closed_activity_count == 5:
decisions.complete_workflow_execution()
self.complete(decisions=decisions)
return True
Приоритетность реализации работника:
# worker.py
import boto.swf.layer2 as swf
DOMAIN = 'stackru'
VERSION = '1.0'
class PrioritizingWorker(swf.ActivityWorker):
domain = DOMAIN
version = VERSION
def run(self):
urgent_task_count = swf.Domain(name=DOMAIN).count_pending_activity_tasks('urgent_tasks').get('count', 0)
if urgent_task_count > 0:
self.task_list = 'urgent_tasks'
else:
self.task_list = 'default_tasks'
activity_task = self.poll()
if 'activityId' in activity_task:
print urgent_task_count, 'urgent tasks in the queue. Executing ' + activity_task.get('activityId')
self.complete()
return True
Запустите рабочий процесс из трех экземпляров интерактивной оболочки Python
Запустите решатель:
$ python -i decider.py
>>> while MyWorkflowDecider().run(): pass
...
Начать выполнение:
$ python -i decider.py
>>> swf.WorkflowType(domain='stackru', name='MyWorkflow', version='1.0', task_list='default_tasks').start()
Наконец, начните работу и наблюдайте за выполнением задач:
$ python -i worker.py
>>> while PrioritizingWorker().run(): pass
...
2 urgent tasks in the queue. Executing SomeActivity2
1 urgent tasks in the queue. Executing SomeActivity4
0 urgent tasks in the queue. Executing SomeActivity5
0 urgent tasks in the queue. Executing SomeActivity1
0 urgent tasks in the queue. Executing SomeActivity3
Оказывается, использование отдельного списка задач, который вы должны сначала проверить, не работает должным образом.
Есть пара проблем.
Во-первых, API подсчета не обновляется надежно. Таким образом, вы можете получить 0 заданий, даже если в очереди есть срочные задания.
Во-вторых, вызов, который опрашивает задачи, зависает, если нет доступных задач. Поэтому, когда вы будете опрашивать несрочные задачи, они будут "зависать" либо на 2 минуты, либо до тех пор, пока у вас не будет срочной задачи.
Так что это может вызвать все виды проблем в вашем рабочем процессе.
Чтобы это работало, SWF должен был реализовать API опроса, который мог бы вернуть первую задачу из списка списков задач. Тогда было бы намного проще.