gpt4 book ai didi

python - 从连续运行的 flask 应用程序中异步调用函数

转载 作者:行者123 更新时间:2023-11-28 18:45:25 24 4
gpt4 key购买 nike

我有一个与 Dropbox 集成的 Flask 应用程序。我需要通过调用 fetch_dropbox_data() 为每个用户创建一个 DropboxClient 实例。

我似乎无法确定我应该如何调用 fetch_dropbox_data() 这样它:

  1. 不阻止用户与网络应用程序的交互
  2. 每个用户只运行一个实例
  3. 可以在有或没有用户交互的情况下启动/重新启动

我应该使用类似 Celery 的东西吗?为了这?或者还有其他更好的方法吗?我开始使用 multiprocessing,但它似乎不符合要求。我当然会很感激任何指示。谢谢!

这是一个简单的例子来说明我正在尝试做的事情:

from flask import Flask, request, redirect, url_for, session
app = Flask(__name__)

@app.route('/some/route/')
def route():

session_access_token = "some_access_token"

# Need to call this asynchronously as to not block
fetch_dropbox_data(session_access_token)

return "ok"

# Only one DropboxClient instance should be created per user
def fetch_dropbox_data(session_access_token):


client = dropbox.client.DropboxClient(session_access_token)

cursor = None
while True:
result = client.delta(cursor)
cursor = result['cursor']
if result['reset']:
print 'RESET'

for dir_path, metadata in result['entries']:
print "in the for loop"
if metadata is not None:
print '%s was created/updated by' % (dir_path)

else:
print '%s was deleted by %s' % (dir_path, session_access_token)

# if has_more is true, call delta again immediately
if not result['has_more']:

changes = False
# poll until there are changes
while not changes:
response = requests.get('https://api-notify.dropbox.com/1/longpoll_delta',
params={
'cursor': cursor, # latest cursor from delta call
'timeout': 120 # default is 30 seconds
})
data = response.json()

print "data: %s" % data

changes = data['changes']
if not changes:
print 'Timeout, polling again...'

backoff = data.get('backoff', None)
if backoff is not None:
print 'Backoff requested. Sleeping for %d seconds...' % backoff
time.sleep(backoff)
print 'Resuming polling...'

return 'Authenticated.'


if __name__ == '__main__':
app.run(debug=True)

最佳答案

您可以继续使用多处理来执行异步函数调用,正如您在下面的非常基本的示例中看到的那样。

import multiprocessing

class MetaSingleton(type):
instance = None
def __call__(cls, *args, **kw):
if cls.instance is None:
cls.instance = super(MetaSingleton, cls).__call__(*args, **kw)
return cls.instance

class PoolManager(object):
__metaclass__ = MetaSingleton


def __init__(self):
self.pool = None

def create_pool(self, count=4):
self.pool = multiprocessing.Pool(processes=count)

def use_worker(self, func, params, tout=1):
result = self.pool.apply_async(func, params)
return result.get(timeout=tout)

def map_workers(self, func, params):
return self.pool.map(func, params)

def get_pool(self):
return self.pool

if __name__ == '__main__':


def test1(x):
return x*x


a = PoolManager()
print id(a)
a.create_pool()
print a.get_pool()

print a.use_worker(test1, [10])



b = PoolManager()
print id(b)
print b.get_pool()

print b.use_worker(test1, [10])

这需要扩展以跟踪用户执行情况。

关于python - 从连续运行的 flask 应用程序中异步调用函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21143041/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com