gpt4 book ai didi

Python 和 Redis : Manager/Worker application best practices

转载 作者:IT王子 更新时间:2023-10-29 06:12:22 25 4
gpt4 key购买 nike

我有几个关于使用 Python 和 Redis 创建用于运行异步命令的作业队列应用程序的一般性问题。这是我到目前为止生成的代码:

def queueCmd(cmd):
r_server.rpush("cmds", cmd)

def printCmdQueue():
print r_server.lrange("cmds", 0 , -1)

def work():
print "command being consumed: ", r_server.lpop("cmds")
return -1

def boom(info):
print "pop goes the weasel"

if __name__ == '__main__':

r_server = redis.Redis("localhost")

queueCmd("ls -la;sleep 10;ls")
queueCmd("mkdir test; sleep 20")
queueCmd("ls -la;sleep 10;ls")
queueCmd("mkdir test; sleep 20")
queueCmd("ls -la;sleep 10;ls")
queueCmd("mkdir test; sleep 20")

printCmdQueue()

pool = Pool(processes=2)

print "cnt:", +r_server.llen("cmds")
#while r_server.llen("cmds") > 0:
while True:
pool.apply_async(work, callback=boom)
if not r_server.lrange("cmds", 0, -1):
#if r_server.llen("cmds") == 0:
print "Terminate pool"
pool.terminate()
break

printCmdQueue()

首先,我是否认为如果我需要与经理进行任何沟通,我想通过回调来进行?我在这个使用中看到的快速示例将异步调用存储在结果中并通过 result.get(timeout=1) 访问它。通过通信,我的意思是将内容放回 Redis 列表中。

编辑:如果命令以异步方式运行,并且我在 main 中的结果超时,那是使 worker 超时还是只是在管理器中的那个操作超时?如果经理不能使用它来检查工作人员的退出代码该多好?

接下来,这段代码产生以下输出:

['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: ['mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: mkdir test; sleep 20
pop goes the weasel
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: mkdir test; sleep 20
Terminate pool
command being consumed: None
pop goes the weasel
pop goes the weasel
pop goes the weasel
[]

为什么工作人员想要一次使用多个命令,即使我一次弹出一个命令?类似地,这并不总是很好地结束,有时需要 ctrl+c。为了对付他,我清理了队列,然后再去。我认为这与 apply_sync() 以及是否退出循环有关。我想知道是否需要在工作人员方面进行更多操作?

如果我将 ifs 更改为注释掉的那个,我会得到:

ValueError: invalid literal for int() with base 10: 'ls -la;sleep 10;ls'

这似乎是检查我是否需要中断的更好方法,但似乎函数有时会返回字符串文字?

如有任何改进建议,我们将不胜感激。我只是想制作一个类似于 Linux 机器上的服务/守护进程的管理器。它将用于从 redis 列表中获取作业(当前是命令,但可能更多)并将结果返回到 redis 列表中。然后,GUI 将与该管理器交互以获取队列状态并返回结果。

谢谢,

编辑:

我意识到我有点傻了。我不需要从工作人员访问 redis 服务器,这会导致一些错误(特别是 ValueError)。

为了解决这个问题,循环现在是:

while not r_server.llen("cmds") == 0:
cmd = r_server.lpop("cmds")
pool.apply_async(work, [cmd])

在这些行之后,我调用了 pool.close()。我使用 os.getpid()os.getppid() 来检查我是否确实有多个 child 跑来跑去。

如果这听起来像是创建使用 Redis 的管理器/工作器应用程序的好方法,我仍然会很高兴。

最佳答案

您的问题是您正试图通过单个 redis 连接同时运行多个命令。

你期待的是这样的

Thread 1     Thread 2
LLEN test
1
LPOP test
command
LLEN test
0

但是你得到了

Thread 1     Thread 2
LLEN test
1
LPOP test
LLEN test
command
0

结果以相同的顺序返回,但没有将线程或命令链接到特定结果。单独的 redis 连接不是线程安全的——每个工作线程都需要一个。

如果你不恰本地使用流水线,你也会看到类似的问题 - 它是为只写场景设计的,比如向列表中添加大量项目,你可以通过假设 LPUSH 成功而不是等待服务器告诉你它来提高性能在每个项目之后都成功了。 Redis 仍然会返回结果,但它们不一定是最后发送的命令的结果。

除此之外,基本的做法是合理的。不过,您可以进行一些改进:

  • 不检查长度,只使用非阻塞 LPOP - 如果它返回 null,则列表为空
  • 添加一个计时器,这样如果列表为空,它就会等待,而不是仅仅发出另一个命令。
  • 在 while 循环条件中包含取消检查
  • 处理连接错误——我使用了一个外部循环设置,这样如果连接失败,worker 将在完全终止 worker 进程之前尝试重新连接(基本上是重新启动 ma​​in)进行合理次数的尝试.

关于Python 和 Redis : Manager/Worker application best practices,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6378628/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com