gpt4 book ai didi

python - 如何从 s3 获取数据并对其进行一些处理? python 和博托

转载 作者:行者123 更新时间:2023-11-28 22:44:05 25 4
gpt4 key购买 nike

我有一个项目任务要使用我已经在 EMR 任务中的 s3 上生成的一些输出数据。所以之前我运行了一个 EMR 作业,它以名为 part-xxxx 的多个文件的形式在我的一个 s3 存储桶中产生了一些输出。现在,我需要从我的新 EMR 作业中访问这些文件,读取这些文件的内容,并使用我需要生成另一个输出的数据。

这是完成这项工作的本地代码:

def reducer_init(self):
self.idfs = {}
for fname in os.listdir(DIRECTORY): # look through file names in the directory
file = open(os.path.join(DIRECTORY, fname)) # open a file
for line in file: # read each line in json file
term_idf = JSONValueProtocol().read(line)[1] # parse the line as a JSON object
self.idfs[term_idf['term']] = term_idf['idf']

def reducer(self, term_poster, howmany):
tfidf = sum(howmany) * self.idfs[term_poster['term']]
yield None, {'term_poster': term_poster, 'tfidf': tfidf}

这在本地运行得很好,但问题是我现在需要的数据在 s3 上,我需要以某种方式在 reducer_init 函数中访问它。

这是我目前所拥有的,但是在 EC2 上执行时失败了:

def reducer_init(self):
self.idfs = {}
b = conn.get_bucket(bucketname)
idfparts = b.list(destination)
for key in idfparts:
file = open(os.path.join(idfparts, key))
for line in file:
term_idf = JSONValueProtocol().read(line)[1] # parse the line as a JSON object
self.idfs[term_idf['term']] = term_idf['idf']

def reducer(self, term_poster, howmany):
tfidf = sum(howmany) * self.idfs[term_poster['term']]
yield None, {'term_poster': term_poster, 'tfidf': tfidf}

AWS 访问信息定义如下:

awskey = '*********'
awssecret = '***********'
conn = S3Connection(awskey, awssecret)
bucketname = 'mybucket'
destination = '/path/to/previous/output'

最佳答案

有两种方法:

  1. 将文件下载到您的本地系统并解析它。 (有点简单,快速和容易)
  2. 将存储在 S3 上的数据获取到内存中并对其进行解析(如果文件很大,则稍微复杂一些)。

第一步:

在 S3 上,文件名存储为 key ,如果您将名为 "Demo" 的文件存储在名为 "DemoFolder" 的文件夹中,则该特定文件的 key 将是 "DemoFolder\Demo"

使用以下代码将文件下载到临时文件夹中。

AWS_KEY = 'xxxxxxxxxxxxxxxxxx'
AWS_SECRET_KEY = 'xxxxxxxxxxxxxxxxxxxxxxxxxx'
BUCKET_NAME = 'DemoBucket'
fileName = 'Demo'

conn = connect_to_region(Location.USWest2,aws_access_key_id = AWS_KEY,
aws_secret_access_key = AWS_SECRET_KEY,
is_secure=False,host='s3-us-west-2.amazonaws.com'
)
source_bucket = conn.lookup(BUCKET_NAME)

''' Download the file '''
for name in source_bucket.list():
if name.name in fileName:
print("DOWNLOADING",fileName)
name.get_contents_to_filename(tempPath)

然后您可以处理该临时路径中的文件。

第 2 步:

您还可以使用 data = name.get_contents_as_string() 获取字符串形式的数据。在大文件(> 1gb)的情况下,您可能会遇到内存错误,为避免此类错误,您将不得不编写一个惰性函数来读取 block 中的数据。

例如,您可以使用 range 使用 data = name.get_contents_as_string(headers={'Range': 'bytes=%s-%s' % ( 0,100000000)})

我不确定我是否正确回答了您的问题,一旦我有时间,我可以根据您的要求定制代码。同时,请随时发布您的任何疑问。

关于python - 如何从 s3 获取数据并对其进行一些处理? python 和博托,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30003579/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com