Использование Amazon SWF для связи между серверами

Использовать Amazon SWF для обмена сообщениями между серверами?

  1. На сервере A I хотите запустить скрипт A
  2. Когда это закончится, я хочу отправить сообщение на сервер B, чтобы запустить скрипт B
  3. Если он завершится успешно, я хочу, чтобы он очистил задание из очереди рабочего процесса

Мне очень трудно понять, как я могу использовать Boto и SWF в комбинации, чтобы сделать это. Мне не нужен какой-то полный код, но я хочу узнать, может ли кто-нибудь объяснить немного больше о том, что с этим связано.

  • Как на самом деле сказать серверу B проверить завершение сценария A?
  • Как мне убедиться, что сервер A не подхватит завершение сценария A и попытаться запустить сценарий B (так как сервер B должен запустить это)?
  • Как я на самом деле уведомить SWF о завершении сценария A? Это флаг, сообщение или что?

Как вы можете видеть, я очень смущен всем этим, если кто-то может пролить свет на это, я был бы очень признателен.

4 ответа

Решение

Я думаю, что вы задаете несколько очень хороших вопросов, которые подчеркивают, насколько полезным может быть SWF как услуга. Короче говоря, вы не говорите своим серверам координировать работу между собой. Ваш решающий орган организует все это для вас с помощью сервиса SWF.

Реализация вашего рабочего процесса будет идти следующим образом:

  1. Регистрация вашего рабочего процесса и его деятельности в сервисе (разовая).
  2. Реализуем решающее устройство и рабочие.
  3. Пусть бегут ваши работники и решатели.
  4. Начать новый рабочий процесс.

Существует несколько способов ввода учетных данных в код boto.swf. В этом упражнении я рекомендую экспортировать их в среду перед запуском приведенного ниже кода:

export AWS_ACCESS_KEY_ID=<your access key>
export AWS_SECRET_ACCESS_KEY=<your secret key>

1) Для регистрации домена, рабочего процесса и действий выполните следующее:

# ab_setup.py
import boto.swf.layer2 as swf

DOMAIN = 'stackru'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'

swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()

2) Внедрение и запуск решателей и работников.

# ab_decider.py
import time
import boto.swf.layer2 as swf

DOMAIN = 'stackru'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'

class ABDecider(swf.Decider):

    domain = DOMAIN
    task_list = 'default_tasks'
    version = VERSION

    def run(self):
        history = self.poll()
        # Print history to familiarize yourself with its format.
        print history
        if 'events' in history:
            # Get a list of non-decision events to see what event came in last.
            workflow_events = [e for e in history['events']
                               if not e['eventType'].startswith('Decision')]
            decisions = swf.Layer1Decisions()
            # Record latest non-decision event.
            last_event = workflow_events[-1]
            last_event_type = last_event['eventType']
            if last_event_type == 'WorkflowExecutionStarted':
                # At the start, get the worker to fetch the first assignment.
                decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()),
                   ACTIVITY1, VERSION, task_list='a_tasks')
            elif last_event_type == 'ActivityTaskCompleted':
                # Take decision based on the name of activity that has just completed.
                # 1) Get activity's event id.
                last_event_attrs = last_event['activityTaskCompletedEventAttributes']
                completed_activity_id = last_event_attrs['scheduledEventId'] - 1
                # 2) Extract its name.
                activity_data = history['events'][completed_activity_id]
                activity_attrs = activity_data['activityTaskScheduledEventAttributes']
                activity_name = activity_attrs['activityType']['name']
                # 3) Optionally, get the result from the activity.
                result = last_event['activityTaskCompletedEventAttributes'].get('result')

                # Take the decision.
                if activity_name == ACTIVITY1:
                    # Completed ACTIVITY1 just came in. Kick off ACTIVITY2.
                    decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()),
                        ACTIVITY2, VERSION, task_list='b_tasks', input=result)
                elif activity_name == ACTIVITY2:
                    # Server B completed activity. We're done.
                    decisions.complete_workflow_execution()

            self.complete(decisions=decisions)
            return True

Работники намного проще, вам не нужно использовать наследование, если вы не хотите.

# ab_worker.py
import os
import time
import boto.swf.layer2 as swf

DOMAIN = 'stackru'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'

class MyBaseWorker(swf.ActivityWorker):

    domain = DOMAIN
    version = VERSION
    task_list = None

    def run(self):
        activity_task = self.poll()
        print activity_task
        if 'activityId' in activity_task:
            # Get input.
            # Get the method for the requested activity.
            try:
                self.activity(activity_task.get('input'))
            except Exception, error:
                self.fail(reason=str(error))
                raise error

            return True

    def activity(self, activity_input):
        raise NotImplementedError

class WorkerA(MyBaseWorker):
    task_list = 'a_tasks'

    def activity(self, activity_input):
        result = str(time.time())
        print 'worker a reporting time: %s' % result
        self.complete(result=result)

class WorkerB(MyBaseWorker):
    task_list = 'b_tasks'

    def activity(self, activity_input):
        result = str(os.getpid())
        print 'worker b returning pid: %s' % result
        self.complete(result=result)

3) Запустите ваши решатели и рабочие. Ваше решение и работники могут работать с разных хостов или с одной и той же машины. Откройте четыре терминала и запустите ваших актеров:

Сначала ваш решатель

$ python -i ab_decider.py 
>>> while ABDecider().run(): pass
... 

Тогда работник А, вы можете сделать это с сервера А:

$ python -i ab_workers.py 
>>> while WorkerA().run(): pass

Затем рабочий B, возможно, с сервера B, но если вы запустите их все с ноутбука, он будет работать так же хорошо:

$ python -i ab_workers.py 
>>> while WorkerB().run(): pass
... 

4) Наконец, начните рабочий процесс.

$ python
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41) 
[GCC 4.4.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import boto.swf.layer2 as swf
>>> workflows = swf.Domain(name='stackru').workflows()
>>> workflows
[<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>]
>>> execution = workflows[0].start(task_list='default_tasks')
>>> 

Вернитесь назад, чтобы увидеть, что происходит с вашими актерами. Они могут отключиться от службы после одной минуты бездействия. Если это произойдет, нажмите стрелки вверх + ввод, чтобы снова войти в цикл опроса.

Теперь вы можете перейти к панели SWF консоли управления AWS, проверить, как выполняются выполнения, и просмотреть их историю. Кроме того, вы можете запросить его через командную строку.

>>> execution.history()
[{'eventId': 1, 'eventType': 'WorkflowExecutionStarted', 
'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'}, 
'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy': 
'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version': 
'1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2, 
'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes': 
{'startToCloseTimeout': '300', 'taskList': {'name': ...

Это всего лишь пример рабочего процесса с последовательным выполнением действий, но также возможно, чтобы решающее лицо планировало и координировало параллельное выполнение действий.

Я надеюсь, что это, по крайней мере, поможет вам начать. Для более сложного примера последовательного рабочего процесса я рекомендую взглянуть на это.

У меня нет примера кода для обмена, но вы определенно можете использовать SWF для координации выполнения скриптов на двух серверах. Основная идея состоит в том, чтобы создать три фрагмента кода, которые общаются с SWF:

  • Компонент, который знает, какой сценарий выполнить первым и что делать после завершения выполнения первого сценария. Это называется "решающим фактором" в терминах SWF.
  • Два компонента, каждый из которых понимает, как выполнить определенный скрипт, который вы хотите запустить на каждой машине. Они называются "активными работниками" в терминах SWF.

Первый компонент, решатель, вызывает два API-интерфейса SWF: PollForDecisionTask и RespondDecisionTaskCompleted. Запрос на опрос предоставит компоненту принятия решения текущую историю выполняющегося рабочего процесса, в основном информацию о состоянии "где я" для вашего обработчика сценариев. Вы пишете код, который просматривает эти события и выясняет, какой скрипт должен выполняться. Эти "команды" для выполнения сценария будут в форме планирования задачи деятельности, которая возвращается как часть вызова RespondDecisionTaskCompleted.

Вторые компоненты, которые вы пишете, рабочие действия, каждый вызывает два API SWF: PollForActivityTask и RespondActivityTaskCompleted. Запрос на опрос даст работнику действия указание на то, что ему следует выполнить сценарий, о котором он знает, что SWF-файл называет задачей действия. Информация, возвращаемая из запроса на опрос в SWF, может включать в себя отдельные данные, специфичные для выполнения, которые были отправлены в SWF как часть планирования задачи деятельности. Каждый из ваших серверов будет независимо опрашивать SWF для задач активности, чтобы указать выполнение локального сценария на этом хосте. Как только рабочий завершает выполнение сценария, он перезванивает в SWF через API-интерфейс RespondActivityTaskCompleted.

Обратный вызов от вашего рабочего актива в SWF приводит к тому, что новая история передается компоненту принятия решений, о котором я уже упоминал. Он посмотрит историю, увидит, что первый скрипт выполнен, и наметит выполнение второго. Как только он видит, что второе сделано, он может "закрыть" рабочий процесс, используя другой тип решения.

Вы запускаете весь процесс выполнения сценариев на каждом хосте, вызывая API StartWorkflowExecution. Это создает запись всего процесса в SWF и выводит первую историю в процесс принятия решения, чтобы запланировать выполнение первого сценария на первом хосте.

Надеемся, что это дает немного больше контекста о том, как выполнить этот тип рабочего процесса с использованием SWF. Если вы еще этого не сделали, я бы посмотрел руководство разработчика на странице SWF для получения дополнительной информации.

Хороший пример,

Кроме того, если вы не хотите экспортировать свои учетные данные в среду, вы можете вызывать внутри своих классов:

swf.set_default_credentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) 

Вы можете использовать SNS. Когда скрипт A завершен, он должен запустить SNS, и это вызовет уведомление на сервере B

Другие вопросы по тегам