gpt4 book ai didi

Python - 将数据从文件流式传输到启用异步的 Kinesis Producer

转载 作者:行者123 更新时间:2023-11-30 21:57:47 24 4
gpt4 key购买 nike

我正在使用 Python 3.6 中支持异步的 Kinesis Producer 模块来部署到 AWS Lambda(因此我需要它与 3.6 兼容)。

我的用例是从磁盘延迟读取文件(大约 100MB 压缩 - 1GB 未压缩)并将数据(一次 500 行)流式传输到 Kinesis Producer。我希望 Kinesis Producer 在我读取下一批 500 行时开始将 500 条记录推送到 Kinesis。

我注意到它一次读取整个文件 500 行,然后开始将数据推送到 Kinesis Producer。看来原因是因为我没有打电话 await asyncio.sleep(1) ,但我也不知道我是否以正确的方式这样做。

def lambda_handler(event, context):
event_loop = asyncio.get_event_loop()
# Extract filename from event and download file from S3
event_loop.run_until_complete(process(filename))
pending = asyncio.Task.all_tasks()
event_loop.run_until_complete(asyncio.gather(*pending))


async def process(filename):
for chunk in read_lines(filename, MAX_RECORDS_IN_BATCH):
asyncio.ensure_future(write_kinesis(chunk)).add_done_callback(callback)


def callback(result):
print(str(result))


async def write_kinesis(records):
future = asyncio.ensure_future(producer.put_records(records=records))

如果我添加await asyncio.sleep(.1)process(filename)结束函数,比它似乎完全按照我想要的方式执行,当然,它实际上阻塞了主线程 0.1 秒。

问-这就是技巧吗,用 asyncio.sleep 阻塞足够长的时间,以便 Kinesis Producer 推出数据?它休眠的时间越少,我在内存中保存的数据就越多,因为 kinesis 客户端没有那么多时间将数据推出,但它会运行得更快(在一定程度上)?

问- 我这样做的方式正确吗?再次,我尝试读取 500 行,推送到 kinesis(异步),在 Kinesis 客户端工作时再读取 500 行,冲洗并重复。

此外,当查看回调函数中的打印语句时,我注意到如果 write_kinesis 函数没有返回任何内容,则回调的打印语句有 result=None ,而如果 write_kinesis 函数返回 Future,则回调的打印语句具有 result=<Task pending...11b35f18>()

问-我假设没有 return 语句,就没有结果,但是为什么当状态仍然是“Pending”时它会调用回调函数?

编辑1:我忘了提及,Kinesis 客户端已经启用了异步功能。

最佳答案

你只需使用await调用异步函数

async def process(filename):
for chunk in read_lines(filename, MAX_RECORDS_IN_BATCH):
await producer.put_records(records=chunk)

async def run():
# Extract filename from event and download file from S3
await process(filename)

loop = asyncio.get_event_loop()
loop.run_until_complete( run() )
loop.close()

关于Python - 将数据从文件流式传输到启用异步的 Kinesis Producer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55150171/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com