gpt4 book ai didi

python - 无法让我的异步函数正常工作

转载 作者:行者123 更新时间:2023-12-01 00:36:20 24 4
gpt4 key购买 nike

我正在尝试基于 WebSocket 提供的数据构建脚本,但我有一个无法解决的棘手问题。我有两个细胞。

第一个:

msg = ''

stream = {}

async def call_api(msg):
async with websockets.connect('wss://www.bitmex.com/realtime?subscribe=quote:XBTUSD,quote:ETHU19') as websocket:
await websocket.send(msg)
while websocket.open:
response = await websocket.recv()
response = json.loads(response)

if 'data' in list(response.keys()):

response = response['data']

for n in range(len(response)):

symbol = response[n]['symbol']

stream[symbol] = response[n]['askPrice']


loop = asyncio.get_event_loop()
loop.create_task(call_api(msg))

第二个:

stream['XBTUSD']

如果我在 Jupyter Notebook 中运行第一个单元格,然后手动运行第二个单元格,python 将打印正确的值。但是,如果我按下“重新启动当前内核并重新执行整个笔记本”按钮,我会在第二个单元格中收到错误 KeyError: 'XBTUSD' 。当我使用 python shell 运行脚本时也会发生此错误。

我无法理解这两个执行之间的行为差​​异。

最佳答案

这是因为您在第一个单元格中创建了一个异步任务,但没有等待它完成。 loop.create_task() 立即返回,并让事件循环在事件循环处于事件状态时继续在后台执行创建的任务。 (在这种情况下,事件循环在笔记本内核运行时保持运行。)因此,loop.create_task() 使您的 Jupyter 笔记本认为第一个单元已立即完成。

请注意,Jupyter 笔记本本身也针对内核进程异步工作,因此,如果您在第一个单元之后运行第二个单元太快(例如,使用“重新启动当前内核并重新执行整个笔记本”而不是手动单击运行按钮),在第二个单元格的执行开始之前,第一个单元格的异步任务不会完成。

要确保第一个单元在报告单元执行已完成之前实际完成任务,请使用 run_until_complete() 而不是 create_task():

loop = asyncio.get_event_loop()
loop.run_until_complete(call_api(msg))

或者,通过引用获得对任务的额外控制:

loop = asyncio.get_event_loop()
t = loop.create_task(call_api(msg))
loop.run_until_complete(t)

如果您想让任务无限期地在后台运行,您需要采用不同的方法。

  • 不要使用 Jupyter Notebook 并编写守护进程来持续获取和处理 Websocket 消息。 Jupyter 不提供任何方法来跟踪内核进程中的后台异步任务并通过此类后台任务的事件触发器执行单元。 Jupyter Notebook 根本不是实现此类模式的工具。
  • 要解耦 Websocket 消息接收器和处理例程,请使用中间队列。如果双方运行在同一进程和同一事件循环中,则可以使用asyncio.Queue。如果处理发生在使用同步代码的不同线程中,您可以尝试 janus 。如果处理发生在不同的进程中,请使用 multiprocessing.Queue 或其他一些 IPC 机制。

关于python - 无法让我的异步函数正常工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57735012/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com