- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我正在努力了解 Python 3 asyncio
模块,特别是使用传输/协议(protocol) API。我想创建一个发布/订阅模式,并使用 asyncio.Protocol
类来创建我的客户端和服务器。
目前,我已启动并运行服务器,并监听传入的客户端连接。客户端能够连接到服务器,发送消息并接收回复。
我希望能够使 TCP 连接保持事件状态并维护一个允许我添加消息的队列。我试图找到一种使用低级 API(传输/协议(protocol))来执行此操作的方法,但有限的在线 asyncio 文档/示例似乎都进入了高级 API - 使用流等。有人能够为我指明正确的实现方向?
这是服务器代码:
#!/usr/bin/env python3
import asyncio
import json
class SubscriberServerProtocol(asyncio.Protocol):
""" A Server Protocol listening for subscriber messages """
def connection_made(self, transport):
""" Called when connection is initiated """
self.peername = transport.get_extra_info('peername')
print('connection from {}'.format(self.peername))
self.transport = transport
def data_received(self, data):
""" The protocol expects a json message containing
the following fields:
type: subscribe/unsubscribe
channel: the name of the channel
Upon receiving a valid message the protocol registers
the client with the pubsub hub. When succesfully registered
we return the following json message:
type: subscribe/unsubscribe/unknown
channel: The channel the subscriber registered to
channel_count: the amount of channels registered
"""
# Receive a message and decode the json output
recv_message = json.loads(data.decode())
# Check the message type and subscribe/unsubscribe
# to the channel. If the action was succesful inform
# the client.
if recv_message['type'] == 'subscribe':
print('Client {} subscribed to {}'.format(self.peername,
recv_message['channel']))
send_message = json.dumps({'type': 'subscribe',
'channel': recv_message['channel'],
'channel_count': 10},
separators=(',', ':'))
elif recv_message['type'] == 'unsubscribe':
print('Client {} unsubscribed from {}'
.format(self.peername, recv_message['channel']))
send_message = json.dumps({'type': 'unsubscribe',
'channel': recv_message['channel'],
'channel_count': 9},
separators=(',', ':'))
else:
print('Invalid message type {}'.format(recv_message['type']))
send_message = json.dumps({'type': 'unknown_type'},
separators=(',', ':'))
print('Sending {!r}'.format(send_message))
self.transport.write(send_message.encode())
def eof_received(self):
""" an EOF has been received from the client.
This indicates the client has gracefully exited
the connection. Inform the pubsub hub that the
subscriber is gone
"""
print('Client {} closed connection'.format(self.peername))
self.transport.close()
def connection_lost(self, exc):
""" A transport error or EOF is seen which
means the client is disconnected.
Inform the pubsub hub that the subscriber has
Disappeared
"""
if exc:
print('{} {}'.format(exc, self.peername))
loop = asyncio.get_event_loop()
# Each client will create a new protocol instance
coro = loop.create_server(SubscriberServerProtocol, '127.0.0.1', 10666)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
try:
server.close()
loop.until_complete(server.wait_closed())
loop.close()
except:
pass
这是客户端代码:
#!/usr/bin/env python3
import asyncio
import json
class SubscriberClientProtocol(asyncio.Protocol):
def __init__(self, message, loop):
self.message = message
self.loop = loop
def connection_made(self, transport):
""" Upon connection send the message to the
server
A message has to have the following items:
type: subscribe/unsubscribe
channel: the name of the channel
"""
transport.write(self.message.encode())
print('Message sent: {!r}'.format(self.message))
def data_received(self, data):
""" After sending a message we expect a reply
back from the server
The return message consist of three fields:
type: subscribe/unsubscribe
channel: the name of the channel
channel_count: the amount of channels subscribed to
"""
print('Message received: {!r}'.format(data.decode()))
def connection_lost(self, exc):
print('The server closed the connection')
print('Stop the event loop')
self.loop.stop()
if __name__ == '__main__':
message = json.dumps({'type': 'subscribe', 'channel': 'sensor'},
separators=(',', ':'))
loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: SubscriberClientProtocol(message,
loop),
'127.0.0.1', 10666)
loop.run_until_complete(coro)
try:
loop.run_forever()
except KeyboardInterrupt:
print('Closing connection')
loop.close()
最佳答案
您的服务器对于您要执行的操作来说没有问题;您编写的代码实际上使 TCP 连接保持事件状态,只是您没有适当的管道来不断向其提供新消息。为此,您需要调整客户端代码,以便您可以随时向其中提供新消息,而不是仅在 connection_made
回调触发时才这样做。
这很容易;我们将向可以接收消息的 ClientProtocol
添加一个内部 asyncio.Queue
,然后在无限循环中运行协程,该协程使用来自该 Queue 的消息
,并将它们发送到服务器。最后一部分是实际存储从 create_connection
调用返回的 ClientProtocol
实例,然后将其传递给实际发送消息的协程。
import asyncio
import json
class SubscriberClientProtocol(asyncio.Protocol):
def __init__(self, loop):
self.transport = None
self.loop = loop
self.queue = asyncio.Queue()
self._ready = asyncio.Event()
asyncio.async(self._send_messages()) # Or asyncio.ensure_future if using 3.4.3+
@asyncio.coroutine
def _send_messages(self):
""" Send messages to the server as they become available. """
yield from self._ready.wait()
print("Ready!")
while True:
data = yield from self.queue.get()
self.transport.write(data.encode('utf-8'))
print('Message sent: {!r}'.format(message))
def connection_made(self, transport):
""" Upon connection send the message to the
server
A message has to have the following items:
type: subscribe/unsubscribe
channel: the name of the channel
"""
self.transport = transport
print("Connection made.")
self._ready.set()
@asyncio.coroutine
def send_message(self, data):
""" Feed a message to the sender coroutine. """
yield from self.queue.put(data)
def data_received(self, data):
""" After sending a message we expect a reply
back from the server
The return message consist of three fields:
type: subscribe/unsubscribe
channel: the name of the channel
channel_count: the amount of channels subscribed to
"""
print('Message received: {!r}'.format(data.decode()))
def connection_lost(self, exc):
print('The server closed the connection')
print('Stop the event loop')
self.loop.stop()
@asyncio.coroutine
def feed_messages(protocol):
""" An example function that sends the same message repeatedly. """
message = json.dumps({'type': 'subscribe', 'channel': 'sensor'},
separators=(',', ':'))
while True:
yield from protocol.send_message(message)
yield from asyncio.sleep(1)
if __name__ == '__main__':
message = json.dumps({'type': 'subscribe', 'channel': 'sensor'},
separators=(',', ':'))
loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: SubscriberClientProtocol(loop),
'127.0.0.1', 10666)
_, proto = loop.run_until_complete(coro)
asyncio.async(feed_messages(proto)) # Or asyncio.ensure_future if using 3.4.3+
try:
loop.run_forever()
except KeyboardInterrupt:
print('Closing connection')
loop.close()
关于python - 使用队列的 Asyncio 持久客户端协议(protocol)类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30937042/
我遇到一种情况,我需要从某个主题读取(正在进行的)消息并将它们放入另一个 Queue 中。我怀疑我是否需要 jms Queue 或者我可以对内存中的 java Queue 感到满意。我将通过同一 jv
队列也是一种操作受限的线性数据结构,与栈很相似。 01、定义 栈的操作受限表现为只允许在队列的一端进行元素插入操作,在队列的另一端只允许删除操作。这一特性可以总结为先进先出(First In
队列的定义 队列(Queue):先进先出的线性表 队列是仅在队尾进行插入和队头进行删除操作的线性表 队头(front):线性表的表头端,即可删除端 队尾(rear):线性表的表尾端,即可插入端 由于这
Redis专题-队列 首先,想一想 Redis 适合做消息队列吗? 1、消息队列的消息存取需求是什么?redis中的解决方案是什么? 无非就是下面这几点: 0、数据可以顺序读
0. 学习目标 栈和队列是在程序设计中常见的数据类型,从数据结构的角度来讲,栈和队列也是线性表,是操作受限的线性表,它们的基本操作是线性表操作的子集,但从数据类型的角度来讲,它们与线性表又有着巨大的不
我想在 redis + Flask 和 Python 中实现一个队列。我已经用 RQ 实现了这样的查询,如果你有 Flask 应用程序和任务在同一台服务器上工作,它就可以正常工作。我想知道是否有可能创
我正在使用 Laravel 5.1,我有一个大约需要 2 分钟来处理的任务,这个任务特别是生成报告...... 现在,很明显,我不能让用户在我接受用户输入的同一页面上等待 2 分钟,而是我应该在后台处
我正在使用 Azure 队列,并且有多个不同的进程从队列中读取数据。 我的系统的构建方式假设每条消息只读取一次。 这个Microsoft article声称 Azure 队列具有至少一次传送保证,这可
我正在创建一个Thread::Queue元素数组。 我这样做是这样的: for (my $i=0; $i new; } 但是,当我在每个队列中填充这样的元素时 $queues[$index]->enq
我试图了解如何将我的 Mercurial 补丁推送到远程存储库(例如 bitbucket.org),而不必先应用它们(实际上提交它们)。我的动机是在最终完成之前首先对我的工作进行远程备份,并且能够与其
我的本地计算机上有一个 Mercurial 队列补丁,我需要与同事共享该补丁,但我不想将其提交到上游存储库。有没有一种简单的方法可以打包该补丁并与他分享? 最佳答案 mq 将补丁作为不带扩展名的文
Java 中是否有任何类提供与 Queue 相同的功能,但有返回对象的选项,并且不要删除它,只需将其设置在集合末尾? 最佳答案 Queue不直接提供这样的方法。但是,您可以使用 poll 和 add
我在Windows上使用Tortoise svn客户端,我需要能够一次提交来自不同子文件夹的更改文件-一次提交。像在提交之前将文件添加到队列中之类的?我该怎么做? Windows上是否还有另一个svn
好吧,我正在尝试对我的 DSAQueue 类进行单元测试,它显示我的 isEmpty()、isFull() 和 dequeue() 方法失败。 以下是我的 DSAQueue 代码。我认为我的 Dequ
我想尽量减少对传入请求的数据库查询。它目前需要写入 6 个不同的表。在返回响应之前不需要完成处理。因此,我考虑了 laravel 队列,但我想知道我是否也可以摆脱写入队列/作业表所需的单独查询。我可以
我正在学习队列数据结构。我想用链表创建队列。我想编程输出:10 20程序输出:队列为空-1 队列为空-1 我哪里出错了? 代码如下: class Node { int x; Node next
“当工作人员有空时,他们会根据主题的优先级列表从等待请求池中进行选择。在时间 t 到达的所有请求都可以在时间 t 进行分配。如果两名工作人员同时有空,则安排优先权分配给最近的工作最早安排的人。如果仍然
我正在开发一个巨大的应用程序,它使用一些子菜单、模式窗口、提示等。 现在,我想知道在此类应用程序中处理 Esc 和单击外部事件的正确方法。 $(document).keyup(function(e)
所以 如果我有一个队列 a --> b --> NULL; 当我使用函数时 void duplicate(QueueNodePtr pHead, QueueNodePtr *pTail) 它会给 a
我正在尝试为键盘输入实现 FIFO 队列,但似乎无法让它工作。我可以让键盘输入显示在液晶显示屏上,但这就是我能做的。我认为代码应该读取键盘输入并将其插入队列,然后弹出键盘输入并将值读取到液晶屏幕上。有
我是一名优秀的程序员,十分优秀!