
Threading memory leaks even after closing the threads




My last question was about the same program I'm working on. In this case, the problem is that I use threading to ping some devices connected to the LAN.
The program should use less than 80 MB, but after 5 hours the usage is more than 3 GB. I searched the Python bug tracker and found this is a common report from users, but the suggested solutions don't inspire much confidence.
I even tried to find the memory location of the threads so I could delete them, but I didn't find any way to do it; I suppose that's because Python doesn't let you access that.



This is the code:



def update_ping(self, data, ping_queue):
    result_queue = queue.Queue()
    local = threading.local()
    while not self.stop_threads:
        threads = []
        for seccion in data:
            lineas = seccion["lineas"]
            for linea in lineas:
                equipos = linea["equipos"]
                for equipo in equipos:
                    ip = equipo["ip"]
                    thread = threading.Thread(target=self.analyze_device, args=(ip, result_queue), daemon=False)
                    threads.append(thread)
                    thread.start()
        for thread in threads:
            thread.join()
        new_data = []
        print(f"Tamaño cola:{result_queue.qsize()}")
        while not result_queue.empty():
            ip, status_color = result_queue.get()
            for seccion in data:
                lineas = seccion["lineas"]
                for linea in lineas:
                    equipos = linea["equipos"]
                    for equipo in equipos:
                        if equipo["ip"] == ip:
                            equipo["status_color"] = status_color
        for seccion in data:
            nuevos_equipos_seccion = []
            lineas = seccion["lineas"]
            for linea in lineas:
                nuevos_equipos_linea = []
                equipos = linea["equipos"]
                for equipo in equipos:
                    ip = equipo["ip"]
                    status_color = equipo["status_color"]
                    nuevos_equipos_linea.append(equipo)
                linea["equipos"] = nuevos_equipos_linea
                nuevos_equipos_seccion.append(linea)
            seccion["lineas"] = nuevos_equipos_seccion
            new_data.append(seccion)
        ping_queue.put(new_data)
        print("Ping realizado")
        print(f"Recolección: {gc.collect()}")
        print(f"Depuradores: {gc.get_debug()}")
        print(f"Basura: {gc.garbage}")


def analyze_device(self, ip, result_queue):
    try:
        if self.check_online(ip):
            status_color = "green"
        else:
            status_color = "red"

        result_queue.put((ip, status_color))
        return
    except Exception as e:
        print(f"Error en el hilo {ip}: {str(e)}")

def check_online(self, ip):
    check = False
    for _ in range(5):
        data = ping(ip)
        if isinstance(data, float):
            check = True
            break
        else:
            time.sleep(0.5)
    return check

Does anyone know how I can make the program delete the garbage data, or detect it so that some memory gets released? Or an alternative form of parallelism that doesn't have this memory problem? The parallelism is necessary because the function verifies the LAN connection of about 200 devices.



More replies

Not an answer, but what is the purpose of the time.sleep(0.5) call in check_online? It causes the thread to live for an extra half second if the given IP does not respond to a ping, but otherwise it does nothing. In particular, it does not slow down the rate at which the update_ping function creates other threads to ping other IPs.


Yes, you are right. I put it there because I was testing; there is no reason to keep that line.


Recommended answers

Q: How many threads does a typical call to update_ping(...) create?



Q: What does "the usage" mean?



Your program creates a new thread for every analyze_device(...) call, and it potentially creates a huge number of those threads all at once. Each of those threads needs a significant chunk of virtual memory for its stack, and even if the program doesn't need a huge amount of real memory, it might never give its virtual allocation back to the OS.



Three gigabytes of virtual memory is not, in and of itself, a problem—especially not if the virtual memory is mostly unused—but a better solution might be to make the analyze_device(...) calls through a thread pool that uses only some controlled number of worker threads to do the work.



See also, https://en.wikipedia.org/wiki/Thread_pool

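A minimal sketch of that idea, reusing the check_online(ip) probe from the question; the ping_all name and the pool size of 50 are illustrative, not part of the original code:

from concurrent.futures import ThreadPoolExecutor

def ping_all(ips, check_online, max_workers=50):
    # At most max_workers threads exist at any time, so their stacks (and the
    # virtual memory behind them) are allocated once and reused for every device.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = pool.map(check_online, ips)  # results come back in input order
        return dict(zip(ips, results))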



Working on the program, I realised that you can use a subprocess to do the threading task and return the data to the main program. By doing that, when the second process finishes, all the memory it used is released. In my case, I used this function in the main code:



data_str = subprocess.check_output(args, universal_newlines=True)

And to get the data in the second program:



received_data = sys.argv[1:]

This uses the subprocess and sys libraries.

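For reference, a minimal sketch of that pattern, assuming a hypothetical worker script named ping_worker.py that receives the IPs on its command line and prints one "ip status" pair per line; the names and addresses here are illustrative:

# Main program: launch the worker process and parse its stdout.
import subprocess
import sys

ips = ["192.168.1.10", "192.168.1.11"]            # illustrative addresses
args = [sys.executable, "ping_worker.py"] + ips   # hypothetical worker script
data_str = subprocess.check_output(args, universal_newlines=True)
results = dict(line.split() for line in data_str.splitlines() if line)

# ping_worker.py: read the IPs from the command line and print the results.
# import sys
# received_data = sys.argv[1:]
# for ip in received_data:
#     print(ip, "green" if check_online(ip) else "red")

When the worker process exits, the operating system reclaims all of its memory, which is why this sidesteps the growth seen in the threaded version.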



You really should be using a thread pool there, instead of creating a thread for each device. Even creating one thread per device and keeping it alive, if you have 200 of them, should be fine.



The problem is that you re-create new threads on a massive scale at each iteration. Using subprocesses instead of threads may have resolved it for you, but only because the OS cleans up after each subprocess; however, the overhead for each "ping" is enormous, as a subprocess is "massive" in terms of system resources compared to a thread.



Using concurrent.futures.ThreadPoolExecutor will create an easy-to-use pool, and you can easily put in 100 or even 300 workers so that all your work is done in parallel (since Python 3.8 the default is min(32, os.cpu_count() + 4) workers):



from concurrent.futures import ThreadPoolExecutor, as_completed
import queue
import threading

...

def update_ping(self, data, ping_queue):
    result_queue = queue.Queue()
    with ThreadPoolExecutor(300) as executor:
        while not self.stop_threads:
            futures = []
            for seccion in data:
                lineas = seccion["lineas"]
                for linea in lineas:
                    equipos = linea["equipos"]
                    for equipo in equipos:
                        ip = equipo["ip"]
                        futures.append(executor.submit(self.analyze_device, ip, result_queue))

            for future in as_completed(futures):
                # This loop is just to emulate your previous code: it ensures
                # all tasks are executed before proceeding to the next batch.
                # You can do away with it if it is not needed.
                # Also, you could fetch return values from the target function
                # here, and not need the queue.
                pass
            new_data = []
            print(f"Tamaño cola:{result_queue.qsize()}")
            while not result_queue.empty():
                ip, status_color = result_queue.get()
                for seccion in data:
                    lineas = seccion["lineas"]
                    for linea in lineas:
                        equipos = linea["equipos"]
                        for equipo in equipos:
                            if equipo["ip"] == ip:
                                equipo["status_color"] = status_color
            for seccion in data:
                nuevos_equipos_seccion = []
                lineas = seccion["lineas"]
                for linea in lineas:
                    nuevos_equipos_linea = []
                    equipos = linea["equipos"]
                    for equipo in equipos:
                        ip = equipo["ip"]
                        status_color = equipo["status_color"]
                        nuevos_equipos_linea.append(equipo)
                    linea["equipos"] = nuevos_equipos_linea
                    nuevos_equipos_seccion.append(linea)
                seccion["lineas"] = nuevos_equipos_seccion
                new_data.append(seccion)
            ping_queue.put(new_data)
            print("Ping realizado")
            print(f"Recolección: {gc.collect()}")
            print(f"Depuradores: {gc.get_debug()}")
            print(f"Basura: {gc.garbage}")


...




Note that this removes any need to start a new thread and tear it down for each device: just the function call remains, wrapped in a "Future".



There is a chance that passing the Queue to thousands of threads was what was leaking in your original implementation, and a smaller chance that it still leaks here, though I doubt it. In any case, check the docs for concurrent.futures: https://docs.python.org/3/library/concurrent.futures.html

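As the comment in the code above suggests, you could also skip the shared Queue and read each result straight from its future. A sketch of that variation, reusing check_online from the question (which returns True/False):

# Map each future back to its ip, then read the return value with future.result().
futures = {}
for seccion in data:
    for linea in seccion["lineas"]:
        for equipo in linea["equipos"]:
            ip = equipo["ip"]
            futures[executor.submit(self.check_online, ip)] = ip

for future in as_completed(futures):
    ip = futures[future]
    status_color = "green" if future.result() else "red"
    # ... update the matching equipo["status_color"] as before ...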


More replies

Yes, but the purpose of this program is 24/7 monitoring. When the threads are active there are nearly 192 of them, and while they are working the program uses about 15 MB more RAM than when the threads are not active. When they finish, the memory used by the threads should be released and usage should return to normal levels, but that doesn't happen: for some reason the program doesn't detect all the data that was used, or doesn't detect that it is old data that should be removed. That makes RAM usage increase by 4-5 MB per cycle, and if the program has to monitor 24/7, it will eventually use all the RAM.


Also, I have seen that this could happen with other libraries, but I will try to apply the pool.

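If you want to confirm where those 4-5 MB per cycle are coming from, the standard-library tracemalloc module can compare snapshots taken between cycles. A diagnostic sketch, not part of the original program:

import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()
# ... run one full monitoring cycle here ...
after = tracemalloc.take_snapshot()
for stat in after.compare_to(before, "lineno")[:10]:
    print(stat)  # the ten source lines whose allocations grew the most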
