gpt4 book ai didi

Python获取多处理池的事件进程数

转载 作者:太空宇宙 更新时间:2023-11-03 14:41:40 26 4
gpt4 key购买 nike

我创建了一个带有多处理池的进程池。我有很多任务要处理,但是要获取任务的qps并不容易。所以我想获取池的事件进程号,以便我可以设置适当的池大小。这是整个代码:

import time
from multiprocessing import Pool

def do_work(msg):
# do some work


if __name__ == '__main__':
consumer = KafkaConsumer(
group_id=worker_config.kafka_group_id,
bootstrap_servers=kafka_url,
auto_offset_reset=worker_config.kafka_reset,
enable_auto_commit=True)
consumer.subscribe(topics=worker_config.kafka_topics)

for message in consumer:
logging.info('topic=%s, partition=%d, msg=%s' % (message.topic, message.partition, msg))
pool.apply_async(do_work, (message,))
process_count = number_of_active_process_of_pool
logging.info("number_of_active_process_number is %d", process_count)


pool.close()
pool.join()

最佳答案

apply_async 为您提供 AsyncResult: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult

您可以使用 .ready() 来查看它是否已完成。通过这种方式,您可以了解已完成的任务量,进而了解尚未完成的任务量。只要这个数量超过了poolsize,就可以假设poolsize中有很多进程正在运行,如果没有,那么剩余的任务量就是正在运行的进程数。

替代方案:

如果你不使用apply_async而是使用Queue,例如this one然后,您可以使用 .qsize() 获取大致的队列大小

还有multiprocessing.active_children,但只有在这些进程结束时才有效,但池则不然;除非您将其订购到 .join()所以在你的情况下它会起作用。

关于Python获取多处理池的事件进程数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46514272/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com