Получение дубликатов решений от AWS SWF при использовании boto3

Я создал простой рабочий процесс SWF, но, похоже, получаю несколько уведомлений о новых доступных решениях. Я использую boto3 Python SDK.

Нет хорошего примера кода boto3 swf, который я могу найти, поэтому я начал с примера boto2 по адресу http://boto.cloudhackers.com/en/latest/swf_tut.html.

Я создал свой простой рабочий процесс с помощью этого сценария, который создает домен, рабочий процесс и одну задачу в этом рабочем процессе:

#!/usr/bin/python

import boto3
from botocore.exceptions import ClientError


swf = boto3.client('swf')

try:
  swf.register_domain(
    name="surroundiotest-swf",
    description="Surroundio test SWF domain",
    workflowExecutionRetentionPeriodInDays="10"
  )
except ClientError as e:
  print "Domain already exists: ", e.response.get("Error", {}).get("Code")

try:
  swf.register_workflow_type(
    domain="surroundiotest-swf",
    name="testflow",
    version="0.1",
    description="testworkflow",
    defaultExecutionStartToCloseTimeout="250",
    defaultTaskStartToCloseTimeout="NONE",
    defaultChildPolicy="TERMINATE",
    defaultTaskList={"name": "testflow"}
  )
  print "testflow created!"
except ClientError as e:
  print "Workflow already exists: ", e.response.get("Error", {}).get("Code")

try:
  swf.register_activity_type(
    domain="surroundiotest-swf",
    name="testworker",
    version="0.1",
    description="testworker",
    defaultTaskStartToCloseTimeout="NONE",
    defaultTaskList={"name": "testflow"}
  )
  print "testworker created!"
except ClientError as e:
  print "Activity already exists: ", e.response.get("Error", {}).get("Code")

мой рабочий код:

#!/usr/bin/python

import boto3
from botocore.client import Config

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)

print "Listening for Worker Tasks"

while True:

  task = swf.poll_for_activity_task(
    domain='surroundiotest-swf',
    taskList={'name': 'testflow'},
    identity='worker-1')

  if 'taskToken' not in task:
    print "Poll timed out, no new task.  Repoll"

  else:
    print "New task arrived"

    swf.respond_activity_task_completed(
        taskToken=task['taskToken'],
        result='success'
    )

    print "Task Done"

мой решающий код:

#!/usr/bin/python

import boto3
from botocore.client import Config

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)


print "Listening for Decision Tasks"

while True:

  newTask = swf.poll_for_decision_task(
    domain='surroundiotest-swf',
    taskList={'name': 'testflow'},
    identity='decider-1',
    reverseOrder=True)

  if 'taskToken' not in newTask:
    print "Poll timed out, no new task.  Repoll"

  elif 'events' in newTask:

    eventHistory = [evt for evt in newTask['events'] if not evt['eventType'].startswith('Decision')]
    lastEvent = eventHistory[-1]

    if lastEvent['eventType'] == 'WorkflowExecutionStarted':
      print "Dispatching task to worker", newTask['workflowExecution'], newTask['workflowType']
      swf.respond_decision_task_completed(
        taskToken=newTask['taskToken'],
        decisions=[
          {
            'decisionType': 'ScheduleActivityTask',
            'scheduleActivityTaskDecisionAttributes': {
                'activityType':{
                    'name': 'testworker',
                    'version': '0.1'
                    },
                'activityId': 'activityid-1001',
                'input': '',
                'scheduleToCloseTimeout': 'NONE',
                'scheduleToStartTimeout': 'NONE',
                'startToCloseTimeout': 'NONE',
                'heartbeatTimeout': 'NONE',
                'taskList': {'name': 'testflow'},
            }
          }
        ]
      )
      print "Task Dispatched"
      # print json.dumps(newTask, default=json_serial, sort_keys=True, indent=4, separators=(',', ': '))

    elif lastEvent['eventType'] == 'ActivityTaskCompleted':
      swf.respond_decision_task_completed(
        taskToken=newTask['taskToken'],
        decisions=[
          {
            'decisionType': 'CompleteWorkflowExecution',
            'completeWorkflowExecutionDecisionAttributes': {
              'result': 'success'
            }
          }
        ]
      )
      print "Task Completed!"

Когда я запускаю эти сценарии и отправляю запрос, решающее лицо получает задачу и отправляет ее, а рабочий получает и выполняет ее. Тем не менее, решающее лицо уведомляется о задаче при каждом последующем опросе, поэтому я вижу вывод, подобный следующему:

Listening for Decision Tasks
Poll timed out, no new task.  Repoll
Dispatching task to worker {u'workflowId': u'surroundtest-1001', u'runId': u'23oPrHZ/d9kR43V/hr0ykZCI7Dks/FzLhfDeA9PPWFuPE='} {u'version': u'0.1', u'name': u'testflow'}
Task Dispatched
Dispatching task to worker {u'workflowId': u'surroundtest-1001', u'runId': u'23oPrHZ/d9kR43V/hr0ykZCI7Dks/FzLhfDeA9PPWFuPE='} {u'version': u'0.1', u'name': u'testflow'}
Task Dispatched
Dispatching task to worker {u'workflowId': u'surroundtest-1001', u'runId': u'23oPrHZ/d9kR43V/hr0ykZCI7Dks/FzLhfDeA9PPWFuPE='} {u'version': u'0.1', u'name': u'testflow'}
Task Dispatched
Dispatching task to worker {u'workflowId': u'surroundtest-1001', u'runId': u'23oPrHZ/d9kR43V/hr0ykZCI7Dks/FzLhfDeA9PPWFuPE='} {u'version': u'0.1', u'name': u'testflow'}
Task Dispatched
Dispatching task to worker {u'workflowId': u'surroundtest-1001', u'runId': u'23oPrHZ/d9kR43V/hr0ykZCI7Dks/FzLhfDeA9PPWFuPE='} {u'version': u'0.1', u'name': u'testflow'}
Task Dispatched

Казалось бы, SWF не получает уведомления о том, что решающее лицо отправило задачу, и постоянно доставляет задачу решающему. Есть ли какая-то часть настройки принятия решения, которую я делаю неправильно, есть ли какая-то дополнительная информация, которую я должен передать SWF?

1 ответ

Решение

У вас есть небольшая ошибка. Вы проводите опрос для решения задач, используя reverseOrder=True, Из документации API PollForDecisionTask:

обратный порядок
Когда установлено true, возвращает события в обратном порядке. По умолчанию результаты возвращаются в порядке возрастания eventTimestamp событий.

С reverseOrder=True Вы получаете самое старое событие в прошлом, которое всегда WorkflowExecutionStarted, После того, как задача выполнена, вы всегда запланировали это снова.

Как правило, вы хотите использовать previousStartedEventId и обрабатывать только новые события, которые произошли после предыдущего решения задачи. Обработка только последней задачи деятельности не всегда достаточна, потому что могло быть несколько событий для обработки.

Другие вопросы по тегам