
python - Asyncio RuntimeError in a coroutine: no running event loop

Reposted. Author: 行者123. Updated: 2023-12-02 03:35:12

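I am writing multi-process code that runs perfectly in Python 3.7. However, I want one of the parallel processes to run a slow IO step with AsyncIO to get better performance, and I have not been able to get it to run.

Environment: Ubuntu 18.04, Python 3.7, AsyncIO, pipenv (all pip libraries installed).

The method in question runs as expected with multithreading, which is what I want to replace with AsyncIO.

I have googled the problem, tried creating the loop in the main() function and now only in the intended coroutine, looked at examples, and read about this new async way of doing things, but with no results so far.

The following is the app.py code, which is executed with: python app.py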

import sys
import pdb
import traceback
import logging
import logging.config
import asyncio

from config import DEBUG
from config import log_config
from <some-module> import <some-class>

if DEBUG:
    logging.config.dictConfig(log_config())
else:
    logging.basicConfig(
        level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
logger = logging.getLogger(__name__)


def main():
    try:
        <some> = <some-class>([
            'some-data1.csv',
            'some-data2.csv'
        ])
        <some>.run()

    except:
        # Print the traceback, then drop into the debugger at the failure point
        traceback.print_exc()
        pdb.post_mortem()

    sys.exit(0)


if __name__ == '__main__':

    asyncio.run(main())

Here is the code where the class is defined:

    _sql_client = SQLServer()
    _blob_client = BlockBlobStore()
    _keys = KeyVault()
    _data_source = _keys.fetch('some-data')
    # Multiprocessing
    _manager = mp.Manager()
    _ns = _manager.Namespace()

    def __init__(self, list_of_collateral_files: list) -> None:

    @timeit
    def _get_filter_collateral(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _get_hours(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _load_original_bids(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _merge_bids_with_hours(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _get_collaterial_per_month(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _calc_bid_per_path(self) -> None:

    @timeit
    def run(self) -> None:

The method that contains the asynchronous code is here:

    def _get_filter_collateral(self, ns: mp.managers.NamespaceProxy) -> None:

        all_files = self._blob_client.download_blobs(self._list_of_blob_files)

        _all_dfs = pd.DataFrame()
        async def read_task(file_: str) -> None:
            nonlocal _all_dfs
            df = pd.read_csv(StringIO(file_.content))
            _all_dfs = _all_dfs.append(df, sort=False)

        tasks = []
        loop = asyncio.new_event_loop()

        for file_ in all_files:
            tasks.append(asyncio.create_task(read_task(file_)))

        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()

        _all_dfs['TOU'] = _all_dfs['TOU'].map(lambda x: 'OFFPEAK' if x == 'OFF' else 'ONPEAK')
        ns.dfs = _all_dfs

The method that runs the overall sequence and calls this async method is:

    def run(self) -> None:
        extract = []
        extract.append(mp.Process(target=self._get_filter_collateral, args=(self._ns, )))
        extract.append(mp.Process(target=self._get_hours, args=(self._ns, )))
        extract.append(mp.Process(target=self._load_original_bids, args=(self._ns, )))

        # Start the parallel processes
        for process in extract:
            process.start()

        # Await for database process to end
        extract[1].join()
        extract[2].join()

        # Merge both database results
        self._merge_bids_with_hours(self._ns)

        extract[0].join()

        self._get_collaterial_per_month(self._ns)
        self._calc_bid_per_path()
        self._save_reports()
        self._upload_data()

These are the errors I get:

Process Process-2:
Traceback (most recent call last):
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 104, in _get_filter_collateral
    tasks.append(asyncio.create_task(read_task(file_)))
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/asyncio/tasks.py", line 350, in create_task
    loop = events.get_running_loop()
RuntimeError: no running event loop
<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py:313: RuntimeWarning: coroutine '<some-class>._get_filter_collateral.<locals>.read_task' was never awaited
  traceback.print_exc()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
DEBUG Calculating monthly collateral...
Traceback (most recent call last):
  File "app.py", line 25, in main
    caiso.run()
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 425, in run
    self._get_collaterial_per_month(self._ns)
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 196, in _get_collaterial_per_month
    credit_margin = ns.dfs
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py", line 1122, in __getattr__
    return callmethod('__getattribute__', (key,))
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py", line 834, in _callmethod
    raise convert_to_error(kind, result)
AttributeError: 'Namespace' object has no attribute 'dfs'
> <some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py(834)_callmethod()
-> raise convert_to_error(kind, result)
(Pdb)

Best answer

Judging from the traceback, you are trying to add tasks to an event loop that is not running:

<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py:313: RuntimeWarning: coroutine '<some-class>._get_filter_collateral.<locals>.read_task' was never awaited

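The loop has only just been created and is not running yet, so asyncio cannot attach tasks to it. As the traceback shows, asyncio.create_task() looks up the currently running loop with get_running_loop() and raises RuntimeError when there is none.

The following example reproduces the same result by adding tasks and then trying to wait for all of them to finish: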

import asyncio
async def func(num):
    print('My name is func {0}...'.format(num))

loop = asyncio.get_event_loop()
tasks = list()
for i in range(5):
    tasks.append(asyncio.create_task(func(i)))
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

Result:

Traceback (most recent call last):
  File "C:/tmp/stack_overflow.py", line 42, in <module>
    tasks.append(asyncio.create_task(func(i)))
  File "C:\Users\Amiram\AppData\Local\Programs\Python\Python37-32\lib\asyncio\tasks.py", line 324, in create_task
    loop = events.get_running_loop()
RuntimeError: no running event loop
sys:1: RuntimeWarning: coroutine 'func' was never awaited
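
The lookup that fails in the traceback can be observed directly. The following is a minimal sketch, not part of the original answer; the helper names running_loop_or_none and inside_coroutine are made up for illustration. It shows that asyncio.get_running_loop() raises RuntimeError in plain synchronous code and succeeds inside a coroutine that a loop is driving:

```python
import asyncio


def running_loop_or_none():
    """Return the running event loop, or None when there is none (hypothetical helper)."""
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        # Raised when no event loop is running in the current thread.
        return None


async def inside_coroutine():
    # While this coroutine executes, the loop started by asyncio.run() is running.
    return running_loop_or_none() is not None


outside = running_loop_or_none()          # called from plain synchronous code
inside = asyncio.run(inside_coroutine())  # called from code running inside a loop
print(outside, inside)
```

This is the same condition asyncio.create_task() depends on, which is why it can only be called from code that a loop is already executing.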

Nonetheless, the solution is quite simple: add the tasks to the loop you created instead of asking asyncio to do it. Only the following line needs to change:

tasks.append(asyncio.create_task(func(i)))

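Change the task creation from asyncio to the newly created loop. You can do this because it is your own loop object, whereas asyncio.create_task() searches for a loop that is already running.

So the new line should look like this: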

tasks.append(loop.create_task(func(i)))
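
Put together, the corrected script might look like the sketch below. It is an illustration under a few assumptions rather than the answerer's exact code: it uses asyncio.new_event_loop() (as the question does), and func appends to a results list, a name introduced here, instead of printing so the effect is easy to check.

```python
import asyncio

results = []  # illustrative: collects the values each task handled


async def func(num):
    # Record the call instead of printing.
    results.append(num)


# Create a private loop; it is not running yet.
loop = asyncio.new_event_loop()
try:
    # loop.create_task() schedules on this specific loop object,
    # so it does not need a loop to be running already.
    tasks = [loop.create_task(func(i)) for i in range(5)]
    loop.run_until_complete(asyncio.wait(tasks))
finally:
    # Close the loop even if one of the tasks raised.
    loop.close()

print(sorted(results))
```

The try/finally is an addition of this sketch; it only ensures the loop is closed on errors.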

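Another solution is to run an async function and create the tasks inside it (because the loop is already running at that point, asyncio is able to attach tasks to it):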

import asyncio
async def func(num):
    print('Starting func {0}...'.format(num))
    await asyncio.sleep(0.1)
    print('Ending func {0}...'.format(num))

loop = asyncio.get_event_loop()
async def create_tasks_func():
    tasks = list()
    for i in range(5):
        tasks.append(asyncio.create_task(func(i)))
    await asyncio.wait(tasks)
loop.run_until_complete(create_tasks_func())
loop.close()

This simple change results in:

Starting func 0...
Starting func 1...
Starting func 2...
Starting func 3...
Starting func 4...
Ending func 0...
Ending func 2...
Ending func 4...
Ending func 1...
Ending func 3...
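
On Python 3.7 and later, the same idea can be written without managing the loop by hand. The sketch below is a variation, not the answerer's code: run_all and sync_entry_point are hypothetical names, and func returns a value so that asyncio.gather() has results to collect.

```python
import asyncio


async def func(num):
    # Stand-in for real asynchronous IO.
    await asyncio.sleep(0.01)
    return num * 2


async def run_all(count):
    # Tasks are created inside a coroutine, so a running loop exists.
    tasks = [asyncio.create_task(func(i)) for i in range(count)]
    # gather() returns results in the order the tasks were passed in.
    return await asyncio.gather(*tasks)


def sync_entry_point(count):
    # asyncio.run() creates a new loop, runs the coroutine, then closes the loop.
    return asyncio.run(run_all(count))


print(sync_entry_point(5))  # prints [0, 2, 4, 6, 8]
```

A plain synchronous wrapper of this shape could plausibly serve as the target of an mp.Process like the ones in the question, since each call creates and closes its own loop, but that use is an assumption and has not been tested against the asker's code.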

Regarding python - Asyncio RuntimeError in a coroutine: no running event loop, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/58774718/
