gpt4 book ai didi

python - 在 python 中使用多线程队列的正确方法?

转载 作者:太空狗 更新时间:2023-10-29 16:58:16 25 4
gpt4 key购买 nike

我正在尝试在 python 中使用多线程的队列。我只是想知道我使用的方法是否正确。如果我正在做一些多余的事情,或者如果有更好的方法我应该使用。

我正在尝试从表中获取新请求并使用一些逻辑来安排它们执行一些操作,例如运行查询。

因此,我从主线程中为队列生成了一个单独的线程。

if __name__=='__main__':

request_queue = SetQueue(maxsize=-1)
worker = Thread(target=request_queue.process_queue)
worker.setDaemon(True)
worker.start()


while True:
try:
#Connect to the database get all the new requests to be verified
db = Database(username_testschema, password_testschema, mother_host_testschema, mother_port_testschema, mother_sid_testschema, 0)
#Get new requests for verification
verify_these = db.query("SELECT JOB_ID FROM %s.table WHERE JOB_STATUS='%s' ORDER BY JOB_ID" %
(username_testschema, 'INITIATED'))

#If there are some requests to be verified, put them in the queue.
if len(verify_these) > 0:
for row in verify_these:
print "verifying : %s" % row[0]
verify_id = row[0]
request_queue.put(verify_id)
except Exception as e:
logger.exception(e)
finally:
time.sleep(10)

现在在 Setqueue 类中,我有一个 process_queue 函数,用于处理每次运行中添加到队列中的前 2 个请求。

'''
Overridding the Queue class to use set as all_items instead of list to ensure unique items added and processed all the time,
'''

class SetQueue(Queue.Queue):
def _init(self, maxsize):
Queue.Queue._init(self, maxsize)
self.all_items = set()

def _put(self, item):
if item not in self.all_items:
Queue.Queue._put(self, item)
self.all_items.add(item)

'''
The Multi threaded queue for verification process. Take the top two items, verifies them in a separate thread and sleeps for 10 sec.
This way max two requests per run will be processed.
'''
def process_queue(self):
while True:
scheduler_obj = Scheduler()

try:
if self.qsize() > 0:
for i in range(2):
job_id = self.get()
t = Thread(target=scheduler_obj.verify_func, args=(job_id,))
t.start()

for i in range(2):
t.join(timeout=1)
self.task_done()

except Exception as e:
logger.exception(
"QUEUE EXCEPTION : Exception occured while processing requests in the VERIFICATION QUEUE")
finally:
time.sleep(10)

我想看看我的理解是否正确,是否有任何问题。

所以在main func 连接数据库时while True 中运行的主线程获取新的请求并将其放入队列中。队列的工作线程(守护进程)不断从队列中获取新请求并派生非守护线程进行处理,并且由于连接超时为 1,工作线程将继续接收新请求而不会被阻塞,并且它的子线程会在后台继续处理。正确吗?

因此,如果主进程退出,这些进程在完成工作之前不会被杀死,但工作守护进程线程将退出。疑问:如果父进程是守护进程而子进程不是守护进程,如果父进程退出,子进程是否退出?)。


我也在这里阅读:- David beazley multiprocessing

david beazley 在使用池作为线程协处理器部分中尝试解决类似的问题。所以我应该按照他的步骤:-1. 创建进程池。2.像我为request_queue做的那样打开一个线程3.在那个线程中

  def process_verification_queue(self):
while True:
try:
if self.qsize() > 0:
job_id = self.get()
pool.apply_async(Scheduler.verify_func, args=(job_id,))
except Exception as e:
logger.exception("QUEUE EXCEPTION : Exception occured while processing requests in the VERIFICATION QUEUE")

使用池中的进程并并行运行 verify_func。这会给我带来更多性能吗?

最佳答案

虽然可以为队列创建一个新的独立线程,并按照您的方式分别处理该数据,但我相信每个独立工作线程将消息发布到他们已经“知道”的队列中更为常见“关于。然后通过从该队列中拉出消息来从其他线程处理该队列。

设计理念

我设想您的应用程序的方式是三个线程。主线程和两个工作线程。 1 个工作线程将从数据库获取请求并将它们放入队列中。另一个工作线程将处理来自队列的数据

主线程使用线程函数 .join() 等待其他线程完成

您可以保护线程可以访问的队列,并通过使用互斥锁使其线程安全。我也在其他语言的许多其他设计中看到过这种模式。

推荐阅读

Brett Slatkin 的“Effective Python”有一个很好的例子来说明这个问题。

他没有从 Queue 继承,而是在他的类中为它创建了一个包装器调用 MyQueue 并添加一个 get() 和 put(message) 函数。

他甚至在他的 Github 仓库中提供了源代码

https://github.com/bslatkin/effectivepython/blob/master/example_code/item_39.py

我不隶属于这本书或其作者,但我强烈推荐这本书,因为我从中学到了很多东西:)

关于python - 在 python 中使用多线程队列的正确方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30309321/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com