- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在 Python 中 fork 进程时遇到内存分配问题。我知道这个问题已经在其他一些帖子中讨论过了,但是我在其中任何一个帖子中都找不到好的解决方案。
这是一个说明问题的示例脚本:
import os
import psutil
import subprocess
pid = os.getpid()
this_proc = psutil.Process(pid)
MAX_MEM = int(psutil.virtual_memory().free*1E-9) # in GB
def consume_memory(size):
""" Size in GB """
memory_consumer = []
while get_mem_usage() < size:
memory_consumer.append(" "*1000000) # Adding ~1MB
return(memory_consumer)
def get_mem_usage():
return(this_proc.memory_info()[0]/2.**30)
def get_free_mem():
return(psutil.virtual_memory().free/2.**30)
if __name__ == "__main__":
for i in range(1, MAX_MEM):
consumer = consume_memory(i)
mem_usage = get_mem_usage()
print("\n## Memory usage %d/%d GB (%2d%%) ##" % (int(mem_usage),
MAX_MEM, int(mem_usage*100/MAX_MEM)))
try:
subprocess.call(['echo', '[OK] Fork worked.'])
except OSError as e:
print("[ERROR] Fork failed. Got OSError.")
print(e)
del consumer
该脚本在 Arch Linux 上使用 Python 2.7 和 3.6 进行了测试,并使用 psutils 来跟踪内存使用情况。它逐渐增加 Python 进程的内存使用量,并尝试使用 subprocess.call() fork 一个进程。如果可用性超过 50%,则 fork 失败。内存被父进程消耗。
## Memory usage 1/19 GB ( 5%) ##
[OK] Fork worked.
## Memory usage 2/19 GB (10%) ##
[OK] Fork worked.
## Memory usage 3/19 GB (15%) ##
[OK] Fork worked.
[...]
## Memory usage 9/19 GB (47%) ##
[OK] Fork worked.
## Memory usage 10/19 GB (52%) ##
[ERROR] Fork failed. Got OSError.
[Errno 12] Cannot allocate memory
## Memory usage 11/19 GB (57%) ##
[ERROR] Fork failed. Got OSError.
[Errno 12] Cannot allocate memory
## Memory usage 12/19 GB (63%) ##
[ERROR] Fork failed. Got OSError.
[Errno 12] Cannot allocate memory
## Memory usage 13/19 GB (68%) ##
[ERROR] Fork failed. Got OSError.
[Errno 12] Cannot allocate memory
[...]
请注意,我在运行此测试时没有激活 Swap。
似乎有两种选择可以解决这个问题:
我在我的台式机上尝试了后者,上面的脚本没有错误地完成了。但是,在我正在处理的计算集群上,我无法使用这些选项中的任何一个。
不幸的是,在消耗内存之前提前 fork 所需的进程也不是一种选择。
有人对如何解决这个问题有其他建议吗?
谢谢!
最好的
莱昂纳德
最佳答案
您面临的问题实际上与 Python 无关,也不是仅靠 Python 就可以真正改变的问题。按照 mbrig 的建议预先启动 fork 进程(执行程序)在评论中似乎真的是这种情况下最好和最干净的选择。
无论是否使用 Python,您都在处理 Linux(或类似系统)如何创建新进程。您的父进程首先调用 fork(2)它创建一个新的子进程作为自身的副本。当时它实际上并没有将自己复制到其他地方(它使用写时复制),尽管如此,它会检查是否有足够的空间可用,如果没有,则将 errno
设置为 12: ENOMEM
-> 您看到的 OSError
异常。
是的,允许 VMS 过度使用内存可以抑制这个错误的弹出……如果你在 child 中执行新程序(它最终也会变得更小)。它不必立即导致任何故障。但这听起来可能会进一步解决这个问题。
增加内存(添加交换)。突破极限,只要你运行的进程的两倍仍然适合可用内存,fork 就可以成功。有了后续执行官,交换甚至不需要被利用。
似乎还有一种选择,但它看起来……很脏。还有另一个系统调用 vfork()这将创建一个新进程,该进程最初与其父进程共享内存,此时父进程的执行已暂停。这个新创建的子进程只能设置vfork
返回的变量,它可以_exit
或exec
。因此,它不会通过任何 Python 接口(interface)公开,如果您尝试(我确实)使用 ctypes
将它直接加载到 Python 中,它会出现段错误(我想是因为 Python 仍然会做其他事情,而不仅仅是这三个在 vfork
之后和我可以在 child 中 exec
其他内容之前提到的操作)。
就是说,您可以将整个 vfork
和 exec
委托(delegate)给您加载的共享对象。作为一个非常粗略的概念证明,我就是这样做的:
#include <errno.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
char run(char * const arg[]) {
pid_t child;
int wstatus;
char ret_val = -1;
child = vfork();
if (child < 0) {
printf("run: Failed to fork: %i\n", errno);
} else if (child == 0) {
printf("arg: %s\n", arg[0]);
execv(arg[0], arg);
_exit(-1);
} else {
child = waitpid(child, &wstatus, 0);
if (WIFEXITED(wstatus))
ret_val = WEXITSTATUS(wstatus);
}
return ret_val;
}
我已经按照以下方式修改了您的示例代码(大部分更改是在 subprocess.call
的替换中):
import ctypes
import os
import psutil
pid = os.getpid()
this_proc = psutil.Process(pid)
MAX_MEM = int(psutil.virtual_memory().free*1E-9) # in GB
def consume_memory(size):
""" Size in GB """
memory_consumer = []
while get_mem_usage() < size:
memory_consumer.append(" "*1000000) # Adding ~1MB
return(memory_consumer)
def get_mem_usage():
return(this_proc.memory_info()[0]/2.**30)
def get_free_mem():
return(psutil.virtual_memory().free/2.**30)
if __name__ == "__main__":
forker = ctypes.CDLL("forker.so", use_errno=True)
for i in range(1, MAX_MEM):
consumer = consume_memory(i)
mem_usage = get_mem_usage()
print("\n## Memory usage %d/%d GB (%2d%%) ##" % (int(mem_usage),
MAX_MEM, int(mem_usage*100/MAX_MEM)))
try:
cmd = [b"/bin/echo", b"[OK] Fork worked."]
c_cmd = (ctypes.c_char_p * (len(cmd) + 1))()
c_cmd[:] = cmd + [None]
ret = forker.run(c_cmd)
errno = ctypes.get_errno()
if errno:
raise OSError(errno, os.strerror(errno))
except OSError as e:
print("[ERROR] Fork failed. Got OSError.")
print(e)
del consumer
有了它,我仍然可以在报告已满的可用内存的 3/4 处 fork 。
从理论上讲,它都可以“正确”编写,并且可以很好地包装以很好地与 Python 代码集成,但它似乎是一个额外的选择。我仍然会回到执行程序进程。
我只是简单地浏览了 concurrent.futures.process
模块,但是一旦它产生了一个工作进程,它似乎并没有在完成之前破坏它,所以可能滥用现有的 ProcessPoolExecutor
将是一个快速且便宜的选择。我在脚本顶部(主要部分)附近添加了这些:
def nop():
pass
executor = concurrent.futures.ProcessPoolExecutor(max_workers=1)
executor.submit(nop) # start a worker process in the pool
然后将subprocess.call
提交给它:
proc = executor.submit(subprocess.call, ['echo', '[OK] Fork worked.'])
proc.result() # can also collect the return value
关于Python fork : 'Cannot allocate memory' if process consumes more than 50% avail. 内存,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51045054/
我已经开始研究 MassTransit 并正在编写将处理消息的类。当我从 Consumes 实现接口(interface)时我有四个选项:All , Selected , For和 Context .
我正在尝试找出消费者群体级别是否也有任何抵消。在 Kafka 中,Consumer Offset 是在 Consumer group 级别还是在该 consumer group 内的单个消费者? 最佳
我有一个我不理解的 java 编译器错误。看来消费者 和 Consumer(带有 T 扩展对象)在方法签名参数中不等效。请查看以下代码: import java.util.function.Consu
我在泛型方面遇到了一些麻烦,尽管找到了解决方法,但我不明白是什么阻止了我的代码编译。 我有一个显示 TreeTableView 的 JavaFX 项目:
C++11 标准定义了一个内存模型(1.7、1.10),其中包含内存排序,大致为“顺序一致”、“获取”、“消耗”、“释放”和“放松”。同样粗略地,一个程序只有在它是无种族的情况下才是正确的,如果所有
我有一个 kafka 主题,我正在通过 Kafka Producer 发送数据。现在,在消费者方面,我有两种选择。 1。使用 KafkaConsumer - 下面是 kafkaConsumer 的代码
我有四个当前消费者在 Amazon AWS 上收听同一个队列。从队列中拉取消息时,有时会出现同一条消息被两个不同的消费者消费的情况。请看下面的日志: 18:01:46,515 [jmsContaine
我正在设计一个系统,其中将有 n 个生产者和 m 个消费者,其中 n 和 m 是数字,n != m。 我想这样设计系统, 任何生产者在生产时不得阻止其他生产者 任何消费者都不应在消费时阻止其他消费者
我们有一个系统,我们希望将记录(例如联系人、客户、机会)从我们的系统推送到 SalesForce。 为此,我们使用了 ForceToolKit for .Net .我们成功地将联系人记录从我们的系统推
我怎样才能写一个方法来组合 Stream的 Consumers成单个 Consumer使用 Consumer.andThen(Consumer) ? 我的第一个版本是: Consumer combi
我需要开始使用 kafka。我很难理解消费者应该收到什么:据我了解,我们可以通过多种方式配置消费者: 示例 1: @KafkaListener(topics = "topic_name) public
我需要开始使用 kafka。我很难理解消费者应该收到什么:据我了解,我们可以通过多种方式配置消费者: 示例 1: @KafkaListener(topics = "topic_name) public
我正在尝试在我的 scala play 应用程序中创建消费者 secret / key 对,但我似乎无法让它正常工作。我有以下代码 import org.apache.commons.codec.bi
我通过传递用户(消费者)名称使用 .NET 应用程序,我需要从 Salesforce 检索消费者 key 和消费者 key ,我该如何实现。 最佳答案 Consumer Key 和 Consumer
我想设置 至 0 .这似乎是另一个问题 ( JMS queue with multiple consumers ) 的答案,并在此 article 中进行了描述。在第 17.1.1 章中。我使用 JN
I have send message api to my users.When I send to message from my x numbers I need to wait 10-15
我有一个 java Kafka 消费者,我在其中批量获取 ConsumerRecords 进行处理。示例代码如下- while (true) { ConsumerRecords records
我正在为 iPhone 编写 Twitter/Facebook 应用程序。我有自己的 Apache/PHP 服务器。我只想把Consumer Key放在app里,然后我把Consumer Secret
Spring AMQP:比较多个消费者与每个消费者多个线程的性能 我正处于从 Spring 文档学习 Spring AMQP 的阶段。我不清楚提高异步消息消费率的首选方法:根据 Spring 文档 (
我正在制作一个需要 oAuth 1.0 身份验证的应用程序。我可以访问客户提供的消费者 key 和消费者 secret 。我曾尝试使用 AFNetworking 进行此操作,但效果不佳。有人可以建议我
我是一名优秀的程序员,十分优秀!