
Threading leaks memory even after the threads are closed

Reposted. Author: bug小助手. Updated: 2023-10-25 11:57:02

My last question was about the same program. This time, the problem is that I use threading to ping some devices connected to the LAN.
The program should use less than 80 MB, but after 5 hours the usage is more than 3 GB. I searched the Python bug tracker and found that this is a commonly reported problem, but I am not very confident in the solutions suggested there.
I even tried to get hold of the threads' data in order to delete it, but I didn't find any way to do that. I suppose that is because Python doesn't give you access to it.


This is the code:


import gc
import queue
import threading


def update_ping(self, data, ping_queue):
    result_queue = queue.Queue()
    local = threading.local()
    while not self.stop_threads:
        # Start one new thread per device on every cycle.
        threads = []
        for seccion in data:
            lineas = seccion["lineas"]
            for linea in lineas:
                equipos = linea["equipos"]
                for equipo in equipos:
                    ip = equipo["ip"]
                    thread = threading.Thread(target=self.analyze_device, args=(ip, result_queue), daemon=False)
                    thread.start()
                    threads.append(thread)
        # Wait for every ping thread to finish.
        for thread in threads:
            thread.join()
        new_data = []
        print(f"Tamaño cola:{result_queue.qsize()}")
        # Copy each result from the queue into the matching device entry.
        while not result_queue.empty():
            ip, status_color = result_queue.get()
            for seccion in data:
                lineas = seccion["lineas"]
                for linea in lineas:
                    equipos = linea["equipos"]
                    for equipo in equipos:
                        if equipo["ip"] == ip:
                            equipo["status_color"] = status_color
        # Rebuild the nested lists with the updated status colors.
        # The append/put lines below are assumed; they are missing from the post.
        for seccion in data:
            nuevos_equipos_seccion = []
            lineas = seccion["lineas"]
            for linea in lineas:
                nuevos_equipos_linea = []
                equipos = linea["equipos"]
                for equipo in equipos:
                    ip = equipo["ip"]
                    status_color = equipo["status_color"]
                    nuevos_equipos_linea.append({**equipo, "status_color": status_color})
                linea["equipos"] = nuevos_equipos_linea
                nuevos_equipos_seccion.append(linea)
            seccion["lineas"] = nuevos_equipos_seccion
            new_data.append(seccion)
        ping_queue.put(new_data)
        print("Ping realizado")
        print(f"Recolección: {gc.collect()}")
        print(f"Depuradores: {gc.get_debug()}")
        print(f"Basura: {gc.garbage}")

def analyze_device(self, ip, result_queue):
    try:
        # Map the ping result to a color and hand it back through the queue.
        if self.check_online(ip):
            status_color = "green"
        else:
            status_color = "red"

        result_queue.put((ip, status_color))
    except Exception as e:
        print(f"Error en el hilo {ip}: {str(e)}")

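def check_online(self, ip):
    # ping() is assumed to return a float (the round-trip time) on success,
    # for example as ping3's ping function does.
    check = False
    for _ in range(5):
        data = ping(ip)
        if isinstance(data, float):
            check = True
    return check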

How can I make the program delete the garbage data, or detect what can be released to free some memory? Alternatively, is there another way to get parallelism without this memory problem? The parallelism is necessary because the function verifies the LAN connection of about 200 devices.



Not an answer, but what is the purpose of the time.sleep(0.5) call in check_online? It causes the thread to live for an extra half second if the given IP does not respond to a ping, but otherwise it does nothing. In particular, it does not slow down the rate at which the update_ping function creates other threads to ping other IPs.


Yes, you are right. I put it there because I was testing; there is no reason to keep that line.



Q: How many threads does a typical call to update_ping(...) create? and


Q: What does "the usage" mean?


Your program creates a new thread for every analyze_device(...) call, and it potentially creates a huge number of those threads all at once. Each of those threads needs a significant chunk of virtual memory for its stack, and even if the program doesn't need a huge amount of real memory, it might never give its virtual allocation back to the OS.


Three gigabytes of virtual memory is not, in and of itself, a problem—especially not if the virtual memory is mostly unused—but a better solution might be to make the analyze_device(...) calls through a thread pool that uses only some controlled number of worker threads to do the work.
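As a rough illustration of "some controlled number of worker threads", here is a minimal hand-rolled sketch using only queue and threading. The names run_with_workers and fake_check are hypothetical, and fake_check is just a stand-in for a real ping, so treat this as an outline under those assumptions rather than a drop-in replacement for update_ping:

```python
import queue
import threading


def run_with_workers(task, items, num_workers=8):
    """Run task(item) for every item using a fixed number of worker threads."""
    work = queue.Queue()
    results = queue.Queue()

    def worker():
        while True:
            item = work.get()
            if item is None:  # sentinel: no more work for this worker
                break
            results.put((item, task(item)))

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for item in items:
        work.put(item)
    for _ in threads:
        work.put(None)  # one sentinel per worker
    for t in threads:
        t.join()

    out = {}
    while not results.empty():
        item, value = results.get()
        out[item] = value
    return out


def fake_check(ip):
    # Hypothetical stand-in for a real ping: "online" if the last octet is even.
    return "green" if int(ip.rsplit(".", 1)[1]) % 2 == 0 else "red"


statuses = run_with_workers(fake_check, [f"192.168.0.{i}" for i in range(1, 201)])
print(len(statuses), "devices checked with 8 threads")
```

However many devices are in the list, only num_workers threads (and their stacks) exist at any time.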


See also,


While working on the program I realised that you can use a subprocess to do the threading task and return the data to the main program. That way, when the second program finishes, all the memory it used is released. In my case, I used this call in the main code:


data_str = subprocess.check_output(args, universal_newlines=True)

And to get the data in the second program:


recived_data = sys.argv[1:]

This uses the subprocess and sys libraries.
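To show the round trip end to end, here is a small self-contained sketch. It assumes the child program prints its results as JSON on standard output; the names CHILD_CODE and ping_in_subprocess are hypothetical, and the child is inlined with "python -c" only so the example runs as a single file (in the real program it would be a separate script doing the pings):

```python
import json
import subprocess
import sys

# Hypothetical child program, inlined so the example is self-contained.
CHILD_CODE = """
import json, sys
ips = sys.argv[1:]
# Stand-in for the real ping work: report every address as online.
print(json.dumps({ip: "green" for ip in ips}))
"""


def ping_in_subprocess(ips):
    # The addresses are passed as command-line arguments to the child.
    args = [sys.executable, "-c", CHILD_CODE, *ips]
    data_str = subprocess.check_output(args, universal_newlines=True)
    return json.loads(data_str)


result = ping_in_subprocess(["192.168.0.1", "192.168.0.2"])
print(result)
```

All memory the child allocates is returned to the OS when it exits, at the cost of starting a new interpreter for every call.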


You really should be using a thread pool there instead of creating a thread for each device. Even creating one thread per device and keeping it alive should be fine if you have 200.


The problem is that you re-create new threads on a massive scale at each iteration. Using subprocess instead of threading may have solved it for you, but only because the OS cleans up after each subprocess. However, the overhead for each "ping" is enormous, as a subprocess is "massive" in terms of system resources compared to a thread.


Using concurrent.futures.ThreadPoolExecutor gives you an easy-to-use pool, and you can easily configure 100 or even 300 workers so that all your work is done in parallel (the default in Python 3.8 and later is min(32, CPU count + 4)):


from concurrent.futures import ThreadPoolExecutor, as_completed
import gc
import queue
import threading


def update_ping(self, data, ping_queue):
    result_queue = queue.Queue()
    # The pool is created once and its worker threads are reused on every cycle.
    with ThreadPoolExecutor(300) as executor:
        while not self.stop_threads:
            futures = []
            for seccion in data:
                lineas = seccion["lineas"]
                for linea in lineas:
                    equipos = linea["equipos"]
                    for equipo in equipos:
                        ip = equipo["ip"]
                        futures.append(executor.submit(self.analyze_device, ip, result_queue))

            for future in as_completed(futures):
                # This for is just to emulate your previous code: ensuring
                # all tasks are executed before proceeding to next batch
                # you can do away with it, if it is not needed.
                # also, you could fetch return values from the target function
                # here, and not need the queue.
                pass
            new_data = []
            print(f"Tamaño cola:{result_queue.qsize()}")
            while not result_queue.empty():
                ip, status_color = result_queue.get()
                for seccion in data:
                    lineas = seccion["lineas"]
                    for linea in lineas:
                        equipos = linea["equipos"]
                        for equipo in equipos:
                            if equipo["ip"] == ip:
                                equipo["status_color"] = status_color
            # The append/put lines below are assumed; they are missing from the post.
            for seccion in data:
                nuevos_equipos_seccion = []
                lineas = seccion["lineas"]
                for linea in lineas:
                    nuevos_equipos_linea = []
                    equipos = linea["equipos"]
                    for equipo in equipos:
                        ip = equipo["ip"]
                        status_color = equipo["status_color"]
                        nuevos_equipos_linea.append({**equipo, "status_color": status_color})
                    linea["equipos"] = nuevos_equipos_linea
                    nuevos_equipos_seccion.append(linea)
                seccion["lineas"] = nuevos_equipos_seccion
                new_data.append(seccion)
            ping_queue.put(new_data)
            print("Ping realizado")
            print(f"Recolección: {gc.collect()}")
            print(f"Depuradores: {gc.get_debug()}")
            print(f"Basura: {gc.garbage}")


Note that this removes any need to start and tear down a new thread for each device: only the function call remains, wrapped in a "Future".


There is a chance that passing the Queue to thousands of threads was what leaked in your original implementation, and a smaller chance that it still leaks here, though I doubt it. In any case, check the docs for concurrent.futures.
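If you want to rule out the Queue entirely, the target function can simply return its result and the caller can read it from each Future. A minimal sketch of that variant follows; analyze_device here is a hypothetical standalone stand-in (no real ping, no self), and ping_cycle is a made-up helper name:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def analyze_device(ip):
    # Hypothetical stand-in for the real check: "online" if the last octet is even.
    online = int(ip.rsplit(".", 1)[1]) % 2 == 0
    return ip, "green" if online else "red"


def ping_cycle(executor, ips):
    """Submit one task per address and collect the returned values."""
    futures = [executor.submit(analyze_device, ip) for ip in ips]
    statuses = {}
    for future in as_completed(futures):
        ip, status_color = future.result()  # re-raises if the task failed
        statuses[ip] = status_color
    return statuses


ips = [f"10.0.0.{i}" for i in range(1, 21)]
# One long-lived pool: the same worker threads serve every cycle.
with ThreadPoolExecutor(max_workers=8) as executor:
    first = ping_cycle(executor, ips)
    second = ping_cycle(executor, ips)
print(first == second)
```

Nothing is shared between cycles except the executor itself, so each batch of results can be garbage-collected as soon as the caller drops it.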



Yes, but the purpose of this program is to monitor 24/7. When the threads are active there are nearly 192 of them, and while they are working the program uses about 15 MB more RAM than when they are not active. When they finish, memory usage should come back to normal levels, releasing the memory used by the threads, but that doesn't happen: for some reason the program doesn't detect all the data that was used, or doesn't detect that it is old data that should be removed. As a result RAM usage increases by 4-5 MB per cycle, so if the program has to monitor 24/7 it will eventually use all the RAM.


I also checked that this could happen with other libraries, but I will try to apply the pool.
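One way to see where a per-cycle increase like that comes from is to compare tracemalloc snapshots taken before and after a few cycles. The sketch below is only an illustration: run_cycles and leaky_cycle are hypothetical names, and leaky_cycle deliberately keeps a reference alive to simulate a leak. Note that tracemalloc only reports memory allocated through Python's allocators, so thread stacks will not show up in it:

```python
import tracemalloc

leaked = []


def leaky_cycle():
    # Hypothetical cycle that keeps a reference alive on every call.
    leaked.append(bytearray(100_000))


def run_cycles(cycle, count, top=3):
    """Run cycle() count times and return the largest allocation differences."""
    tracemalloc.start()
    cycle()  # warm-up, so one-time allocations are not counted as growth
    before = tracemalloc.take_snapshot()
    for _ in range(count):
        cycle()
    after = tracemalloc.take_snapshot()
    tracemalloc.stop()
    # Entries are grouped by source line and sorted by size difference.
    return after.compare_to(before, "lineno")[:top]


top_stats = run_cycles(leaky_cycle, 10)
for stat in top_stats:
    print(stat)
```

If the top entries point at your own code, something there is still holding references. If nothing grows here while the process size keeps rising, the growth is more likely outside the Python heap.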

