gpt4 book ai didi

Python Asyncio run_forever() 和任务

转载 作者:行者123 更新时间:2023-11-28 22:19:13 27 4
gpt4 key购买 nike

我修改了这段代码以在异步 Python 中使用 Google Cloud PubSub:https://github.com/cloudfind/google-pubsub-asyncio

import asyncio
import datetime
import functools
import os

from google.cloud import pubsub
from google.gax.errors import RetryError
from grpc import StatusCode

async def message_producer():
""" Publish messages which consist of the current datetime """
while True:
await asyncio.sleep(0.1)


async def proc_message(message):
await asyncio.sleep(0.1)
print(message)
message.ack()


def main():
""" Main program """
loop = asyncio.get_event_loop()

topic = "projects/{project_id}/topics/{topic}".format(
project_id=PROJECT, topic=TOPIC)
subscription_name = "projects/{project_id}/subscriptions/{subscription}".format(
project_id=PROJECT, subscription=SUBSCRIPTION)

subscription = make_subscription(
topic, subscription_name)

def create_proc_message_task(message):
""" Callback handler for the subscription; schedule a task on the event loop """
print("Task created!")
task = loop.create_task(proc_message(message))

subscription.open(create_proc_message_task)
# Produce some messages to consume

loop.create_task(message_producer())

print("Subscribed, let's do this!")
loop.run_forever()


def make_subscription(topic, subscription_name):
""" Make a publisher and subscriber client, and create the necessary resources """
subscriber = pubsub.SubscriberClient()
try:
subscriber.create_subscription(subscription_name, topic)
except:
pass
subscription = subscriber.subscribe(subscription_name)

return subscription


if __name__ == "__main__":
main()

我基本上去掉了发布代码,只用订阅代码。但是,最初我没有包含 loop.create_task(message_producer()) 行。我认为任务是按预期创建的,但它们从未真正运行过。只有当我添加上述行时,代码才能正确执行并且所有创建的任务都会运行。是什么导致了这种行为?

最佳答案

PubSub 正在从不同的线程调用 create_proc_message_task 回调。因为 create_tasknot thread-safe ,它只能从运行事件循环的线程(通常是主线程)调用。要更正此问题,请将 loop.create_task(proc_message(message)) 替换为 asyncio.run_coroutine_threadsafe(proc_message(message), loop)message_producer将不再需要。

至于为什么 message_producer 似乎修复了代码,请考虑 run_coroutine_threadsafecreate_task 相比,做了两件额外的事情:

  • 它以线程安全的方式运行,因此事件循环数据结构在并发完成时不会损坏。
  • 它确保事件循环在尽可能快的机会唤醒,以便它可以处理新任务。

在您的情况下,create_task 将任务添加到循环的可运行队列(没有任何锁定),但无法确保唤醒,因为在事件循环线程中运行时不需要唤醒。 message_producer 然后用于强制循环定期唤醒,这时它还会检查并执行可运行的任务。

关于Python Asyncio run_forever() 和任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49906034/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com