gpt4 book ai didi

python - 如何关闭python asyncio传输?

转载 作者:行者123 更新时间:2023-12-03 21:35:37 30 4
gpt4 key购买 nike

我正在开发一个使用 python asyncio 套接字服务器的项目。问题是当服务器停止时,服务器的实现不会在传输上调用 .close() 。这似乎使客户端保持连接并导致代码的其他部分崩溃。

Python 文档说传输需要显式关闭,但在这个项目中我不知道在哪里可以关闭它们,因为没有引用为每个客户端创建的传输。
https://docs.python.org/3/library/asyncio-dev.html#close-transports-and-event-loops

这是代码:

"""
Socket server forwarding request to internal server
"""
import logging
try:
# we prefer to use bundles asyncio version, otherwise fallback to trollius
import asyncio
except ImportError:
import trollius as asyncio


from opcua import ua
from opcua.server.uaprocessor import UaProcessor

logger = logging.getLogger(__name__)


class BinaryServer(object):

def __init__(self, internal_server, hostname, port):
self.logger = logging.getLogger(__name__)
self.hostname = hostname
self.port = port
self.iserver = internal_server
self.loop = internal_server.loop
self._server = None
self._policies = []

def set_policies(self, policies):
self._policies = policies

def start(self):

class OPCUAProtocol(asyncio.Protocol):

"""
instanciated for every connection
defined as internal class since it needs access
to the internal server object
FIXME: find another solution
"""

iserver = self.iserver
loop = self.loop
logger = self.logger
policies = self._policies

def connection_made(self, transport):
self.peername = transport.get_extra_info('peername')
self.logger.info('New connection from %s', self.peername)
self.transport = transport
self.processor = UaProcessor(self.iserver, self.transport)
self.processor.set_policies(self.policies)
self.data = b""

def connection_lost(self, ex):
self.logger.info('Lost connection from %s, %s', self.peername, ex)
self.transport.close()
self.processor.close()

def data_received(self, data):
logger.debug("received %s bytes from socket", len(data))
if self.data:
data = self.data + data
self.data = b""
self._process_data(data)

def _process_data(self, data):
buf = ua.utils.Buffer(data)
while True:
try:
backup_buf = buf.copy()
try:
hdr = ua.Header.from_string(buf)
except ua.utils.NotEnoughData:
logger.info("We did not receive enough data from client, waiting for more")
self.data = backup_buf.read(len(backup_buf))
return
if len(buf) < hdr.body_size:
logger.info("We did not receive enough data from client, waiting for more")
self.data = backup_buf.read(len(backup_buf))
return
ret = self.processor.process(hdr, buf)
if not ret:
logger.info("processor returned False, we close connection from %s", self.peername)
self.transport.close()
return
if len(buf) == 0:
return
except Exception:
logger.exception("Exception raised while parsing message from client, closing")
self.transport.close()
break

coro = self.loop.create_server(OPCUAProtocol, self.hostname, self.port)
self._server = self.loop.run_coro_and_wait(coro)
print('Listening on {}:{}'.format(self.hostname, self.port))

def stop(self):
self.logger.info("Closing asyncio socket server")
self.loop.call_soon(self._server.close)
self.loop.run_coro_and_wait(self._server.wait_closed())

正如你所看到的,当我们在这个服务器类上调用 stop() 时,异步服务器调用它的 close 方法。但是,如果客户端已连接,则创建的传输永远不会关闭。

项目仓库在这里 https://github.com/FreeOpcUa/python-opcua/ ,你可以看看Issue 137。

关闭运输对象的正确方法是什么?

最佳答案

我通过应用这种方法解决了这个问题:
#self.OPCUAServer - 这是我的 opcua 服务器
节点 = []
nodes.append(self.OPCUAServer.get_node("ns=0; s=Measurements")) #添加两个根节点
nodes.append(self.OPCUAServer.get_node("ns=1; s=Calibrations")) #到列表
self.OPCUAServer.delete_nodes(nodes, True) # 用这个列表递归调用delete_nodes
self.OPCUAServer.stop()

关于python - 如何关闭python asyncio传输?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35872750/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com