gpt4 book ai didi

python-3.x - Firebase Python SDK 发生的阻塞是否有解决方法?喜欢添加完成回调?

转载 作者:行者123 更新时间:2023-12-04 09:40:41 25 4
gpt4 key购买 nike

最近,我将 express.js 中的 REST 服务器代码移至使用 FastAPI。到目前为止,我一直在过渡中取得成功,直到最近。我注意到基于 firebase python admin sdk 文档,与 node.js 不同,python sdk 是阻塞的。文档说 here :

In Python and Go Admin SDKs, all write methods are blocking. That is, the write methods do not return until the writes are committed to the database.



我认为这个特性对我的代码有一定的影响。这也可能是我构建代码的方式。我的一个文件中的一些代码如下:

from app.services.new_service import nService
from firebase_admin import db
import json
import redis

class TryNewService:
async def tryNew_func(self, request):
# I've already initialized everything in another file for firebase
ref = db.reference()
r = redis.Redis()
holdingData = await nService().dialogflow_session(request)
fulfillmentText = json.dumps(holdingData[-1])
body = await request.json()

if ("user_prelimInfo_address" in holdingData):
holdingData.append("session")
holdingData.append(body["session"])
print(holdingData)
return(holdingData)
else:
if (("Default Welcome Intent" in holdingData)):
pass
else:
UserVal = r.hget(name='{}'.format(body["session"]), key="userId").decode("utf-8")
ref.child("users/{}".format(UserVal)).child("c_data").set({holdingData[0]:holdingData[1]})
print(holdingData)
return(fulfillmentText)


使用 ref.set() 的阻塞效果是否有任何解决方法?在我的代码行?有点像在 node.js 中添加回调?我是 python 3 的 asyncio 世界的新手。

Update as of 06/13/2020: So I added following code and am now getting a RuntimeError: Task attached to a different loop. In my second else statement I do the following:



loop = asyncio.new_event_loop()
UserVal = r.hget(name='{}'.format(body["session"]), key="userId").decode("utf-8")
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
result = await loop.run_in_executor(pool, ref.child("users/{}".format(UserVal)).child("c_data").set({holdingData[0]:holdingData[1]}))
print("custom thread pool:{}".format(result))

有了这个新的 RuntimeError,我将不胜感激。

最佳答案

如果你想在异步协程中运行同步代码,那么步骤是:

  • 循环 = get_event_loop()
    注意:获取而不是新的。 Get 提供当前 event_loop,new_even_loop 返回一个新的
  • await loop.run_in_executor(无,sync_method)
  • 第一个参数 = None -> 使用默认执行器实例
  • 第二个参数(sync_method)是要调用的同步代码。


  • 请记住,sync_method 使用的资源需要正确同步:
  • a) 要么使用 asyncio.Lock
  • b) 或使用 asyncio.run_coroutine_threadsafe 函数(见下面的例子)

  • 忘记这个关于 ThreadPoolExecutor 的案例(它提供了一种 I/O 并行性的方法,而不是由 asyncio 提供的并发性)。
    您可以尝试以下代码:
    loop = asyncio.get_event_loop()
    UserVal = r.hget(name='{}'.format(body["session"]), key="userId").decode("utf-8")
    result = await loop.run_in_executor(None, sync_method, ref, UserVal, holdingData)
    print("custom thread pool:{}".format(result))
    有了新功能:
    def sync_method(ref, UserVal, holdingData):
    result = ref.child("users/{}".format(UserVal)).child("c_data").set({holdingData[0]:holdingData[1]}))
    return result
    请让我知道您的反馈
    注意:以前的代码未经测试。我只测试了下一个最小示例(使用 pytest 和 pytest-asyncio):
    import asyncio
    import time

    import pytest


    @pytest.mark.asyncio
    async def test_1():
    loop = asyncio.get_event_loop()
    delay = 3.0
    result = await loop.run_in_executor(None, sync_method, delay)
    print(f"Result = {result}")

    def sync_method(delay):
    time.sleep(delay)
    print(f"dddd {delay}")
    return "OK"
    回答@jeff-ridgeway 评论:
    让我们尝试更改之前的答案以阐明如何使用 run_coroutine_threadsafe,从同步工作线程执行收集这些共享资源的协程:
  • 在 run_in_executor 中添加循环作为附加参数
  • 将所有共享资源从sync_method 移动到一个新的async_method,该方法由run_coroutine_threadsafe 执行
  • loop = asyncio.get_event_loop()
    UserVal = r.hget(name='{}'.format(body["session"]), key="userId").decode("utf-8")
    result = await loop.run_in_executor(None, sync_method, ref, UserVal, holdingData, loop)
    print("custom thread pool:{}".format(result))

    def sync_method(ref, UserVal, holdingData, loop):
    coro = async_method(ref, UserVal, holdingData)
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    future.result()
    async def async_method(ref, UserVal, holdingData)
    result = ref.child("users/{}".format(UserVal)).child("c_data").set({holdingData[0]:holdingData[1]}))
    return result
    注意:之前的代码未经测试。现在我测试的最小示例更新了:
    @pytest.mark.asyncio
    async def test_1():
    loop = asyncio.get_event_loop()
    delay = 3.0
    result = await loop.run_in_executor(None, sync_method, delay, loop)
    print(f"Result = {result}")

    def sync_method(delay, loop):
    coro = async_method(delay)
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    return future.result()

    async def async_method(delay):
    time.sleep(delay)
    print(f"dddd {delay}")
    return "OK"
    我希望这会有所帮助

    关于python-3.x - Firebase Python SDK 发生的阻塞是否有解决方法?喜欢添加完成回调?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62352260/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com