python - Kombu in a non-blocking way

Reposted. Author: 太空狗. Last updated: 2023-10-29 23:59:43

I'm looking for a way to use kombu as an MQ adapter between tornado-sockjs and a Django application server. I did something like this:

import datetime

from django.conf import settings  # assumed: BROKER_URL comes from the Django settings
from kombu import BrokerConnection
from kombu.mixins import ConsumerMixin
from tornado import ioloop

# sockjs_queue is a kombu Queue defined elsewhere in the project.


class BrokerClient(ConsumerMixin):
    clients = []

    def __init__(self):
        self.connection = BrokerConnection(settings.BROKER_URL)
        self.io_loop = ioloop.IOLoop.instance()
        self.queue = sockjs_queue
        self._handle_loop()

    @staticmethod
    def instance():
        # Lazily create a single shared instance.
        if not hasattr(BrokerClient, '_instance'):
            BrokerClient._instance = BrokerClient()
        return BrokerClient._instance

    def add_client(self, client):
        self.clients.append(client)

    def remove_client(self, client):
        self.clients.remove(client)

    def _handle_loop(self):
        try:
            if self.restart_limit.can_consume(1):
                # consume() blocks while it waits for events,
                # so the Tornado IOLoop is stalled here.
                for _ in self.consume(limit=5):
                    pass
        except self.connection.connection_errors:
            print('Connection to broker lost. '
                  'Trying to re-establish the connection...')
        # Reschedule this method on the IOLoop.
        self.io_loop.add_timeout(datetime.timedelta(0.0001), self._handle_loop)

    def get_consumers(self, Consumer, channel):
        return [Consumer([self.queue, ], callbacks=[self.process_task])]

    def process_task(self, body, message):
        # Forward the message to every connected client it is addressed to.
        for client in self.clients:
            if hasattr(body, 'users') and client.user.pk in body.users:
                client.send(body)
        message.ack()

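But Tornado blocks while _handle_loop is executing (as expected).

Is there any way to avoid this?

I know about the Pika library adapter for Tornado, but I'd like to use kombu because it is already used in the project and has flexible transports.

Update:

I changed _handle_loop into a generator function: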

def drain_events(self, callback):
    with self.Consumer() as (connection, channel, consumers):
        with self.extra_context(connection, channel):
            try:
                connection.drain_events(timeout=1)
            except:
                pass
    callback(None)


@tornado.gen.engine
def _handle_loop(self):
    response = yield tornado.gen.Task(self.drain_events)
    self.io_loop.add_timeout(datetime.timedelta(0.0001), self._handle_loop)
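
A side note on the rescheduling delay used in the question's code: the first positional argument of datetime.timedelta is days, so timedelta(0.0001) is roughly 8.64 seconds rather than a fraction of a millisecond. The short check below illustrates this; the 0.1 ms target is only an assumption about what was intended, not something the question states.

```python
import datetime

# The first positional argument of timedelta is `days`, not seconds.
delay = datetime.timedelta(0.0001)
delay_seconds = delay.total_seconds()

# Naming the unit makes the intent explicit.
# 0.1 ms is an assumed target, chosen only for illustration.
intended = datetime.timedelta(milliseconds=0.1)
intended_seconds = intended.total_seconds()

print(delay_seconds, intended_seconds)
```

Passing the unit by keyword (seconds=..., milliseconds=...) avoids this kind of surprise when scheduling callbacks with add_timeout.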

Best answer

I finally found a proper solution for the RabbitMQ backend:

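import tornado.ioloop
from django.conf import settings  # assumed: BROKER_URL comes from the Django settings
from kombu import BrokerConnection, Consumer

# queue is a kombu Queue defined elsewhere in the project.


class BrokerClient(object):
    clients = []

    @staticmethod
    def instance():
        # Lazily create a single shared instance.
        if not hasattr(BrokerClient, '_instance'):
            BrokerClient._instance = BrokerClient()
        return BrokerClient._instance

    def __init__(self):
        self.connection = BrokerConnection(settings.BROKER_URL)
        self.consumer = Consumer(self.connection.channel(), [queue, ],
                                 callbacks=[self.process_task])
        self.consumer.consume()
        io_loop = tornado.ioloop.IOLoop.instance()
        # connection.eventmap is used as in the original answer; whether it is
        # available depends on the kombu version and transport.
        for sock, handler in self.connection.eventmap.items():
            # Bind handler as a default argument so each callback keeps its own.
            def drain_nowait(fd, events, handler=handler):
                handler()
            # Let the IOLoop call the handler when the socket becomes readable.
            io_loop.add_handler(sock.fileno(), drain_nowait,
                                io_loop.READ | io_loop.ERROR)

    def process_task(self, body, message):
        # something
        message.ack()

    def add_client(self, client):
        self.clients.append(client)

    def remove_client(self, client):
        self.clients.remove(client)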
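
Defining a callback inside a for loop, as the snippet above does with drain_nowait, is sensitive to Python's late-binding closures: a plain closure looks up the loop variable when it is called, not when it is defined. The following is a minimal sketch of that behavior with made-up handlers (make_late and make_bound are hypothetical helper names, not part of kombu or Tornado).

```python
def make_late(handlers):
    """Build callbacks that close over the loop variable (late binding)."""
    callbacks = []
    for h in handlers:
        def cb():
            return h()  # h is looked up when cb runs, after the loop has ended
        callbacks.append(cb)
    return callbacks


def make_bound(handlers):
    """Build callbacks that capture the current handler as a default argument."""
    callbacks = []
    for h in handlers:
        def cb(h=h):
            return h()  # h was fixed when cb was defined
        callbacks.append(cb)
    return callbacks


handlers = [lambda: "a", lambda: "b"]
late = [cb() for cb in make_late(handlers)]
bound = [cb() for cb in make_bound(handlers)]
print(late, bound)
```

With a single socket in the event map the difference does not show up, but with several entries the late-binding form would make every callback invoke the last handler.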

For other backends, you can use the solution posted in the question.

Note: this does not work with librabbitmq.
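
The idea behind the answer is to hand the broker socket to the event loop and run the handler only when the socket is readable, instead of blocking in a drain call. Below is a rough, standard-library-only sketch of that pattern. It uses selectors and socket.socketpair() as stand-ins for Tornado's IOLoop and the broker connection, so run_once, on_readable, and the "task-1" payload are illustrative assumptions rather than kombu or Tornado API.

```python
import selectors
import socket


def run_once(selector, timeout=0.1):
    """Poll the registered sockets once and call the handler of each ready one."""
    handled = 0
    for key, _events in selector.select(timeout):
        key.data(key.fileobj)  # the handler was stored as `data` at registration
        handled += 1
    return handled


received = []


def on_readable(sock):
    # Stand-in for the broker handler: read whatever is available right now.
    received.append(sock.recv(1024))


# A connected socket pair plays the role of the broker connection.
broker_side, client_side = socket.socketpair()
client_side.setblocking(False)

selector = selectors.DefaultSelector()
selector.register(client_side, selectors.EVENT_READ, on_readable)

idle = run_once(selector)   # nothing sent yet, so no handler is called
broker_side.send(b"task-1")
busy = run_once(selector)   # the socket is readable now, so the handler runs

selector.unregister(client_side)
selector.close()
broker_side.close()
client_side.close()
```

In the answer's code, IOLoop.add_handler plays the role of selector.register, and the loop itself does the polling, so nothing waits on the broker while no data is pending.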

Regarding "python - Kombu in a non-blocking way", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/15746786/
