- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
Amazon 正在为 future 的开发推广 boto3,但没有为新的 boto3 提供足够的文档。
是否有人愿意分享将 SWF 与 boto3 结合使用的示例代码?
最佳答案
这是迄今为止我找到的唯一示例:
https://github.com/jhludwig/aws-swf-boto3
所以流程概述看起来像这样(请注意,这是直接从上面的链接中提取的,但添加了一些额外的注释和更多的流程)。
需要注意的是,SWF是对事物的名称进行操作的。由您的代码赋予这些名称执行意义。例如,您的 Decider
将轮询并使用任务名称决定下一步是什么。
有些事情我不太确定。 TASKLIST
引用我认为是一种命名空间。它并不是真正的事物列表,更多的是按名称隔离事物。现在我可能完全错了,根据我的基本理解,这就是我认为的意思。
您可以在任何地方运行您的决策程序和工作程序。由于它们会向上延伸到 AWS,如果您的防火墙允许 0.0.0.0/0 导出,您将可以访问。
AWS 文档还提到您可以运行 lambda,但我还没有找到如何触发它。
import boto3
from botocore.exceptions import ClientError
swf = boto3.client('swf')
try:
swf.register_domain(
name=<DOMAIN>,
description="Test SWF domain",
workflowExecutionRetentionPeriodInDays="10" # keep history for this long
)
except ClientError as e:
print "Domain already exists: ", e.response.get("Error", {}).get("Code")
创建域后,我们现在注册工作流:
try:
swf.register_workflow_type(
domain=DOMAIN, # string
name=WORKFLOW, # string
version=VERSION, # string
description="Test workflow",
defaultExecutionStartToCloseTimeout="250",
defaultTaskStartToCloseTimeout="NONE",
defaultChildPolicy="TERMINATE",
defaultTaskList={"name": TASKLIST } # TASKLIST is a string
)
print "Test workflow created!"
except ClientError as e:
print "Workflow already exists: ", e.response.get("Error", {}).get("Code")
注册工作流后,我们现在可以开始分配任务了。
您可以分配N 任务。请记住,这些主要是字符串,您的代码将赋予它们执行意义。
try:
swf.register_activity_type(
domain=DOMAIN,
name="DoSomething",
version=VERSION, # string
description="This is a worker that does something",
defaultTaskStartToCloseTimeout="NONE",
defaultTaskList={"name": TASKLIST } # TASKLIST is a string
)
print "Worker created!"
except ClientError as e:
print "Activity already exists: ", e.response.get("Error", {}).get("Code")
创建了域、工作流和任务后,我们现在可以开始工作流了。
import boto3
swf = boto3.client('swf')
response = swf.start_workflow_execution(
domain=DOMAIN # string,
workflowId='test-1001',
workflowType={
"name": WORKFLOW,# string
"version": VERSION # string
},
taskList={
'name': TASKLIST
},
input=''
)
print "Workflow requested: ", response
注意 workflowId
,这是一个自定义标识符,例如 str(uuid.uuid4())
。来自文档:
The user defined identifier associated with the workflow execution. You can use this to associate a custom identifier with the workflow execution. You may specify the same identifier if a workflow execution is logically a restart of a previous execution. You cannot have two open workflow executions with the same workflowId at the same time.
此时,什么都不会发生,因为我们没有运行Decider
,也没有任何Workers
。让我们看看它们是什么样子。
我们的决策者将轮询以获得决定任务来做出关于以下方面的决定:
import boto3
from botocore.client import Config
import uuid
botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)
注意上面的超时设置。您可以引用此 PR 以了解其背后的基本原理:
https://github.com/boto/botocore/pull/634
来自 Boto3 SWF 文档:
Workers should set their client side socket timeout to at least 70 seconds (10 seconds higher than the maximum time service may hold the poll request).
该 PR 使 boto3 能够执行该功能。
http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_decision_task
print "Listening for Decision Tasks"
while True:
newTask = swf.poll_for_decision_task(
domain=DOMAIN ,
taskList={'name': TASKLIST }, # TASKLIST is a string
identity='decider-1', # any identity you would like to provide, it's recorded in the history
reverseOrder=False)
if 'taskToken' not in newTask:
print "Poll timed out, no new task. Repoll"
elif 'events' in newTask:
eventHistory = [evt for evt in newTask['events'] if not evt['eventType'].startswith('Decision')]
lastEvent = eventHistory[-1]
if lastEvent['eventType'] == 'WorkflowExecutionStarted':
print "Dispatching task to worker", newTask['workflowExecution'], newTask['workflowType']
swf.respond_decision_task_completed(
taskToken=newTask['taskToken'],
decisions=[
{
'decisionType': 'ScheduleActivityTask',
'scheduleActivityTaskDecisionAttributes': {
'activityType':{
'name': TASKNAME, # string
'version': VERSION # string
},
'activityId': 'activityid-' + str(uuid.uuid4()),
'input': '',
'scheduleToCloseTimeout': 'NONE',
'scheduleToStartTimeout': 'NONE',
'startToCloseTimeout': 'NONE',
'heartbeatTimeout': 'NONE',
'taskList': {'name': TASKLIST}, # TASKLIST is a string
}
}
]
)
print "Task Dispatched:", newTask['taskToken']
elif lastEvent['eventType'] == 'ActivityTaskCompleted':
swf.respond_decision_task_completed(
taskToken=newTask['taskToken'],
decisions=[
{
'decisionType': 'CompleteWorkflowExecution',
'completeWorkflowExecutionDecisionAttributes': {
'result': 'success'
}
}
]
)
print "Task Completed!"
请注意,在此代码段的末尾,我们检查是否有 ActivityTaskCompleted
,并以 CompleteWorkflowExecution
决策做出响应,让 SWF 知道我们已完成。
这是决定因素, worker 长什么样?
http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_activity_task
再次注意,我们设置了read_timeout
import boto3
from botocore.client import Config
botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)
现在我们开始我们的 worker 轮询:
print "Listening for Worker Tasks"
while True:
task = swf.poll_for_activity_task(
domain=DOMAIN,# string
taskList={'name': TASKLIST}, # TASKLIST is a string
identity='worker-1') # identity is for our history
if 'taskToken' not in task:
print "Poll timed out, no new task. Repoll"
else:
print "New task arrived"
swf.respond_activity_task_completed(
taskToken=task['taskToken'],
result='success'
)
print "Task Done"
我们再次向 SWF 发出我们已经完成工作的信号。
关于python - 需要 boto3 和 SWF 示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32721847/
我正在评估 Amazon SWF 作为构建分布式工作流系统的选项。主要语言将是 Java,因此 Flow 框架是一个显而易见的选择。只有一件事让我感到困惑,在我推荐它作为我们架构中的关键组件之前,我会
我在将我的 swf 文件置于另一个 swf 文件之上时遇到问题。我在这里尝试了解决方案: http://jsfiddle.net/j08691/ezXjx/ #a { position
我正在处理一个项目,我需要打开一个 swf 文件,更改 swf 标签的内容 不接触其他标签 我试过“SwfDotNet 库”,但它不输出未知标签。例如,如果它遇到“EXPORTASSETS”标签,它不
希望这不会被视为两次问同样的问题...... 所以我正在开发一个 Flash 网站(在 AS2 中),它有一个外部索引 swf,它使用 loadMovie("subfoo1.swf", placeTo
我在使用 Flash 和 Flex 时遇到了一个非常奇怪的问题。在某些情况下,如果同时加载了另一个 SWF,则在运行时(使用 Loader)加载的 SWF 中的影片剪辑似乎无法实例化。这是重现错误的程
当我创建暂时隐藏在选项卡中的 SWF 对象时,因此在某些浏览器(如 FireFox)中未完全加载,我似乎无法找到方法来确定 SWF 是否已加载,因此我可以与它。 /* Generate SWF (on
我加载了一个外部 SWF。 外部 SWF 有一个嵌入的 DisplayObject (getChildAt(0)) 我得到了它的 Class 实例以下代码: public function g
这是一个更一般的问题,而不是“帮助修复代码”问题: 目标 父 swf a.swf 加载外部子 b.swf 子 swf 有跟踪语句:[timestamp][log level][class] msg 父
我想在显示swf文件的webview中显示另一个swf文件,并且我可以获取显示swf文件的路径,但是当我将另一个swf文件拖放到显示swf文件的webview时,swf文件无法获取打不开,为什么?以及
目标:无需 www.macromedia.com/support/documentation/上的任何特殊权限,即可从本地协议(protocol)(widget://、file://、chrome-e
我正在尝试通过 CSS(绝对定位)将透明 PNG 定位在 SWF 影片上。但是,当我将 PNG 直接放在 SWF 上时,SWF 上的所有单击操作似乎都被禁用了。 SWF 要求使用您的网络摄像头,但您无
我想知道是否有人知道以编程方式创建 Flash swf 文件的任何库。 或者用于从 svg 创建 swf。 最佳答案 这听起来对 swfmill 来说是一份完美的工作我想您会发现它可以让您同时执行两种
我有一个加载其他几个 swf 的主要“父” swf。如果主 swf 发生了什么事,我需要告诉其中一个子 swf。 反过来,这似乎效果很好。任何 child 都可以简单地 dispatchEvent()
概览: 我有一个 SWF 横幅广告模板,它从我开发的平台加载 JSON,然后循环浏览该 JSON 中指定的一些产品。每个产品都由您的标准标题、价格和图像组成。 在平台上,用户可以通过一些 UI 工具(
我一直在努力让 AS2 swf 在 AS3 swf 中正确加载 - 但没有成功... AS2 文件(这是一个相当大的应用程序,引用许多外部 xml 文件等)在 Flash Player 中启动时可以完
我正在尝试做这样的事情:拿一个 swf 文件损坏它,然后..在设备内部再次将其可读(未损坏)...我将使用我自己编写的设备指纹检查该设备是否可以恢复 swf.. 得到? 我可以这样做吗?有什么办法呢?
我需要从一些 swf 文件中提取所有文本。我使用 Java 是因为我有很多用这种语言开发的模块。因此,我在 Web 上搜索了所有专门用于处理 SWF 文件的免费 Java 库。最后,我找到了Stuar
我认为这个问题不言自明。以前有人这样做过吗? 更新:澄清我为什么需要这样做。我们有一个 AS1 - AS2 站点的单一 swf 庞然大物,带有大型视频库部分。客户想要更新视频部分,因为 AS2 代码无
有没有人成功使用过 [Flagstone Software][1] 的 Transform SWF for Java 库 [1]: http://www.flagstonesoftware.com/t
例如,隐藏一个简单的 youtube 嵌入 document.getElementById('youtube').style.display = 'block'; document.getElemen
我是一名优秀的程序员,十分优秀!