gpt4 book ai didi

python - 使用队列的 Asyncio 持久客户端协议(protocol)类

转载 作者:太空狗 更新时间:2023-10-29 20:49:36 25 4
gpt4 key购买 nike

我正在努力了解 Python 3 asyncio 模块,特别是使用传输/协议(protocol) API。我想创建一个发布/订阅模式,并使用 asyncio.Protocol 类来创建我的客户端和服务器。

目前,我已启动并运行服务器,并监听传入的客户端连接。客户端能够连接到服务器,发送消息并接收回复。

我希望能够使 TCP 连接保持事件状态并维护一个允许我添加消息的队列。我试图找到一种使用低级 API(传输/协议(protocol))来执行此操作的方法,但有限的在线 asyncio 文档/示例似乎都进入了高级 API - 使用流等。有人能够为我指明正确的实现方向?

这是服务器代码:

#!/usr/bin/env python3

import asyncio
import json


class SubscriberServerProtocol(asyncio.Protocol):
""" A Server Protocol listening for subscriber messages """

def connection_made(self, transport):
""" Called when connection is initiated """

self.peername = transport.get_extra_info('peername')
print('connection from {}'.format(self.peername))
self.transport = transport

def data_received(self, data):
""" The protocol expects a json message containing
the following fields:

type: subscribe/unsubscribe
channel: the name of the channel

Upon receiving a valid message the protocol registers
the client with the pubsub hub. When succesfully registered
we return the following json message:

type: subscribe/unsubscribe/unknown
channel: The channel the subscriber registered to
channel_count: the amount of channels registered
"""

# Receive a message and decode the json output
recv_message = json.loads(data.decode())

# Check the message type and subscribe/unsubscribe
# to the channel. If the action was succesful inform
# the client.
if recv_message['type'] == 'subscribe':
print('Client {} subscribed to {}'.format(self.peername,
recv_message['channel']))
send_message = json.dumps({'type': 'subscribe',
'channel': recv_message['channel'],
'channel_count': 10},
separators=(',', ':'))
elif recv_message['type'] == 'unsubscribe':
print('Client {} unsubscribed from {}'
.format(self.peername, recv_message['channel']))
send_message = json.dumps({'type': 'unsubscribe',
'channel': recv_message['channel'],
'channel_count': 9},
separators=(',', ':'))
else:
print('Invalid message type {}'.format(recv_message['type']))
send_message = json.dumps({'type': 'unknown_type'},
separators=(',', ':'))

print('Sending {!r}'.format(send_message))
self.transport.write(send_message.encode())

def eof_received(self):
""" an EOF has been received from the client.

This indicates the client has gracefully exited
the connection. Inform the pubsub hub that the
subscriber is gone
"""
print('Client {} closed connection'.format(self.peername))
self.transport.close()

def connection_lost(self, exc):
""" A transport error or EOF is seen which
means the client is disconnected.

Inform the pubsub hub that the subscriber has
Disappeared
"""
if exc:
print('{} {}'.format(exc, self.peername))


loop = asyncio.get_event_loop()

# Each client will create a new protocol instance
coro = loop.create_server(SubscriberServerProtocol, '127.0.0.1', 10666)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass

# Close the server
try:
server.close()
loop.until_complete(server.wait_closed())
loop.close()
except:
pass

这是客户端代码:

#!/usr/bin/env python3

import asyncio
import json


class SubscriberClientProtocol(asyncio.Protocol):
def __init__(self, message, loop):
self.message = message
self.loop = loop

def connection_made(self, transport):
""" Upon connection send the message to the
server

A message has to have the following items:
type: subscribe/unsubscribe
channel: the name of the channel
"""
transport.write(self.message.encode())
print('Message sent: {!r}'.format(self.message))

def data_received(self, data):
""" After sending a message we expect a reply
back from the server

The return message consist of three fields:
type: subscribe/unsubscribe
channel: the name of the channel
channel_count: the amount of channels subscribed to
"""
print('Message received: {!r}'.format(data.decode()))

def connection_lost(self, exc):
print('The server closed the connection')
print('Stop the event loop')
self.loop.stop()

if __name__ == '__main__':
message = json.dumps({'type': 'subscribe', 'channel': 'sensor'},
separators=(',', ':'))

loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: SubscriberClientProtocol(message,
loop),
'127.0.0.1', 10666)
loop.run_until_complete(coro)
try:
loop.run_forever()
except KeyboardInterrupt:
print('Closing connection')
loop.close()

最佳答案

您的服务器对于您要执行的操作来说没有问题;您编写的代码实际上使 TCP 连接保持事件状态,只是您没有适当的管道来不断向其提供新消息。为此,您需要调整客户端代码,以便您可以随时向其中提供新消息,而不是仅在 connection_made 回调触发时才这样做。

这很容易;我们将向可以接收消息的 ClientProtocol 添加一个内部 asyncio.Queue,然后在无限循环中运行协程,该协程使用来自该 Queue 的消息,并将它们发送到服务器。最后一部分是实际存储从 create_connection 调用返回的 ClientProtocol 实例,然后将其传递给实际发送消息的协程。

import asyncio
import json

class SubscriberClientProtocol(asyncio.Protocol):
def __init__(self, loop):
self.transport = None
self.loop = loop
self.queue = asyncio.Queue()
self._ready = asyncio.Event()
asyncio.async(self._send_messages()) # Or asyncio.ensure_future if using 3.4.3+

@asyncio.coroutine
def _send_messages(self):
""" Send messages to the server as they become available. """
yield from self._ready.wait()
print("Ready!")
while True:
data = yield from self.queue.get()
self.transport.write(data.encode('utf-8'))
print('Message sent: {!r}'.format(message))

def connection_made(self, transport):
""" Upon connection send the message to the
server

A message has to have the following items:
type: subscribe/unsubscribe
channel: the name of the channel
"""
self.transport = transport
print("Connection made.")
self._ready.set()

@asyncio.coroutine
def send_message(self, data):
""" Feed a message to the sender coroutine. """
yield from self.queue.put(data)

def data_received(self, data):
""" After sending a message we expect a reply
back from the server

The return message consist of three fields:
type: subscribe/unsubscribe
channel: the name of the channel
channel_count: the amount of channels subscribed to
"""
print('Message received: {!r}'.format(data.decode()))

def connection_lost(self, exc):
print('The server closed the connection')
print('Stop the event loop')
self.loop.stop()

@asyncio.coroutine
def feed_messages(protocol):
""" An example function that sends the same message repeatedly. """
message = json.dumps({'type': 'subscribe', 'channel': 'sensor'},
separators=(',', ':'))
while True:
yield from protocol.send_message(message)
yield from asyncio.sleep(1)

if __name__ == '__main__':
message = json.dumps({'type': 'subscribe', 'channel': 'sensor'},
separators=(',', ':'))

loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: SubscriberClientProtocol(loop),
'127.0.0.1', 10666)
_, proto = loop.run_until_complete(coro)
asyncio.async(feed_messages(proto)) # Or asyncio.ensure_future if using 3.4.3+
try:
loop.run_forever()
except KeyboardInterrupt:
print('Closing connection')
loop.close()

关于python - 使用队列的 Asyncio 持久客户端协议(protocol)类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30937042/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com