gpt4 book ai didi

python - Twisted SSE 服务器通过 pubsub 订阅 Redis

转载 作者:可可西里 更新时间:2023-11-01 11:38:11 24 4
gpt4 key购买 nike

我正在尝试在 Twisted 中构建一个服务器,它可以让客户端使用服务器发送事件进行连接。我希望该服务器也能监听 Redis,如果有消息出现,则将其推送到连接的 SSE 客户端。

我的 SSE 服务器正在运行。我知道如何订阅 Redis。我无法弄清楚如何让两个部分同时运行而不会相互阻塞。

我知道 https://github.com/leporo/tornado-redishttps://github.com/fiorix/txredisapi ,这是在相关问题中推荐的。不知道这有什么帮助:/

如何解决?您能否在这两个方面提供帮助:概念提示和代码片段?

我的 Twisted SSE 服务器代码:

# coding: utf-8
from twisted.web import server, resource
from twisted.internet import reactor


class Subscribe(resource.Resource):
isLeaf = True
sse_conns = set()

def render_GET(self, request):
request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
request.write("")
self.add_conn(request)
return server.NOT_DONE_YET

def add_conn(self, conn):
self.sse_conns.add(conn)
finished = conn.notifyFinish()
finished.addBoth(self.rm_conn)

def rm_conn(self, conn):
self.sse_conns.remove(conn)

def broadcast(self, event):
for conn in self.sse_conns:
event_line = "data: {}'\r\n'".format(event)
conn.write(event_line + '\r\n')


if __name__ == "__main__":
sub = Subscribe()
reactor.listenTCP(9000, server.Site(sub))
reactor.run()

我的Redis订阅代码:

import redis


redis = redis.StrictRedis.from_url('redis://localhost:6379')


class RedisSub(object):
def __init__(self):
self.pubsub = redis.pubsub()
self.pubsub.subscribe('foobar-channel')

def listen(self):
for item in self.pubsub.listen():
print str(item)

最佳答案

这对我有用。

我最终使用了 txredis 库,对 RedisClient 稍作改动(添加了最少的订阅功能)。

# coding: utf-8
import os
import sys
import weakref

from txredis.client import RedisClient

from twisted.web import server, resource
from twisted.internet import reactor, protocol, defer
from twisted.python import log

from utils import cors, redis_conf_from_url


log.startLogging(sys.stdout)

PORT = int(os.environ.get('PORT', 9000))
REDIS_CONF = redis_conf_from_url(os.environ.get('REDISCLOUD_URL', 'redis://localhost:6379'))
REDIS_SUB_CHANNEL = 'votes'


class RedisBroadcaster(RedisClient):
def subscribe(self, *channels):
self._send('SUBSCRIBE', *channels)

def handleCompleteMultiBulkData(self, reply):
if reply[0] == u"message":
message = reply[1:][1]
self.sse_connector.broadcast(message)
else:
super(RedisClient, self).handleCompleteMultiBulkData(reply)


@defer.inlineCallbacks
def redis_sub():
clientCreator = protocol.ClientCreator(reactor, RedisBroadcaster, password=REDIS_CONF.get('password'))
redis = yield clientCreator.connectTCP(REDIS_CONF['host'], REDIS_CONF['port'])
redis.subscribe(REDIS_SUB_CHANNEL)


class Subscribe(resource.Resource):
isLeaf = True
sse_conns = weakref.WeakSet()

@cors
def render_GET(self, request):
request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
request.write("")
self.sse_conns.add(request)
return server.NOT_DONE_YET

def broadcast(self, event):
for conn in self.sse_conns:
event_line = "data: {}\r\n".format(event)
conn.write(event_line + '\r\n')


if __name__ == "__main__":
sub = Subscribe()
reactor.listenTCP(PORT, server.Site(sub))

RedisBroadcaster.sse_connector = sub
reactor.callLater(0, redis_sub)

reactor.run()

关于python - Twisted SSE 服务器通过 pubsub 订阅 Redis,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28529955/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com