gpt4 book ai didi

python - 需要 boto3 和 SWF 示例

转载 作者:太空狗 更新时间:2023-10-29 21:22:02 25 4
gpt4 key购买 nike

Amazon 正在为 future 的开发推广 boto3,但没有为新的 boto3 提供足够的文档。

是否有人愿意分享将 SWF 与 boto3 结合使用的示例代码?

最佳答案

这是迄今为止我找到的唯一示例:

https://github.com/jhludwig/aws-swf-boto3

所以流程概述看起来像这样(请注意,这是直接从上面的链接中提取的,但添加了一些额外的注释和更多的流程)。

需要注意的是,SWF是对事物的名称进行操作的。由您的代码赋予这些名称执行意义。例如,您的 Decider 将轮询并使用任务名称决定下一步是什么。

有些事情我不太确定。 TASKLIST 引用我认为是一种命名空间。它并不是真正的事物列表,更多的是按名称隔离事物。现在我可能完全错了,根据我的基本理解,这就是我认为的意思。

您可以在任何地方运行您的决策程序和工作程序。由于它们会向上延伸到 AWS,如果您的防火墙允许 0.0.0.0/0 导出,您将可以访问。

AWS 文档还提到您可以运行 lambda,但我还没有找到如何触发它。

创建 boto3 swf 客户端:

import boto3
from botocore.exceptions import ClientError

swf = boto3.client('swf')

创建域

try:
swf.register_domain(
name=<DOMAIN>,
description="Test SWF domain",
workflowExecutionRetentionPeriodInDays="10" # keep history for this long
)
except ClientError as e:
print "Domain already exists: ", e.response.get("Error", {}).get("Code")

创建域后,我们现在注册工作流:

注册工作流

try:
swf.register_workflow_type(
domain=DOMAIN, # string
name=WORKFLOW, # string
version=VERSION, # string
description="Test workflow",
defaultExecutionStartToCloseTimeout="250",
defaultTaskStartToCloseTimeout="NONE",
defaultChildPolicy="TERMINATE",
defaultTaskList={"name": TASKLIST } # TASKLIST is a string
)
print "Test workflow created!"
except ClientError as e:
print "Workflow already exists: ", e.response.get("Error", {}).get("Code")

注册工作流后,我们现在可以开始分配任务了。

将任务分配给工作流。

您可以分配N 任务。请记住,这些主要是字符串,您的代码将赋予它们执行意义。

try:
swf.register_activity_type(
domain=DOMAIN,
name="DoSomething",
version=VERSION, # string
description="This is a worker that does something",
defaultTaskStartToCloseTimeout="NONE",
defaultTaskList={"name": TASKLIST } # TASKLIST is a string
)
print "Worker created!"
except ClientError as e:
print "Activity already exists: ", e.response.get("Error", {}).get("Code")

发送启动工作流

创建了域、工作流和任务后,我们现在可以开始工作流了。

import boto3

swf = boto3.client('swf')

response = swf.start_workflow_execution(
domain=DOMAIN # string,
workflowId='test-1001',
workflowType={
"name": WORKFLOW,# string
"version": VERSION # string
},
taskList={
'name': TASKLIST
},
input=''
)

print "Workflow requested: ", response

注意 workflowId,这是一个自定义标识符,例如 str(uuid.uuid4())。来自文档:

The user defined identifier associated with the workflow execution. You can use this to associate a custom identifier with the workflow execution. You may specify the same identifier if a workflow execution is logically a restart of a previous execution. You cannot have two open workflow executions with the same workflowId at the same time.

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.start_workflow_execution

此时,什么都不会发生,因为我们没有运行Decider,也没有任何Workers。让我们看看它们是什么样子。

决策者

我们的决策者将轮询以获得决定任务来做出关于以下方面的决定:

import boto3
from botocore.client import Config
import uuid

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)

注意上面的超时设置。您可以引用此 PR 以了解其背后的基本原理:

https://github.com/boto/botocore/pull/634

来自 Boto3 SWF 文档:

Workers should set their client side socket timeout to at least 70 seconds (10 seconds higher than the maximum time service may hold the poll request).

该 PR 使 boto3 能够执行该功能。

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_decision_task

print "Listening for Decision Tasks"

while True:

newTask = swf.poll_for_decision_task(
domain=DOMAIN ,
taskList={'name': TASKLIST }, # TASKLIST is a string
identity='decider-1', # any identity you would like to provide, it's recorded in the history
reverseOrder=False)

if 'taskToken' not in newTask:
print "Poll timed out, no new task. Repoll"

elif 'events' in newTask:

eventHistory = [evt for evt in newTask['events'] if not evt['eventType'].startswith('Decision')]
lastEvent = eventHistory[-1]

if lastEvent['eventType'] == 'WorkflowExecutionStarted':
print "Dispatching task to worker", newTask['workflowExecution'], newTask['workflowType']
swf.respond_decision_task_completed(
taskToken=newTask['taskToken'],
decisions=[
{
'decisionType': 'ScheduleActivityTask',
'scheduleActivityTaskDecisionAttributes': {
'activityType':{
'name': TASKNAME, # string
'version': VERSION # string
},
'activityId': 'activityid-' + str(uuid.uuid4()),
'input': '',
'scheduleToCloseTimeout': 'NONE',
'scheduleToStartTimeout': 'NONE',
'startToCloseTimeout': 'NONE',
'heartbeatTimeout': 'NONE',
'taskList': {'name': TASKLIST}, # TASKLIST is a string
}
}
]
)
print "Task Dispatched:", newTask['taskToken']

elif lastEvent['eventType'] == 'ActivityTaskCompleted':
swf.respond_decision_task_completed(
taskToken=newTask['taskToken'],
decisions=[
{
'decisionType': 'CompleteWorkflowExecution',
'completeWorkflowExecutionDecisionAttributes': {
'result': 'success'
}
}
]
)
print "Task Completed!"

请注意,在此代码段的末尾,我们检查是否有 ActivityTaskCompleted,并以 CompleteWorkflowExecution 决策做出响应,让 SWF 知道我们已完成。

这是决定因素, worker 长什么样?

worker

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_activity_task

再次注意,我们设置了read_timeout

import boto3
from botocore.client import Config

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)

现在我们开始我们的 worker 轮询:

print "Listening for Worker Tasks"

while True:

task = swf.poll_for_activity_task(
domain=DOMAIN,# string
taskList={'name': TASKLIST}, # TASKLIST is a string
identity='worker-1') # identity is for our history

if 'taskToken' not in task:
print "Poll timed out, no new task. Repoll"

else:
print "New task arrived"

swf.respond_activity_task_completed(
taskToken=task['taskToken'],
result='success'
)

print "Task Done"

我们再次向 SWF 发出我们已经完成工作的信号。

关于python - 需要 boto3 和 SWF 示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32721847/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com