
python - How to log to a single file with multiprocessing.Pool.apply_async


I can't get logging to a single file working when using multiprocessing.Pool.apply_async. I'm trying to adapt this example from the Logging Cookbook, but it only works with multiprocessing.Process. Passing the logging queue into apply_async doesn't seem to have any effect. I'd like to use a pool so that I can easily manage the number of concurrent workers.

The adapted multiprocessing.Process example below works fine for me, except that I don't get log messages from the main process, and I don't think it will work well when I have 100 large jobs.

import logging
import logging.handlers
import numpy as np
import time
import multiprocessing
import pandas as pd

log_file = 'PATH_TO_FILE/log_file.log'


def listener_configurer():
    root = logging.getLogger()
    h = logging.FileHandler(log_file)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)


# This is the listener process top-level loop: wait for logging events
# (LogRecords) on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)


def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)


# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_function(sleep_time, name, queue, configurer):
    configurer(queue)
    start_message = 'Worker {} started and will now sleep for {}s'.format(name, sleep_time)
    logging.info(start_message)
    time.sleep(sleep_time)
    success_message = 'Worker {} has finished sleeping for {}s'.format(name, sleep_time)
    logging.info(success_message)


def main_with_process():
    start_time = time.time()
    single_thread_time = 0.
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    workers = []
    for i in range(10):
        name = str(i)
        sleep_time = np.random.randint(10) / 2
        single_thread_time += sleep_time
        worker = multiprocessing.Process(target=worker_function,
                                         args=(sleep_time, name, queue, worker_configurer))
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)
    listener.join()
    end_time = time.time()
    final_message = "Script execution time was {}s, but single-thread time was {}s".format(
        (end_time - start_time),
        single_thread_time
    )
    print(final_message)


if __name__ == "__main__":
    main_with_process()

However, I cannot get the following adaptation to work:

def main_with_pool():
    start_time = time.time()
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    pool = multiprocessing.Pool(processes=3)
    job_list = [np.random.randint(10) / 2 for i in range(10)]
    single_thread_time = np.sum(job_list)
    for i, sleep_time in enumerate(job_list):
        name = str(i)
        pool.apply_async(worker_function,
                         args=(sleep_time, name, queue, worker_configurer))

    queue.put_nowait(None)
    listener.join()
    end_time = time.time()
    print("Script execution time was {}s, but single-thread time was {}s".format(
        (end_time - start_time),
        single_thread_time
    ))


if __name__ == "__main__":
    main_with_pool()

I've tried many minor variations using multiprocessing.Manager, multiprocessing.Queue, multiprocessing.get_logger, and apply_async.get(), but nothing has worked.

I would think there'd be an off-the-shelf solution for this. Should I try celery instead?

Thanks

Best Answer

There are actually two separate problems here, which are intertwined:

  • You cannot pass a multiprocessing.Queue() object as an argument to a pool-based function (you can pass it to a worker process you start directly, but not any "further in", as it were). A minimal demonstration follows this list.
  • You must wait for all of the asynchronous workers to complete before sending the None sentinel to the listener process.
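
A minimal sketch of the first point (the use_queue function here is a hypothetical stand-in, not part of the question's code): a plain multiprocessing.Queue() cannot be pickled for a pool task, and because apply_async defers errors until the result is collected, the failure stays silent unless you call .get():

import multiprocessing

def use_queue(q):
    q.put('hello')

if __name__ == '__main__':
    q = multiprocessing.Queue()
    with multiprocessing.Pool(processes=1) as pool:
        result = pool.apply_async(use_queue, args=(q,))
        # The task never runs: pickling the Queue fails inside the pool
        # machinery, and the error only surfaces here (the exact message may
        # vary by Python version):
        result.get()  # RuntimeError: Queue objects should only be shared
                      # between processes through inheritance

This is why the question's main_with_pool() produces neither log output nor a traceback: the apply_async results are never collected, so the pickling error is swallowed.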

To fix the first problem, replace:

queue = multiprocessing.Queue(-1)

with:

queue = multiprocessing.Manager().Queue(-1)

as a manager-managed Queue() instance can be passed.

To fix the second problem, either collect each result from each asynchronous call, or close the pool and wait for it, e.g.:

pool.close()
pool.join()
queue.put_nowait(None)

or the more elaborate:

getters = []
for i, sleep_time in enumerate(job_list):
    name = str(i)
    getters.append(
        pool.apply_async(worker_function,
                         args=(sleep_time, name, queue, worker_configurer))
    )
while len(getters):
    getters.pop().get()
# optionally, close and join pool here (generally a good idea anyway)
queue.put_nowait(None)

(You should also consider replacing put_nowait with the waiting version, put, rather than using unlimited-length queues.)
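
Putting both fixes together, a minimal corrected main_with_pool() might look like the sketch below. It reuses listener_process, listener_configurer, worker_function, and worker_configurer exactly as defined in the question; only the queue construction and the shutdown ordering change:

def main_with_pool():
    start_time = time.time()
    # A manager-managed queue: its proxy can be pickled and handed to pool workers.
    queue = multiprocessing.Manager().Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    pool = multiprocessing.Pool(processes=3)
    job_list = [np.random.randint(10) / 2 for i in range(10)]
    single_thread_time = np.sum(job_list)
    results = []
    for i, sleep_time in enumerate(job_list):
        name = str(i)
        results.append(
            pool.apply_async(worker_function,
                             args=(sleep_time, name, queue, worker_configurer)))
    # Wait for every async job; get() also re-raises any exception from a worker.
    for r in results:
        r.get()
    pool.close()
    pool.join()
    # Only now is it safe to tell the listener to shut down.
    queue.put(None)
    listener.join()
    end_time = time.time()
    print("Script execution time was {}s, but single-thread time was {}s".format(
        end_time - start_time, single_thread_time))

Note that worker_configurer() adds a QueueHandler on every call, so a pool process that runs several jobs will emit each record more than once; guarding against re-adding the handler is a separate refinement not covered by the question.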

Regarding python - How to log to a single file with multiprocessing.Pool.apply_async, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48045978/
