gpt4 book ai didi

python - 如何重新连接 asyncio 上的套接字?

转载 作者:太空狗 更新时间:2023-10-29 18:02:59 25 4
gpt4 key购买 nike

我想在 app.py 上使用异步创建两个协议(protocol)(TcpClient 和 UdpServer),其中 TcpClient 将与 server.py 和用作 UDP 服务器的 UdpServer 建立持久连接:

我需要的:
a) 两个协议(protocol)进行通信:互相调用方法。这仅适用于第一次连接。如果 TcpClient 重新连接,它不能再次发送字符串“send to tcp”。来自 UdpServer。我检查了 print(self) 并且 TcpClient 创建了一个新实例,旧实例仍然存在但没有连接,但我不知道如何重构它。我认为我以错误的方式使用了 asyncio。
b) 当TcpClient 断开与server.py 的连接时,等待5s 再尝试重连,以此类推。我尝试使用 asyncio 的 call_later() 来做到这一点,但我认为有一种 native 方法可以做到这一点,而不是一种技巧。
c) 当我启动 app.py 时,如果 TcpClient 无法连接,我想在 5 秒后尝试重新连接,依此类推。我不知道该怎么做。

这里是我对 app.py server.py 的示例测试。 server.py 仅用于测试 - 这将是另一种语言。

只是说说我的尝试:
1) 当我启动 app.py 并且 server.py 已关闭时,app.py 不要重试。
2) 当 app.py 连接到 server.py 并且服务器关闭并快速启动时,TcpClient 重新连接,但我无法在新实例上使用更多方法相互连接并发送字符串“发送到 tcp”。到 server.py,只是旧的,没有更多的连接。
3) 如果我使用 asyncio.async() 而不是 run_until_complete(),我将无法调用来自其他协议(protocol)的方法。

我把 app.py 和 server.py 放在这里,所以你可以复制并运行以进行测试。

我使用 ncat localhost 9000 -u -v 发送字符串“send to tcp.”。此字符串需要在 UdpServer 类上打印并传递给 TcpClient 类上的方法 send_data_to_tcp,此方法会将字符串发送到 server.py。 <- 这在第一次重新连接 tcpClient 后不起作用。

我使用的是 Python 3.4.0。

非常感谢。

应用程序.py:

import asyncio

#TCP client
class TcpClient(asyncio.Protocol):
message = 'Testing'

def connection_made(self, transport):
self.transport = transport
self.transport.write(self.message.encode())
print('data sent: {}'.format(self.message))
server_udp[1].tcp_client_connected()


def data_received(self, data):
self.data = format(data.decode())
print('data received: {}'.format(data.decode()))
if self.data == 'Testing':
server_udp[1].send_data_to_udp(self.data)

def send_data_to_tcp(self, data):
self.transport.write(data.encode())

def connection_lost(self, exc):
msg = 'Connection lost with the server...'
info = self.transport.get_extra_info('peername')
server_udp[1].tcp_client_disconnected(msg, info)


#UDP Server
class UdpServer(asyncio.DatagramProtocol):

CLIENT_TCP_TIMEOUT = 5.0

def __init__(self):
self.client_tcp_timeout = None

def connection_made(self, transport):
print('start', transport)
self.transport = transport

def datagram_received(self, data, addr):
self.data = data.strip()
self.data = self.data.decode()
print('Data received:', self.data, addr)
if self.data == 'send to tcp.':
client_tcp[1].send_data_to_tcp(self.data)

def connection_lost(self, exc):
print('stop', exc)

def send_data_to_udp(self, data):
print('Receiving on UDPServer Class: ', (data))

def connect_client_tcp(self):
coro = loop.create_connection(TcpClient, 'localhost', 8000)
#client_tcp = loop.run_until_complete(coro)
client_tcp = asyncio.async(coro)

def tcp_client_disconnected(self, data, info):
print(data)
self.client_tcp_info = info
self.client_tcp_timeout = asyncio.get_event_loop().call_later(self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)

def tcp_client_connected(self):
if self.client_tcp_timeout:
self.client_tcp_timeout.cancel()
print('call_later cancel.')

loop = asyncio.get_event_loop()

#UDP Server
coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000))
#server_udp = asyncio.Task(coro)
server_udp = loop.run_until_complete(coro)


#TCP client
coro = loop.create_connection(TcpClient, 'localhost', 8000)
#client_tcp = asyncio.async(coro)
client_tcp = loop.run_until_complete(coro)

loop.run_forever()

服务器.py:

import asyncio

class EchoServer(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('connection from {}'.format(peername))
self.transport = transport

def data_received(self, data):
print('data received: {}'.format(data.decode()))
self.transport.write(data)

# close the socket
#self.transport.close()

#def connection_lost(self):
# print('server closed the connection')



loop = asyncio.get_event_loop()
coro = loop.create_server(EchoServer, 'localhost', 8000)
server = loop.run_until_complete(coro)
print(server)
print(dir(server))
print(dir(server.sockets))

print('serving on {}'.format(server.sockets[0].getsockname()))

try:
loop.run_forever()
except KeyboardInterrupt:
print("exit")
finally:
server.close()
loop.close()

最佳答案

你真的只需要一些小的修复。首先,我编写了一个协程来处理连接重试:

@asyncio.coroutine
def do_connect():
global tcp_server # Make sure we use the global tcp_server
while True:
try:
tcp_server = yield from loop.create_connection(TcpClient,
'localhost', 8000)
except OSError:
print("Server not up retrying in 5 seconds...")
yield from asyncio.sleep(5)
else:
break

然后我们用它来启动一切:

loop = asyncio.get_event_loop()

#UDP Server
coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000))
server_udp = loop.run_until_complete(coro)

#TCP client
loop.run_until_complete(do_connect())

loop.run_forever()

下一部分是在 app.py 启动后处理服务器关闭/恢复。我们需要修复 tcp_client_disconnectedconnect_client_tcp 以正确处理:

def connect_client_tcp(self):
global client_tcp
task = asyncio.async(do_connect())
def cb(result):
client_tcp = result
task.add_done_callback(cb)

def tcp_client_disconnected(self, data, info):
print(data)
self.client_tcp_info = info
self.client_tcp_timeout = loop.call_later(self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)

有趣的部分是connect_client_tcp。您的原始版本有两个问题:

  1. 您将 client_tcp 直接分配给 asyncio.async(coro) 的结果,这意味着 client_tcp 分配给了一个asyncio.Task。那不是你想要的;您希望将 client_tcp 分配给已完成的 asyncio.Task 的结果。我们通过使用 task.add_done_callbackclient_tcp 分配给 Task 完成后的结果来实现这一点。

  2. 您忘记了方法顶部的 global client_tcp。否则,您只是在创建一个名为 client_tcp 的局部变量,它在 connect_client_tcp 结束时被丢弃。

一旦这些问题得到解决,我就可以运行 app.py,随时启动/停止 serv.py,但始终能看到所有消息正确传递当所有三个组件一起运行时,从 ncatserv.py

这是完整的 app.py,便于复制/粘贴:

import asyncio

#TCP client
class TcpClient(asyncio.Protocol):
message = 'Testing'

def connection_made(self, transport):
self.transport = transport
self.transport.write(self.message.encode())
print('data sent: {}'.format(self.message))
server_udp[1].tcp_client_connected()


def data_received(self, data):
self.data = format(data.decode())
print('data received: {}'.format(data.decode()))
if self.data == 'Testing':
server_udp[1].send_data_to_udp(self.data)

def send_data_to_tcp(self, data):
self.transport.write(data.encode())

def connection_lost(self, exc):
msg = 'Connection lost with the server...'
info = self.transport.get_extra_info('peername')
server_udp[1].tcp_client_disconnected(msg, info)


#UDP Server
class UdpServer(asyncio.DatagramProtocol):

CLIENT_TCP_TIMEOUT = 5.0

def __init__(self):
self.client_tcp_timeout = None

def connection_made(self, transport):
print('start', transport)
self.transport = transport

def datagram_received(self, data, addr):
self.data = data.strip()
self.data = self.data.decode()
print('Data received:', self.data, addr)
if self.data == 'send to tcp.':
client_tcp[1].send_data_to_tcp(self.data)

def connection_lost(self, exc):
print('stop', exc)

def send_data_to_udp(self, data):
print('Receiving on UDPServer Class: ', (data))

def connect_client_tcp(self):
global client_tcp
coro = loop.create_connection(TcpClient, 'localhost', 8000)
task = asyncio.async(do_connect())
def cb(result):
client_tcp = result
task.add_done_callback(cb)

def tcp_client_disconnected(self, data, info):
print(data)
self.client_tcp_info = info
self.client_tcp_timeout = loop.call_later(self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)

def tcp_client_connected(self):
if self.client_tcp_timeout:
self.client_tcp_timeout.cancel()
print('call_later cancel.')

@asyncio.coroutine
def do_connect():
global client_tcp
while True:
try:
client_tcp = yield from loop.create_connection(TcpClient, 'localhost', 8000)
except OSError:
print("Server not up retrying in 5 seconds...")
yield from asyncio.sleep(1)
else:
break

loop = asyncio.get_event_loop()

#UDP Server
coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000))
server_udp = loop.run_until_complete(coro)

#TCP client
loop.run_until_complete(do_connect())

loop.run_forever()

关于python - 如何重新连接 asyncio 上的套接字?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25998394/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com