gpt4 book ai didi

amazon-web-services - AWS Step Functions Activity Worker 在工作人员停止时看不到执行

转载 作者:数据小太阳 更新时间:2023-10-29 03:23:16 25 4
gpt4 key购买 nike

适用于 Go 的 AWS SDK 版本?

v2.0.0-preview.3

Go 的版本(go version)?

go1.9.3 darwin/amd64

您看到了什么问题?

我正在为 Go 中的 Step Functions 编写一个 Activity Worker。

时间:

  • 事件 worker 正在运行,
  • 然后,我们从 SFN 控制台开始执行工作流

一切似乎都运行良好。

但是,当:

  • 事件 worker 停止,
  • 然后,我们从 SFN 控制台开始执行工作流,
  • 然后 worker 重新启动,

工作人员似乎在轮询 SFN,但它不执行在其停止期间启动的任务。如果我们在此时开始新的工作流执行(当 worker 正在运行时),则 worker 会成功执行新任务。工作人员停止期间执行的工作流未被工作人员拾取。

编辑:查看执行历史,我看到超时状态和以下事件日志:

enter image description here

重现步骤

这里是我的 SFN 状态机:

{
"Comment": "An example using a Task state.",
"StartAt": "getGreeting",
"Version": "1.0",
"TimeoutSeconds": 300,
"States":
{
"getGreeting": {
"Type": "Task",
"Resource": "arn:aws:states:ap-southeast-1:196709014601:activity:get-greeting",
"End": true
}
}
}

这是我的 SFN 工作人员:

package main

import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/endpoints"
"github.com/aws/aws-sdk-go-v2/aws/external"
"github.com/aws/aws-sdk-go-v2/service/sfn"
"fmt"
"encoding/json"
)

type Worker struct {
svc *sfn.SFN
activityARN string
}

type Task struct {
input *string
token *string
}

func New(activityARN string) *Worker {
cfg, err := external.LoadDefaultAWSConfig()
if err != nil {
panic("unable to load SDK config, " + err.Error())
}
// Set the AWS Region that the service clients should use
cfg.Region = endpoints.ApSoutheast1RegionID

// Using the Config value, create the Step Functions client
svc := sfn.New(cfg)

w := &Worker{
svc: svc,
activityARN: activityARN,
}
return w
}

func (w *Worker) getTask() *Task {
req := w.svc.GetActivityTaskRequest(&sfn.GetActivityTaskInput{
ActivityArn: aws.String(w.activityARN),
})
res, err := req.Send()
if err != nil { fmt.Println("failed to get tasks, "+err.Error()) }
return &Task{
input: res.Input,
token: res.TaskToken,
}
}

// Call SendTaskSuccess on success
func (w *Worker) handleSuccess(taskToken *string, json *string) error {
req := w.svc.SendTaskSuccessRequest(&sfn.SendTaskSuccessInput{
TaskToken: taskToken,
Output: json, // JSON string
})
_, err := req.Send()
if err != nil { fmt.Println("failed to send task success result, "+err.Error()) }
return err
}

// Call SendTaskFailure on error
func (w *Worker) handleFailure(taskToken *string, err error) error {
errorMessage := err.Error()
req := w.svc.SendTaskFailureRequest(&sfn.SendTaskFailureInput{
TaskToken: taskToken,
Error: &errorMessage,
})

_, err = req.Send()
if err != nil { fmt.Println("failed to send task failure result, "+err.Error()) }
return err
}

func main() {
activityARN := "arn:aws:states:ap-southeast-1:196709014601:activity:get-greeting"
worker := New(activityARN)

fmt.Println("Starting worker")
for {
// 1. Poll GetActivityTask API for tasks
fmt.Println("Polling for tasks")
task := worker.getTask()
if task.token == nil { continue }

// 2. Do some actual work
fmt.Println("Working")
result, err := work(task.input)

// 3. Notify SFN on success and failure
fmt.Println("Sending results")
if err == nil {
worker.handleSuccess(task.token, result)
} else {
worker.handleFailure(task.token, err)
}
}
}

// Handles marshalling and un-marshalling JSON
func work(jsonInput *string) (*string, error) {
input := &GreetInput{}
json.Unmarshal([]byte(*jsonInput), input)

result, err := Greet(input) // Actual work
if err != nil { return nil, err }

outputBytes, _ := json.Marshal(result)
output := string(outputBytes)
return &output, nil
}

// Actual handler code
type GreetInput struct {
Who string
}

type GreetOutput struct {
Message string
}

func Greet(input *GreetInput) (*GreetOutput, error) {
message := fmt.Sprintf("hello %s", input.Who)
output := &GreetOutput {
Message: message,
}
fmt.Println(message)
return output, nil
}

运行:

go build worker.go && ./worker

最佳答案

根据您的更新,我认为工作人员没有正常停止(即,当杀死工作人员时,您没有等到 GetActivityTask 请求结束),因此 Step Functions 可能会响应(已经死亡的)工作人员。

所以工作流程如下:

  1. Worker 发送 GetActivityTask 请求并停止(直到达到超时)。
  2. Worker 未等待 GetActivityTask 结束就被杀死。
  3. 创建了新的执行。
  4. Step Functions 发现一些 GetActivityTask 仍然挂起 - 将新执行的任务发送给它。
  5. 但是 worker 已经死了,所以它不会收到那个任务。 Step Functions 认为任务已交付,因此它会一直等到任务结束或超时。

要检查是否是这种情况,只需在杀死工作人员后稍等片刻(我不知道 AWS SDK for Go 中 GetActivityTask 的默认等待时间是多少 - 5 分钟应该可以完成工作)然后创建执行。如果新执行按预期工作,那么您应该向工作人员添加优雅退出(等到 GetActivityTask 结束并最终处理任务)。

关于amazon-web-services - AWS Step Functions Activity Worker 在工作人员停止时看不到执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49292908/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com