gpt4 book ai didi

Python使用 Beanstalkd 做异步任务处理的方法

转载 作者:qq735679552 更新时间:2022-09-28 22:32:09 43 4
gpt4 key购买 nike

CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.

这篇CFSDN的博客文章Python使用 Beanstalkd 做异步任务处理的方法由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.

使用 Beanstalkd 作为消息队列服务,然后结合 Python 的装饰器语法实现一个简单的异步任务处理工具. 。

最终效果 。

定义任务

?
1
2
3
4
5
6
7
from xxxxx.job_queue import JobQueue
 
queue = JobQueue()
 
@queue .task( 'task_tube_one' )
def task_one(arg1, arg2, arg3):
  # do task

提交任务

?
1
task_one.put(arg1 = "a" , arg2 = "b" , arg3 = "c" )

然后就可以由后台的 work 线程去执行这些任务了.

实现过程 。

1、了解 Beanstalk Server 。

Beanstalk is a simple, fast work queue. https://github.com/kr/beanstalkd 。

Beanstalk 是一个 C 语言实现的消息队列服务。 它提供了通用的接口,最初设计的目的是通过异步运行耗时的任务来减少大量Web应用程序中的页面延迟。针对不同的语言,有不同的 Beanstalkd Client 实现。 Python 里就有 beanstalkc 等。我就是利用 beanstalkc 来作为与 beanstalkd server 通信的工具.

2、任务异步执行实现原理 。

Python使用 Beanstalkd 做异步任务处理的方法

beanstalkd 只能进行字符串的任务调度。为了让程序支持提交函数和参数,然后由woker执行函数并携带参数。需要一个中间层来将函数与传递的参数注册.

实现主要包括3个部分

Subscriber: 负责将函数注册到 beanstalk 的一个tube上,实现很简单,注册函数名和函数本身的对应关系。(也就意味着同一个分组(tube)下不能有相同函数名存在)。数据存储在类变量里.

?
1
2
3
4
5
6
class Subscriber( object ):
  FUN_MAP = defaultdict( dict )
 
  def __init__( self , func, tube):
   logger.info( 'register func:{} to tube:{}.' . format (func.__name__, tube))
   Subscriber.FUN_MAP[tube][func.__name__] = func

JobQueue: 方便将一个普通函数转换为具有 Putter 能力的装饰器 。

?
1
2
3
4
5
6
7
8
class JobQueue( object ):
  @classmethod
  def task( cls , tube):
   def wrapper(func):
    Subscriber(func, tube)
    return Putter(func, tube)
 
   return wrapper

Putter: 将函数名、函数参数、指定的分组组合为一个对象,然后 json 序列化为字符串,最后通过 beanstalkc 推送到beanstalkd 队列.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Putter( object ):
  def __init__( self , func, tube):
   self .func = func
   self .tube = tube
 
  # 直接调用返回
  def __call__( self , * args, * * kwargs):
   return self .func( * args, * * kwargs)
 
  # 推给离线队列
  def put( self , * * kwargs):
   args = {
    'func_name' : self .func.__name__,
    'tube' : self .tube,
    'kwargs' : kwargs
   }
   logger.info( 'put job:{} to queue' . format (args))
   beanstalk = beanstalkc.Connection(host = BEANSTALK_CONFIG[ 'host' ], port = BEANSTALK_CONFIG[ 'port' ])
   try :
    beanstalk.use( self .tube)
    job_id = beanstalk.put(json.dumps(args))
    return job_id
   finally :
    beanstalk.close()

Worker: 从 beanstalkd 队列中取出字符串,然后通过 json.loads 反序列化为对象,获得 函数名、参数和tube。最后从 Subscriber 中获得 函数名对应的函数代码,然后传递参数执行函数.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class Worker( object ):
  worker_id = 0
 
  def __init__( self , tubes):
   self .beanstalk = beanstalkc.Connection(host = BEANSTALK_CONFIG[ 'host' ], port = BEANSTALK_CONFIG[ 'port' ])
   self .tubes = tubes
   self .reserve_timeout = 20
   self .timeout_limit = 1000
   self .kick_period = 600
   self .signal_shutdown = False
   self .release_delay = 0
   self .age = 0
   self .signal_shutdown = False
   signal.signal(signal.SIGTERM, lambda signum, frame: self .graceful_shutdown())
   Worker.worker_id + = 1
   import_module_by_str( 'pear.web.controllers.controller_crawler' )
 
  def subscribe( self ):
   if isinstance ( self .tubes, list ):
    for tube in self .tubes:
     if tube not in Subscriber.FUN_MAP.keys():
      logger.error( 'tube:{} not register!' . format (tube))
      continue
     self .beanstalk.watch(tube)
   else :
    if self .tubes not in Subscriber.FUN_MAP.keys():
     logger.error( 'tube:{} not register!' . format ( self .tubes))
     return
    self .beanstalk.watch( self .tubes)
 
  def run( self ):
   self .subscribe()
   while True :
    if self .signal_shutdown:
     break
    if self .signal_shutdown:
     logger.info( "graceful shutdown" )
     break
    job = self .beanstalk.reserve(timeout = self .reserve_timeout) # 阻塞获取任务,最长等待 timeout
    if not job:
     continue
    try :
     self .on_job(job)
     self .delete_job(job)
    except beanstalkc.CommandFailed as e:
     logger.warning(e, exc_info = 1 )
    except Exception as e:
     logger.error(e)
     kicks = job.stats()[ 'kicks' ]
     if kicks < 3 :
      self .bury_job(job)
     else :
      message = json.loads(job.body)
      logger.error( "Kicks reach max. Delete the job" , extra = { 'body' : message})
      self .delete_job(job)
 
  @classmethod
  def on_job( cls , job):
   start = time.time()
   msg = json.loads(job.body)
   logger.info(msg)
   tube = msg.get( 'tube' )
   func_name = msg.get( 'func_name' )
   try :
    func = Subscriber.FUN_MAP[tube][func_name]
    kwargs = msg.get( 'kwargs' )
    func( * * kwargs)
    logger.info(u '{}-{}' . format (func, kwargs))
   except Exception as e:
    logger.error(e.message, exc_info = True )
   cost = time.time() - start
   logger.info( '{} cost {}s' . format (func_name, cost))
 
  @classmethod
  def delete_job( cls , job):
   try :
    job.delete()
   except beanstalkc.CommandFailed as e:
    logger.warning(e, exc_info = 1 )
 
  @classmethod
  def bury_job( cls , job):
   try :
    job.bury()
   except beanstalkc.CommandFailed as e:
    logger.warning(e, exc_info = 1 )
 
  def graceful_shutdown( self ):
   self .signal_shutdown = True

写上面代码的时候,发现一个问题:

通过 Subscriber 注册函数名和函数本身的对应关系,是在一个Python解释器,也就是在一个进程里运行的,而 Worker 又是异步在另外的进程运行,怎么样才能让 Worker 也能拿到和 Putter 一样的 Subscriber。最后发现通过 Python 的装饰器机制可以解决这个问题.

就是这句解决了 Subscriber 的问题 。

?
1
import_module_by_str( 'pear.web.controllers.controller_crawler' )
?
1
2
3
4
5
# import_module_by_str 的实现
def import_module_by_str(module_name):
  if isinstance (module_name, unicode ):
   module_name = str (module_name)
  __import__ (module_name)

执行 import_module_by_str 时, 会调用 __import__ 动态加载类和函数。将使用了 JobQueue 的函数所在模块加载到内存之后。当 运行 Woker 时,Python 解释器就会先执行 @修饰的装饰器代码,也就会把 Subscriber 中的对应关系加载到内存.

实际使用可以看 https://github.com/jiyangg/Pear/blob/master/pear/jobs/job_queue.py 。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我.

原文链接:https://www.jianshu.com/p/cc9cd2892ff8 。

最后此篇关于Python使用 Beanstalkd 做异步任务处理的方法的文章就讲到这里了,如果你想了解更多关于Python使用 Beanstalkd 做异步任务处理的方法的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

43 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com