
python - Querying Athena from a Lambda function - QUEUED state?

Reposted. Author: 行者123. Updated: 2023-12-01 06:50:03

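I have been successfully querying S3 through Athena from inside a Lambda function for a while, but it suddenly stopped working. Further investigation showed that the response from get_query_execution() returns a state of 'QUEUED' (which I was led to believe was not used?!)

My code is as follows: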

def run_query(query, database, s3_output, max_execution=5):
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': s3_output
        })

    execution_id = response['QueryExecutionId']
    print("QueryExecutionId = " + str(execution_id))
    state = 'RUNNING'

    while (max_execution > 0 and state in ['RUNNING']):
        max_execution = max_execution - 1
        print("maxexecution=" + str(max_execution))
        response = client.get_query_execution(QueryExecutionId = execution_id)

        if 'QueryExecution' in response and \
                'Status' in response['QueryExecution'] and \
                'State' in response['QueryExecution']['Status']:

            state = response['QueryExecution']['Status']['State']
            print(state)
            if state == 'SUCCEEDED':
                print("Query SUCCEEDED: {}".format(execution_id))

                s3_key = 'athena_output/' + execution_id + '.csv'
                print(s3_key)
                local_filename = '/tmp/' + execution_id + '.csv'
                print(local_filename)

                rows = []
                try:
                    print("s3key =" + s3_key)
                    print("localfilename = " + local_filename)
                    s3.Bucket(BUCKET).download_file(s3_key, local_filename)
                    with open(local_filename) as csvfile:
                        reader = csv.DictReader(csvfile)
                        for row in reader:
                            rows.append(row)
                except botocore.exceptions.ClientError as e:
                    if e.response['Error']['Code'] == "404":
                        print("The object does not exist.")
                        print(e)
                    else:
                        raise
                return json.dumps(rows)
            elif state == 'FAILED':
                return False
        time.sleep(10)
    return False

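So the code itself is clearly working; it is just that the 'QUEUED' state is completely unexpected and I don't know what to do about it. What can cause a query execution to become 'QUEUED', and what do I need to change in my code to handle it?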

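Best answer

Take a look at the Athena hook in Apache Airflow. Athena has final states (SUCCEEDED, FAILED and CANCELLED) and intermediate states (RUNNING and QUEUED). QUEUED is the normal state of a query before it starts running. So you can use code like this: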

import csv
import json
import time

import boto3
import botocore.exceptions

# Assumed setup, not shown in the original post: an Athena client and an S3
# resource. BUCKET must also be defined as the name of the bucket that
# s3_output points to.
client = boto3.client('athena')
s3 = boto3.resource('s3')


def run_query(query, database, s3_output, max_execution=5):
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': s3_output
        })

    execution_id = response['QueryExecutionId']
    print("QueryExecutionId = " + str(execution_id))
    # A freshly submitted query starts out QUEUED, not RUNNING.
    state = 'QUEUED'

    # Keep polling while the query is in either intermediate state.
    while (max_execution > 0 and state in ['RUNNING', 'QUEUED']):
        max_execution = max_execution - 1
        print("maxexecution=" + str(max_execution))
        response = client.get_query_execution(QueryExecutionId = execution_id)

        if 'QueryExecution' in response and \
                'Status' in response['QueryExecution'] and \
                'State' in response['QueryExecution']['Status']:

            state = response['QueryExecution']['Status']['State']
            print(state)
            if state == 'SUCCEEDED':
                print("Query SUCCEEDED: {}".format(execution_id))

                s3_key = 'athena_output/' + execution_id + '.csv'
                print(s3_key)
                local_filename = '/tmp/' + execution_id + '.csv'
                print(local_filename)

                rows = []
                try:
                    print("s3key =" + s3_key)
                    print("localfilename = " + local_filename)
                    s3.Bucket(BUCKET).download_file(s3_key, local_filename)
                    with open(local_filename) as csvfile:
                        reader = csv.DictReader(csvfile)
                        for row in reader:
                            rows.append(row)
                except botocore.exceptions.ClientError as e:
                    if e.response['Error']['Code'] == "404":
                        print("The object does not exist.")
                        print(e)
                    else:
                        raise
                return json.dumps(rows)
            # FAILED and CANCELLED are both final states: stop polling.
            elif state == 'FAILED' or state == 'CANCELLED':
                return False
        time.sleep(10)
    # Still QUEUED or RUNNING after max_execution polls.
    return False
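
The split between intermediate and final states can also be factored into a small polling helper, which keeps the state logic separate from the S3 download. The following is only a minimal sketch, not the Airflow hook's actual code: `wait_for_query`, `INTERMEDIATE_STATES`, `TERMINAL_STATES`, and the `max_polls`, `poll_interval` and `sleep` parameters are hypothetical names introduced for illustration. The only Athena call it assumes is `get_query_execution`, which the code above already uses.

```python
import time

# Athena query states as listed in the answer above.
INTERMEDIATE_STATES = {"QUEUED", "RUNNING"}
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_query(client, execution_id, max_polls=5, poll_interval=10,
                   sleep=time.sleep):
    """Poll a query until it reaches a final state (hypothetical helper).

    Returns the final state string, or None if the query is still
    QUEUED or RUNNING after max_polls status checks.
    """
    for attempt in range(max_polls):
        response = client.get_query_execution(QueryExecutionId=execution_id)
        state = response["QueryExecution"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return state
        # QUEUED and RUNNING both mean "not finished yet": wait, then retry.
        if attempt < max_polls - 1:
            sleep(poll_interval)
    return None
```

With a helper like this, run_query would only download the CSV when the returned state is 'SUCCEEDED', and would treat 'FAILED', 'CANCELLED' and None (timed out) as failures. Passing `sleep` as a parameter is just a convenience so the loop can be exercised without real delays.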

Regarding "python - Querying Athena from a Lambda function - QUEUED state?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/59063219/
