gpt4 book ai didi

python - 如何在不在步骤之间传递数据的情况下共享 `AWS Step Functions` 中的数据

转载 作者:行者123 更新时间:2023-11-28 21:33:14 27 4
gpt4 key购买 nike

我使用 AWS Step Functions 并具有以下工作流程

AWS Step Functions workflow

initStep - 这是一个 lambda 函数处理程序,它获取一些数据并将其发送到 SQS 以供外部服务。

activity = os.getenv('ACTIVITY')
queue_name = os.getenv('QUEUE_NAME')

def lambda_handler(event, context):
event['my_activity'] = activity
data = json.dumps(event)

# Retrieving a queue by its name
sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName=queue_name)

queue.send_message(MessageBody=data, MessageGroupId='messageGroup1' + str(datetime.time(datetime.now())))

return event

validationWaiting - 这是一个事件,等待来自包含数据的外部服务的回答。

complete - 这是一个 lambda 函数处理程序,使用来自 initStep 的数据。

def lambda_handler(event, context):
email = event['email'] if 'email' in event else None
data = event['data'] if 'data' in event else None

client = boto3.client(service_name='ses')
to = email.split(', ')
message_conrainer = {'Subject': {'Data': 'Email from step functions'},
'Body': {'Html': {
'Charset': "UTF-8",
'Data': """<html><body>
<p>""" + data """</p>
</body> </html> """
}}}

destination = {'ToAddresses': to,
'CcAddresses': [],
'BccAddresses': []}

return client.send_email(Source=from_addresses,
Destination=destination,
Message=message_container)

它确实有效,但问题是我将完整数据从 initStep 发送到外部服务,只是为了稍后将其传递给 complete。可能会添加更多步骤。

我认为最好将其作为某种全局数据(当前步骤函数)进行共享,这样我就可以添加或删除步骤,并且数据仍然可供所有人使用。

最佳答案

您可以使用 InputPathResultPath .在 initStep您只会将必要的数据发送到外部服务(可能连同一些唯一的执行标识符)。在ValidaitonWaiting步骤您可以设置以下属性(在状态机定义中):

  • InputPath : 哪些数据将提供给GetActivityTask .可能您想将其设置为类似 $.execution_unique_id 的内容其中 execution_unique_id是您的数据中的字段,外部服务使用它来识别执行(以将其与 initStep 期间的特定请求相匹配)。
  • ResultPath : ValidationWaiting 事件的输出将保存在数据中。您可以将其设置为 $.validation_output来自外部服务的 json 结果将出现在那里。

通过这种方式,您可以仅向外部服务发送它实际需要的数据,并且您不会失去对输入中之前(ValidationWaiting 步骤之前)的任何数据的访问权限。

例如,您可以定义以下状态机:

{
"StartAt": "initStep",
"States": {
"initStep": {
"Type": "Pass",
"Result": {
"executionId": "some:special:id",
"data": {},
"someOtherData": {"value": "key"}
},
"Next": "ValidationWaiting"
},
"ValidationWaiting": {
"Type": "Pass",
"InputPath": "$.executionId",
"ResultPath": "$.validationOutput",
"Result": {
"validationMessages": ["a", "b"]
},
"Next": "Complete"
},
"Complete": {
"Type": "Pass",
"End": true
}
}
}

我用过 Pass initStep 的状态和 ValidationWaiting简化示例(我还没有运行它,但它应该可以工作)。 Result字段特定于 Pass任务,它等同于您的 Lambda 函数或事件的结果。

在这种情况下 Complete步骤将获得以下输入:

{
"executionId": "some:special:id",
"data": {},
"someOtherData": {"value": key"},
"validationOutput": {
"validationMessages": ["a", "b"]
}
}

所以 ValidationWaiting 的结果步骤已保存到validationOutput字段。

关于python - 如何在不在步骤之间传递数据的情况下共享 `AWS Step Functions` 中的数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54845487/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com