
python - Querying an Athena table in an AWS Glue Python Shell job


Python shell jobs were introduced in AWS Glue. The announcement mentions:

You can now use Python shell jobs, for example, to submit SQL queries to services such as ... Amazon Athena ...

OK. There is an example of reading data from an Athena table here:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

persons = glueContext.create_dynamic_frame.from_catalog(
    database="legislators",
    table_name="persons_json")
print("Count: ", persons.count())
persons.printSchema()
# TODO query all persons

However, that example uses Spark rather than Python Shell. The Python Shell job type does not ship with these libraries, and I got an error:

ModuleNotFoundError: No module named 'awsglue.transforms'

How can I rewrite the code above so that it runs in a Python Shell job?

Best answer

The problem is that the Python Shell job type comes with its own limited set of built-in libraries.
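
To check exactly which packages a given Python Shell job ships with, you can print the installed distributions from inside the job itself. This is a minimal sketch (not part of the original answer); the output shows up in the job's CloudWatch logs:

import pkg_resources

# List every package bundled with the Python Shell job runtime
for dist in sorted(pkg_resources.working_set, key=lambda d: d.project_name.lower()):
    print(dist.project_name, dist.version)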

I was only able to achieve my goal by using Boto 3 to run the query and Pandas to read the result into a dataframe.

Here is the code snippet:

import time

import boto3
import pandas as pd

s3_client = boto3.client('s3')
athena_client = boto3.client(service_name='athena', region_name='us-east-1')
bucket_name = 'bucket-with-csv'
print('Working bucket: {}'.format(bucket_name))

def run_query(client, query):
    # Submit the query; Athena writes the result CSV to the S3 output location
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': 'sample-db'},
        ResultConfiguration={'OutputLocation': 's3://{}/fromglue/'.format(bucket_name)},
    )
    return response

def validate_query(client, query_id):
    # Poll until the query reaches a terminal state
    terminal_states = ["FAILED", "SUCCEEDED", "CANCELLED"]
    response = client.get_query_execution(QueryExecutionId=query_id)
    while response["QueryExecution"]["Status"]["State"] not in terminal_states:
        time.sleep(1)  # avoid hammering the API while the query runs
        response = client.get_query_execution(QueryExecutionId=query_id)

    return response["QueryExecution"]["Status"]["State"]

def read(query):
    print('start query: {}\n'.format(query))
    qe = run_query(athena_client, query)
    qstate = validate_query(athena_client, qe["QueryExecutionId"])
    print('query state: {}\n'.format(qstate))

    # Athena names the result file <OutputLocation>/<QueryExecutionId>.csv
    file_name = "fromglue/{}.csv".format(qe["QueryExecutionId"])
    obj = s3_client.get_object(Bucket=bucket_name, Key=file_name)
    return pd.read_csv(obj['Body'])

time_entries_df = read('SELECT * FROM "sample-table"')
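
If you would rather not read the result CSV back from S3, the Athena API can also return the rows directly. The sketch below is not part of the original answer; it assumes the run_query and validate_query helpers above and uses boto3's get_query_results paginator on the same QueryExecutionId. For a SELECT query, the first row of the result set holds the column names:

def read_via_api(query_execution_id):
    # Page through the query results instead of downloading the CSV from S3
    paginator = athena_client.get_paginator('get_query_results')
    header, rows = None, []
    for page in paginator.paginate(QueryExecutionId=query_execution_id):
        for row in page['ResultSet']['Rows']:
            values = [col.get('VarCharValue') for col in row['Data']]
            if header is None:
                header = values  # first row of a SELECT result is the header
            else:
                rows.append(values)
    return pd.DataFrame(rows, columns=header)

# Example usage with the helpers above:
# qe = run_query(athena_client, 'SELECT * FROM "sample-table"')
# validate_query(athena_client, qe["QueryExecutionId"])
# df = read_via_api(qe["QueryExecutionId"])

Note that get_query_results returns every value as a string, so numeric columns need an explicit conversion afterwards.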

This post on querying an Athena table in an AWS Glue Python Shell job is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/57166753/
