python - Threaded queue hangs in Python


I am trying to make a parser multithreaded via a queue. It seems to work, but my queue hangs. I would appreciate it if someone could show me how to fix this, since I rarely write multithreaded code.

This code reads from the queue:

from silk import *
import json
import datetime
import pandas
import Queue
from threading import Thread

l = []
q = Queue.Queue()

def parse_record():
    d = {}
    while not q.empty():
        rec = q.get()
        d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
        # ... many ops like this
        d['dport'] = rec.dport
        l.append(d) # l is global

And this fills the queue:

def parse_records():
    ffile = '/tmp/query.rwf'
    flows = SilkFile(ffile, READ)
    numthreads = 2

    # fill queue
    for rec in flows:
        q.put(rec)

    # work on Queue
    for i in range(numthreads):
        t = Thread(target=parse_record)
        t.daemon = True
        t.start()

    # blocking
    q.join()

    # never reached
    data_df = pandas.DataFrame.from_records(l)
    return data_df

I only call parse_records() from my main. It never terminates.

Best answer

The Queue.empty documentation says:

...if empty() returns False it doesn’t guarantee that a subsequent call to get() will not block.

At a minimum you should use get_nowait, or you risk losing data. More importantly, the join only releases when every queued item has been marked complete with a Queue.task_done call:

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
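
To make that contract concrete, here is a minimal, self-contained sketch (using plain integers rather than the SiLK records above) of how each successful get is paired with a task_done so that join can return:

import Queue
from threading import Thread

q = Queue.Queue()

def worker():
    while True:
        try:
            item = q.get_nowait()   # never blocks; raises Queue.Empty once drained
        except Queue.Empty:
            return
        # ... process item ...
        q.task_done()               # exactly one task_done per item taken off the queue

for n in range(10):
    q.put(n)

for _ in range(2):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

q.join()  # returns only after every put() has a matching task_done()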

As a side note, l.append(d) is not atomic and should be protected with a lock.

from silk import *
import json
import datetime
import pandas
import Queue
from threading import Thread, Lock

l = []
l_lock = Lock()
q = Queue.Queue()

def parse_record():
    while 1:
        try:
            rec = q.get_nowait()
            d = {}  # build a fresh dict for each record
            d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
            # ... many ops like this
            d['dport'] = rec.dport
            with l_lock:
                l.append(d) # l is global
            q.task_done()
        except Queue.Empty:
            return

You can shorten your code considerably by using a thread pool from the standard library.

from silk import *
import json
import datetime
import pandas
import multiprocessing.pool

def parse_record(rec):
    d = {}
    d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
    # ... many ops like this
    d['dport'] = rec.dport
    return d

def parse_records():
    ffile = '/tmp/query.rwf'
    flows = SilkFile(ffile, READ)
    pool = multiprocessing.pool.ThreadPool(2)
    data_df = pandas.DataFrame.from_records(pool.map(parse_record, flows))
    pool.close()
    return data_df
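
As a usage note, here is a minimal sketch of calling this (assuming /tmp/query.rwf exists and is readable by SiLK):

if __name__ == '__main__':
    df = parse_records()
    print df.head()   # quick sanity check of the parsed flow records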

Regarding python - Threaded queue hangs in Python, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/39810172/
