gpt4 book ai didi

python - 如何让 Python 多线程管道使用 90% 的可用内存?

转载 作者:行者123 更新时间:2023-12-04 14:02:44 25 4
gpt4 key购买 nike

我的 Python 脚本在 GCE 实例中运行,以 Pub/Sub 作为输入(异步请求订阅)和输出。
据我了解,通过这种方式我可以控制并发线程的数量,从而限制使用的内存量。如果我设置 max_messages到 100,我的脚本最终会耗尽内存。

from google.cloud import pubsub_v1
from concurrent import futures

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_name)

def callback(message):
print (str(message.data) + " " + str(threading.current_thread()))
message.ack()
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
executor = futures.ThreadPoolExecutor(max_workers=5)
policy = pubsub_v1.subscriber.policy.thread.Policy(subscriber, subscription_path, executor=executor, flow_control=flow_control)
policy.open(callback)
在我看来,对 worker 和消息的数量进行硬编码是控制内存利用率的一种原始方法。有没有更好的方法让我的脚本分配尽可能多的线程,因为 VM 资源允许尽可能有效地利用它?

最佳答案

一些解决方案

  • 内存限制
    Google 云容器实例已允许内存限制。默认内存限制为 512 MiB。更新内存使用限制的要求是 2vCPU。这对您来说不是问题,因为您使用的是 e2-highcpu-16 Preemptible VM,它提供多达 32 个 CPU。
    您可以在 GCP 控制台、命令行或 YAML 文件中更新内存限制。可配置的最大内存为 8Gi。如果需要确定需要多少内存,可以使用 (Standing Memory) + (Memory per Request) * (Service Concurrency) 计算.欲了解更多信息,请访问 reference documentation
  • CPU 绑定(bind)任务 :
    我不确定您线程中的任务是受 IO 限制还是受 CPU 限制。如果它的 CPU 限制,你可以选择使用 ProcessPoolExecutor .

    The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously.


    ProcessPoolExecutor documentation报价在 max_workers范围。

    If max_workers is None, then the default chosen will be at most 61


  • IO 绑定(bind)任务
    IO 绑定(bind)任务非常适合在线程中使用。
    根据使用的python版本,max_workers的默认值各不相同。
  • 3.5 版:max_workers参数未给出或设置为无,no of processors on machine *5 (考虑到任务更多的是 I/O 相关而不是 CPU
  • 版本 3.8:默认值 max_workers改为 min(32, os.cpu_count() + 4) .此默认值至少为 I/O 绑定(bind)任务保留 5 个工作线程。它最多使用 32 个 CPU 内核来执行释放 GIL 的 CPU 绑定(bind)任务。

  • 如您所用 ThreadPoolExecutormax_workers值设置为 5,如果您使用的是 python 版本>3.5 不设置 max_workers value 显式将允许最大化线程分配。请引用 ThreadPoolExecutor documentation
  • 用于流量控制 :您可以尝试使用此 documentation 中提到的其他参数. FlowControl用于控制使用异步订阅拉取消息的速率的设置。
    class google.cloud.pubsub_v1.types.FlowControl(max_bytes=104857600, max_messages=1000, max_lease_duration=3600, max_duration_per_lease_extension=0)
    max_bytes暂停消息流之前已接收但尚未处理的消息的最大总大小。max_duration_per_lease_extension单次租约延期尝试的最长时间(以秒为单位)。如果订阅者未能延长截止日期,则限制消息重新传递之前的延迟。必须介于 10 到 600(含)之间。如果设置为 0,则忽略。max_lease_duration在将消息从租约管理中删除之前,在消息上保留租约的最长时间(以秒为单位)。max_messages暂停消息流之前已接收但尚未处理的最大消息数。
  • 关于python - 如何让 Python 多线程管道使用 90% 的可用内存?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69460683/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com