
python-3.x - AWS Python SDK Boto3 EMR client.get_waiter("step_complete") fails

Reposted. Author: 行者123. Updated: 2023-12-05 08:08:12

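I have a Python script that uses the AWS Python SDK (Boto3) to launch a new EMR cluster with a list of steps to complete, and then calls client.get_waiter("step_complete") to wait for the steps to finish. The call works some of the time, but every so often I get an exception saying the waiter failed. What strikes me as odd is that the same code sometimes works and sometimes doesn't.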

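import boto3

# jobFlowId and stepId come from the cluster/step creation earlier in the script
client = boto3.client("emr", region_name="us-east-1")

my_waiter = client.get_waiter("step_complete")
my_waiter.wait(
    ClusterId=jobFlowId,
    StepId=stepId,
    WaiterConfig={
        "Delay": 60,        # seconds between DescribeStep polls
        "MaxAttempts": 40   # give up after 40 polls (about 40 minutes)
    }
)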
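The waiter is a polling loop around DescribeStep: it calls the operation every Delay seconds, up to MaxAttempts times (here 60 s × 40 = 2400 s, roughly 40 minutes), and compares Step.Status.State against a list of acceptors. The sketch below replays that decision logic offline so the error message can be read in context. It assumes the StepComplete acceptors are COMPLETED maps to success and FAILED or CANCELLED maps to failure, which is our reading of botocore's EMR waiter definition; check the waiters-2.json shipped with your botocore version. simulate_step_complete is a hypothetical helper, not part of boto3.

```python
# Offline model of how a botocore "path" waiter decides after each poll.
# ASSUMPTION: these acceptors mirror botocore's EMR StepComplete waiter.
ACCEPTORS = {
    "COMPLETED": "success",
    "FAILED": "failure",
    "CANCELLED": "failure",
}


def simulate_step_complete(observed_states, max_attempts=40):
    """Replay a sequence of Step.Status.State values as the waiter would.

    Returns (outcome, attempts). The real waiter sleeps WaiterConfig["Delay"]
    seconds between polls; this simulation skips the sleep.
    """
    attempts = 0
    for state in observed_states:
        attempts += 1
        result = ACCEPTORS.get(state)  # None means "no acceptor matched, keep polling"
        if result == "success":
            return "success", attempts
        if result == "failure":
            return "terminal failure state", attempts
        if attempts >= max_attempts:
            return "max attempts exceeded", attempts
    return "still waiting", attempts
```

Under that assumption, a single FAILED or CANCELLED observation on any poll ends the wait with "Waiter encountered a terminal failure state", whereas running out of polls would instead be reported as "Max attempts exceeded".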

Exception:

[2018-07-23 19:20:40,512] {base_task_runner.py:98} INFO - Subtask: [2018-07-23 19:20:40,511] {connectionpool.py:203} INFO - Starting new HTTP connection (1): 1.2.3.4
[2018-07-23 19:20:40,514] {base_task_runner.py:98} INFO - Subtask: [2018-07-23 19:20:40,514] {connectionpool.py:203} INFO - Starting new HTTP connection (1): 1.2.3.4
[2018-07-23 19:20:40,551] {base_task_runner.py:98} INFO - Subtask: [2018-07-23 19:20:40,550] {connectionpool.py:735} INFO - Starting new HTTPS connection (1): foobar-config.s3.amazonaws.com
[2018-07-23 19:20:40,733] {base_task_runner.py:98} INFO - Subtask: [2018-07-23 19:20:40,733] {connectionpool.py:735} INFO - Starting new HTTPS connection (1): elasticmapreduce.us-east-1.amazonaws.com
[2018-07-23 19:21:41,531] {base_task_runner.py:98} INFO - Subtask: [2018-07-23 19:21:41,531] {connectionpool.py:238} INFO - Resetting dropped connection: elasticmapreduce.us-east-1.amazonaws.com
[2018-07-23 19:22:41,727] {base_task_runner.py:98} INFO - Subtask: [2018-07-23 19:22:41,726] {connectionpool.py:238} INFO - Resetting dropped connection: elasticmapreduce.us-east-1.amazonaws.com
[2018-07-23 19:23:41,875] {base_task_runner.py:98} INFO - Subtask: [2018-07-23 19:23:41,875] {connectionpool.py:238} INFO - Resetting dropped connection: elasticmapreduce.us-east-1.amazonaws.com
[2018-07-23 19:24:42,061] {base_task_runner.py:98} INFO - Subtask: [2018-07-23 19:24:42,061] {connectionpool.py:238} INFO - Resetting dropped connection: elasticmapreduce.us-east-1.amazonaws.com
[2018-07-23 19:25:42,212] {base_task_runner.py:98} INFO - Subtask: [2018-07-23 19:25:42,212] {connectionpool.py:238} INFO - Resetting dropped connection: elasticmapreduce.us-east-1.amazonaws.com
[2018-07-23 19:26:42,429] {base_task_runner.py:98} INFO - Subtask: [2018-07-23 19:26:42,428] {connectionpool.py:238} INFO - Resetting dropped connection: elasticmapreduce.us-east-1.amazonaws.com
[2018-07-23 19:27:42,639] {base_task_runner.py:98} INFO - Subtask: [2018-07-23 19:27:42,639] {connectionpool.py:238} INFO - Resetting dropped connection: elasticmapreduce.us-east-1.amazonaws.com
[2018-07-23 19:28:42,782] {base_task_runner.py:98} INFO - Subtask: [2018-07-23 19:28:42,782] {connectionpool.py:238} INFO - Resetting dropped connection: elasticmapreduce.us-east-1.amazonaws.com
[2018-07-23 19:29:42,938] {base_task_runner.py:98} INFO - Subtask: [2018-07-23 19:29:42,938] {connectionpool.py:238} INFO - Resetting dropped connection: elasticmapreduce.us-east-1.amazonaws.com
[2018-07-23 19:30:43,171] {base_task_runner.py:98} INFO - Subtask: [2018-07-23 19:30:43,170] {connectionpool.py:238} INFO - Resetting dropped connection: elasticmapreduce.us-east-1.amazonaws.com
[2018-07-23 19:31:43,409] {base_task_runner.py:98} INFO - Subtask: [2018-07-23 19:31:43,408] {connectionpool.py:238} INFO - Resetting dropped connection: elasticmapreduce.us-east-1.amazonaws.com
[2018-07-23 19:31:43,605] {models.py:1595} ERROR - Waiter StepComplete failed: Waiter encountered a terminal failure state
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 89, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 94, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/ec2-user/airflow/resources/emr-scripts/arl_emr_with_policyagent.py", line 366, in main
"MaxAttempts": 40
File "/usr/local/lib/python3.6/site-packages/botocore/waiter.py", line 53, in wait
Waiter.wait(self, **kwargs)
File "/usr/local/lib/python3.6/site-packages/botocore/waiter.py", line 323, in wait
last_response=response,
botocore.exceptions.WaiterError: Waiter StepComplete failed: Waiter encountered a terminal failure state
[2018-07-23 19:31:43,606] {models.py:1624} INFO - Marking task as FAILED.
[2018-07-23 19:31:43,636] {models.py:1644} ERROR - Waiter StepComplete failed: Waiter encountered a terminal failure state
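The log also indicates that the waiter did not run out of attempts. The "Resetting dropped connection" lines recur about every 60 s, matching Delay, and the WaiterError arrives roughly 11 minutes after the first connection to elasticmapreduce.us-east-1.amazonaws.com, well inside the 40-minute budget. The arithmetic, assuming polling started at that first connection:

```python
from datetime import datetime

# Timestamps copied from the outer column of the Airflow log above.
FMT = "%Y-%m-%d %H:%M:%S,%f"
first_emr_connection = datetime.strptime("2018-07-23 19:20:40,733", FMT)
waiter_error = datetime.strptime("2018-07-23 19:31:43,605", FMT)

# WaiterConfig from the question: poll every 60 s, at most 40 polls.
delay_s, max_attempts = 60, 40

elapsed_s = (waiter_error - first_emr_connection).total_seconds()
budget_s = delay_s * max_attempts

print(f"elapsed: {elapsed_s:.3f} s (~{elapsed_s / 60:.1f} min)")
print(f"budget:  {budget_s} s ({budget_s // 60} min)")
```

This prints an elapsed time of about 663 s against a 2400 s budget, which is consistent with the "terminal failure state" wording rather than a timeout.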

Update:

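I forgot to mention: all the steps succeed on the EMR cluster. In the AWS Management Console I can see that everything in EMR completed successfully. There are no errors other than the waiter failing.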
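One way to compare what the console shows with what the waiter polls is to list every step on the cluster with its Id and State and check that the StepId passed to the waiter is the step you expect. A minimal sketch, assuming a list_steps callable shaped like boto3's EMR client.list_steps (responses with a "Steps" list and an optional "Marker" for the next page); step_states is a hypothetical helper name.

```python
def step_states(list_steps, cluster_id):
    """Return [(step_id, name, state), ...] for every step on a cluster.

    list_steps is any callable with the keyword arguments and response shape
    of boto3's EMR client.list_steps; pagination follows the "Marker" key.
    """
    rows = []
    kwargs = {"ClusterId": cluster_id}
    while True:
        page = list_steps(**kwargs)
        for step in page.get("Steps", []):
            rows.append((step["Id"], step.get("Name", ""), step["Status"]["State"]))
        marker = page.get("Marker")
        if not marker:
            return rows
        kwargs["Marker"] = marker
```

With a real client this would be called as step_states(client.list_steps, jobFlowId), and each row could be compared against stepId.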

Update 2:

For anyone who comes back here, I ended up just writing my own waiter function:

import logging
import time

import boto3


def emrWaiter(cluster_id, step_id):

    # Range is the number of minutes to wait (each PENDING or RUNNING poll sleeps 60 s)
    rangeValue = 480
    for attempt in range(rangeValue):

        # Refresh the client: a new EMR client is created on every attempt
        client = boto3.client('emr', region_name='us-east-1')

        step_status = client.describe_step(
            ClusterId=cluster_id,
            StepId=step_id
        )
        state = step_status["Step"]["Status"]["State"]

        if state == "COMPLETED":
            logging.info(step_id + " - EMR step has finished")
            # Finished
            break

        if state == "PENDING":
            logging.info(step_id + " - EMR step is pending")
            # Sleep for one minute
            time.sleep(60)

        if state == "RUNNING":
            logging.info(step_id + " - EMR step is running")
            # Sleep for one minute
            time.sleep(60)

        if state == "CANCEL_PENDING":
            logging.error(step_id + " - EMR step failed")
            # Failed
            raise Exception(step_id + ' - Task failed with CANCEL_PENDING')

        if state == "CANCELLED":
            logging.error(step_id + " - EMR step failed")
            # Failed
            raise Exception(step_id + ' - Task failed with CANCELLED')

        if state == "FAILED":
            logging.error(step_id + " - EMR step failed")
            # Failed
            raise Exception(step_id + ' - Task failed with FAILED')

        if state == "INTERRUPTED":
            logging.error(step_id + " - EMR step failed")
            # Failed
            raise Exception(step_id + ' - Task failed with INTERRUPTED')

        if attempt == (rangeValue - 1):
            logging.info(step_id + " - Task timed out")
            # Failed: the last attempt was reached without COMPLETED
            raise Exception(step_id + ' - Task timed out')
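Because this function talks to AWS directly, it is hard to test. A minimal sketch of the same polling loop with the describe_step call and the sleep function passed in lets you exercise it with fake responses; wait_for_step, describe_step, delay_s and sleep are hypothetical names, not boto3 APIs. Unlike the version above, it sleeps after any non-terminal state (not only PENDING and RUNNING), so an unexpected state cannot make the loop spin without a delay, and it raises RuntimeError or TimeoutError instead of a bare Exception.

```python
import logging
import time

# States the custom waiter above treats as failures.
FAILURE_STATES = {"CANCEL_PENDING", "CANCELLED", "FAILED", "INTERRUPTED"}


def wait_for_step(describe_step, cluster_id, step_id,
                  max_attempts=480, delay_s=60, sleep=time.sleep):
    """Poll describe_step until the step completes, fails, or times out.

    describe_step is any callable with the keyword arguments and response
    shape of boto3's EMR client.describe_step.
    """
    for attempt in range(1, max_attempts + 1):
        response = describe_step(ClusterId=cluster_id, StepId=step_id)
        state = response["Step"]["Status"]["State"]
        if state == "COMPLETED":
            logging.info("%s - EMR step has finished", step_id)
            return state
        if state in FAILURE_STATES:
            raise RuntimeError(f"{step_id} - Task failed with {state}")
        logging.info("%s - EMR step is %s (attempt %d/%d)",
                     step_id, state, attempt, max_attempts)
        if attempt < max_attempts:  # no point sleeping after the final poll
            sleep(delay_s)
    raise TimeoutError(f"{step_id} - Task timed out")
```

With a real client you would pass the bound method, for example wait_for_step(boto3.client("emr", region_name="us-east-1").describe_step, cluster_id, step_id). To keep the original's habit of building a fresh client on every attempt, pass a lambda that creates the client and calls describe_step instead.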

Best answer

Thanks @kyle,

Here are a few modifications for brevity:

import logging
import time

import boto3

logging.basicConfig(level=logging.INFO)


def emrWaiter(cluster_id, step_id):

    # Range is the number of minutes to wait
    rangeValue = 480
    for attempt in range(rangeValue):

        # Refresh the client
        client = boto3.client('emr', region_name='us-east-1')

        step_status = client.describe_step(
            ClusterId=cluster_id,
            StepId=step_id
        )
        state = step_status["Step"]["Status"]["State"]

        if state == "COMPLETED":
            logging.info(step_id + " - EMR step has finished")
            # Finished
            break

        if state == "PENDING":
            logging.info(step_id + " - EMR step is pending")
            # Sleep for one minute
            time.sleep(60)

        if state == "RUNNING":
            logging.info(step_id + " - EMR step is running")
            # Sleep for one minute
            time.sleep(60)

        if state in ["CANCEL_PENDING", "CANCELLED", "FAILED", "INTERRUPTED"]:
            logging.error(step_id + " EMR step in {} raising exception".format(state))
            # Failed: the full Status dict (state, reasons, timeline) goes into the message
            raise Exception(step_id + ' - Task failed with: {}'.format(step_status["Step"]["Status"]))

        if attempt == (rangeValue - 1):
            logging.info(step_id + " - Task timed out")
            # Failed
            raise Exception(step_id + ' - Task timed out')
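If you want the exception to say why the step failed rather than dumping the whole dict, the Status returned by describe_step can carry a StateChangeReason, a FailureDetails block (Reason, Message, LogFile) and a Timeline with CreationDateTime; none of these is guaranteed to be present for every state. A sketch of a hypothetical helper, describe_step_failure, that builds a message from whichever fields exist, using .get() so a missing key does not raise KeyError in the middle of error handling. It assumes boto3 returns the timestamps as datetime objects, which is its default.

```python
def describe_step_failure(step_id, status):
    """Build a readable error message from a describe_step Status dict.

    Optional fields (StateChangeReason, FailureDetails, Timeline) are read
    with .get() because they may be absent depending on the step's state.
    """
    state = status.get("State", "UNKNOWN")
    parts = [f"{step_id} - Task failed with {state}"]

    reason = status.get("StateChangeReason", {}).get("Message")
    if reason:
        parts.append(f"reason: {reason}")

    details = status.get("FailureDetails", {})
    if details.get("Message"):
        parts.append(f"details: {details['Message']}")
    if details.get("LogFile"):
        parts.append(f"log: {details['LogFile']}")

    created = status.get("Timeline", {}).get("CreationDateTime")
    if created is not None:
        parts.append(f"created: {created.isoformat()}")

    return "; ".join(parts)
```

Inside the failure branch you would then raise Exception(describe_step_failure(step_id, step_status["Step"]["Status"])).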

For python-3.x - AWS Python SDK Boto3 EMR client.get_waiter("step_complete") fails, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/51487546/
