gpt4 book ai didi

python多线程抽象编程模型详解

转载 作者:qq735679552 更新时间:2022-09-28 22:32:09 28 4
gpt4 key购买 nike

CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.

这篇CFSDN的博客文章python多线程抽象编程模型详解由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.

最近需要完成一个多线程下载的工具,对其中的多线程下载进行了一个抽象,可以对所有需要使用到多线程编程的地方统一使用这个模型来进行编写.

主要结构:

1、基于Queue标准库实现了一个类似线程池的工具,用户指定提交任务线程submitter与工作线程worker数目,所有线程分别设置为后台运行,提供等待线程运行完成的接口.

2、所有需要完成的任务抽象成task,提供单独的无参数调用方式,供worker线程调用;task以生成器的方式作为参数提供,供submitter调用.

3、所有需要进行线程交互的信息放在context类中.

主要实现代码如下

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#Submitter线程类实现,主要是`task_generator`调用
class SubmitterThread(threading.Thread):
   _DEFAULT_WAIT_TIMEOUT = 2 #seconds
   def __init__( self , queue, task_gen, timeout = 2 ):
     super (SubmitterThread, self ).__init__()
     self .queue = queue
     if not isinstance (timeout, int ):
       _logger.error( 'Thread wait timeout value error: %s, '
              'use default instead.' % timeout)
       self .timeout = self ._DEFAULT_WAIT_TIMEOUT
     self .timeout = timeout
     self .task_generator = task_gen
 
   def run( self ):
     while True :
       try :
         task = self .task_generator. next ()
         self .queue.put(task, True , self .timeout)
       except Queue.Full:
         _logger.debug( 'Task queue is full. %s wait %d second%s timeout' %
                ( self .name, self .timeout, 's' if ( self .timeout > 1 ) else ''))
         break
       except (StopIteration, ValueError) as e:
         _logger.debug( 'Task finished' )
         break
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#Worker线程实现,主要就是try块内的func调用
class WorkerThread(threading.Thread):
   _DEFAULT_WAIT_TIMEOUT = 2 #seconds
   def __init__( self , queue, timeout = 2 ):
     super (WorkerThread, self ).__init__()
     self .queue = queue
     if not isinstance (timeout, int ):
       _logger.error( 'Thread wait timeout value error: %s, '
              'use default instead.' % timeout)
       self .timeout = self ._DEFAULT_WAIT_TIMEOUT
     self .timeout = timeout
 
   def run( self ):
     while True :
       try :
         func = self .queue.get( True , self .timeout)
       except Queue.Empty:
         _logger.debug( 'Task queue is empty. %s wait %d second%s timeout' %
                ( self .name, self .timeout, 's' if ( self .timeout > 1 ) else ''))
         break
 
       if not callable (func):
         time.sleep( 1 )
       try :
         func()
       except Exception as e:
         _logger.error( 'Thread %s running occurs error: %s' %
                ( self .name, e))
         print ( 'Thread running error: %s' % e)
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class Executor( object ):
   """
   The really place to execute executor
   """
   thread_list = []
   submitters = 0
   workers = 0
   queue = None
   task_generator = None
   timeout = 0
   def __init__( self , task_gen, submitters = 1 , workers = 1 , timeout = 2 ):
     if len ( self .thread_list) ! = 0 :
       raise RuntimeError( 'Executor can only instance once.' )
     self .queue = Queue.Queue(maxsize = submitters * 2 + workers * 2 )
     self .submitters = submitters
     self .workers = workers
     self .task_generator = task_gen
     self .timeout = timeout
 
   def start( self ):
     for i in range ( self .submitters):
       submitter = SubmitterThread( self .queue, self .task_generator, self .timeout)
       self .thread_list.append(submitter)
       submitter.setName( 'Submitter-%d' % i)
       submitter.setDaemon( True )
       submitter.start()
     for i in range ( self .workers):
       worker = WorkerThread( self .queue, self .timeout)
       self .thread_list.append(worker)
       worker.setName( 'Worker-%d' % i)
       worker.setDaemon( True )
       worker.start()
 
   def is_alive( self ):
     alive = False
     for t in self .thread_list:
       if t.isAlive():
         alive = True
         break
     return alive
 
   def wait_to_shutdown( self ):
     _logger.debug( 'Start to wait to shutdown' )
     for t in self .thread_list:
       t.join()
       _logger.debug( 'Shutdown thread : %s' % t.name)

Executor类保存了线程池,提供相应接口。有了这个抽象之后,只需要实例化Executor类的对象,然后调用start方法进行多线程任务的运行。并可以用is_alive等接口再主线程内进行其他处理.

后续再使用这个抽象进行实际多线程任务的实现.

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我.

原文链接:https://blog.csdn.net/u010487568/article/details/52081875 。

最后此篇关于python多线程抽象编程模型详解的文章就讲到这里了,如果你想了解更多关于python多线程抽象编程模型详解的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com