gpt4 book ai didi

python - Tornado -redis : RPOP works but BRPOP doesn't?

转载 作者:IT王子 更新时间:2023-10-29 06:10:21 24 4
gpt4 key购买 nike

Tornado 和 Redis 的新手,并开始实现监听器/工作器设置。

我希望能够将任务 LPUSH 到队列中,然后将它们关闭。 BRPOP 似乎是弹出它们的最佳方式,因为如果当前没有,它将等待添加一个。问题是,无论何时我使用它,它都不会返回...但是当我使用 RPOP 时,我会按预期获得队列中的下一个项目。

class ListenHandler(tornado.websocket.WebSocketHandler):
uid = ''
CHANNEL_TPL = "client_%s"
RESPONSE_TPL = '{"command":"%s","rid":"%s","status":"%s","result":%s}'
def open(self):
# new websocket connection is established from a client
print "open socket"
self.uid = session = uuid4()

def on_message(self, message):
print "on_message called [%s]" % message
try:
m = json.loads(message)
except ValueError:
self.write_message('BAD')
return

# check for RID (request id)
if not 'rid' in m:
self.write_message('error: unspecified rid')
return

# confirm receipt of data
confirm_string = '%s OK' % (m['rid'])
self.write_message(confirm_string)

# check for command
if not 'command' in m:
response = '%s error: unspecified command' % (m['rid'])
self.write_message(response)
return

# process commands
if m['command'] == 'register':
self._register(m['rid'])
elif m['command'] == 'get_canvas':
self._queue_command('read', m)
elif m['command'] == 'save_canvas':
if 'data' in m:
self._queue_command('write', m)
else:
response = '%s unspecified data' % (m['rid'])
self.write_message(response)
return
elif m['command'] == 'list_command_queue':
self._list_command_queue(m['rid'])
elif m['command'] == 'get_read_job':
self._get_read_job(m['rid'])
elif m['command'] == 'get_write_job':
self._get_write_job(m['rid'])
else:
# no commands recognized
response = '%s error: unknown command' % (m['rid'])
self.write_message(response)
return
print "end of on_message()\n"

def callback(self, data):
print "- callback()"
self.write_message(data)

def on_close(self):
# websocket connection is closed by client
pass

def _register(self, rid):
data = '{"uid":"%s"}' % (self.uid)
response = self.RESPONSE_TPL % ('register', rid, 'completed', data)
self.callback(response)

@tornado.web.asynchronous
@tornado.gen.engine
def _queue_command(self, type, m):
channel = self.CHANNEL_TPL % (type)
print "pushing job to %s ... data[%s]" % (channel, m)
yield tornado.gen.Task(self.application.rdb.lpush, channel, m)
return

def _list_command_queue(self, rid):
channel_r = self.CHANNEL_TPL % ('read')
channel_w = self.CHANNEL_TPL % ('write')
data = '{"client_read":"%s","client_write":"%s"}' % (self.application.rdb.llen(channel_r), self.application.rdb.llen(channel_w))
response = self.RESPONSE_TPL % ('list_command_queue', rid, 'completed', data)
print "list_command_queue [%s]" % (response)
self.callback(response)

@tornado.web.asynchronous
@tornado.gen.engine
def _get_read_job(self, rid):
channel = self.CHANNEL_TPL % ('read')
data = yield tornado.gen.Task(self.application.rdb.rpop, (channel))
response = self.RESPONSE_TPL % ('get_read_job', rid, 'completed', data)
self.callback(response)

@tornado.web.asynchronous
@tornado.gen.engine
def _get_write_job(self, rid):
channel = self.CHANNEL_TPL % ('write')
data = yield tornado.gen.Task(self.application.rdb.rpop, (channel))
response = self.RESPONSE_TPL % ('get_write_job', rid, 'completed', data)
self.callback(response)

上面的类将接受识别的命令并将它们 LPUSH 到两个不同的队列之一(一个“写”队列用于将写入 SQL DB 的作业,一个“读”队列用于只读的作业)。理论上,在不同的机器上会有许多“工作”线程使用 BRPOP 来获取这些命令并执行它们。不过现在,我正在使用同一个监听器来测试队列中的内容。

函数 get_write_jobs 和 get_read_jobs 将返回队列中的下一个条目,没问题。但是,如果我向任一方法添加“b”(brpop),该函数将永远无法调用回调。似乎只是锁定,等待下一个可用的条目,但那里有条目。

知道这里发生了什么吗?我是否误解了 BRPOP 的目的?

谢谢,尼克

最佳答案

也许有点晚了,但我想我能帮上忙。

我今天遇到了同样的问题,这几乎让我发疯。最后,经过大量调试后,我通过将 key 或 channel 传递给列表(例如 [channel])内的 blpop/brpop 解决了这个问题。

tornado-redis 的 brpop/blpop 内部所做的是将键转换为列表,但是当只接收到一个键时,它会将字符串转换为字符列表(简直太神奇了......),这就是为什么之后的调用 block ,它正在等待各种列表中的新项目,其名称对应于原始键的所有字符。

关于python - Tornado -redis : RPOP works but BRPOP doesn't?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11243862/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com