gpt4 book ai didi

python - 如何在 PyQt5 中使用 QWebSocket 创建 websocket 客户端

转载 作者:太空宇宙 更新时间:2023-11-04 08:51:03 26 4
gpt4 key购买 nike

我想在 PyQt5 中使用 QWebSocket 创建一个 websocket 客户端。为方便起见,假设我有一个 websocket 服务器,源代码是这样的,

from PyQt5 import QtCore, QtWebSockets, QtNetwork, QtGui
from PyQt5.QtWidgets import QApplication, QMainWindow, QMenu, QAction
from PyQt5.QtCore import QUrl

class MyServer(QtCore.QObject):
    """WebSocket server bound to localhost:1302.

    Text messages received from any client are sent to every connected
    client; binary messages are echoed back to the client that sent them.

    ``parent`` is a ``QWebSocketServer`` whose ``serverName()`` and
    ``secureMode()`` are used to configure the server that actually listens.
    """

    def __init__(self, parent):
        # Zero-argument super() resolves to QObject.__init__.  The previous
        # super(QtCore.QObject, self) started the lookup *after* QObject in
        # the MRO and therefore skipped QObject's own initializer.
        super().__init__(parent)
        self.clients = []
        self.server = QtWebSockets.QWebSocketServer(parent.serverName(), parent.secureMode(), parent)
        if self.server.listen(QtNetwork.QHostAddress.LocalHost, 1302):
            print('Connected: '+self.server.serverName()+' : '
                  +self.server.serverAddress().toString()+':'+str(self.server.serverPort()))
        else:
            print('error')
        self.server.newConnection.connect(self.onNewConnection)
        # Most recently accepted socket; None until a client connects.
        self.clientConnection = None
        print(self.server.isListening())

    def onNewConnection(self):
        """Accept the pending connection and wire up its signals."""
        self.clientConnection = self.server.nextPendingConnection()
        self.clientConnection.textMessageReceived.connect(self.processTextMessage)

        self.clientConnection.binaryMessageReceived.connect(self.processBinaryMessage)
        self.clientConnection.disconnected.connect(self.socketDisconnected)

        print("newClient")
        self.clients.append(self.clientConnection)

    def _signalSocket(self):
        """Return the socket whose signal triggered the current slot.

        Falls back to the most recently accepted socket when the slot is
        called directly (``sender()`` is then None).
        """
        socket = self.sender()
        return self.clientConnection if socket is None else socket

    def processTextMessage(self, message):
        """Print a received text message and send it to every client."""
        print(message)
        for client in self.clients:
            client.sendTextMessage(message)

    def processBinaryMessage(self, message):
        """Print a received binary message and echo it to its sender."""
        print("b:",message)
        # Reply on the socket that emitted the signal, not on whichever
        # client happened to connect last.
        socket = self._signalSocket()
        if socket is not None:
            socket.sendBinaryMessage(message)

    def socketDisconnected(self):
        """Forget the socket that disconnected and schedule its deletion."""
        socket = self._signalSocket()
        if socket is None:
            return
        # Guarded removal: the same socket must not be removed twice.
        if socket in self.clients:
            self.clients.remove(socket)
        if socket is self.clientConnection:
            self.clientConnection = None
        socket.deleteLater()

if __name__ == '__main__':
    import sys

    # The application object provides the event loop started by exec_().
    app = QApplication(sys.argv)

    # MyServer reads the server name and secure mode from this object.
    secure_mode = QtWebSockets.QWebSocketServer.NonSecureMode
    serverObject = QtWebSockets.QWebSocketServer('My Socket', secure_mode)
    server = MyServer(serverObject)

    # Leave the event loop when serverObject emits closed().
    serverObject.closed.connect(app.quit)
    app.exec_()

它可以创建一个 websocket 服务器,我使用 JavaScript 对其进行了测试,它工作正常。但是我找不到使用 QWebSocket 创建客户端的方法。我的客户端代码是这样的:

# Question's original client attempt, kept as posted (it does not deliver the message).
# The four calls run back-to-back: the socket is opened, a text message is
# sent and the socket is closed immediately, with no event loop visible in
# this snippet.
# NOTE(review): the accepted answer attributes the failure to the asynchronous
# nature of the API -- the send has to happen later, from the running event loop.
client =  QtWebSockets.QWebSocket("",QtWebSockets.QWebSocketProtocol.Version13,None)
client.open(QUrl("ws://127.0.0.1:1302"))
client.sendTextMessage("asd")
client.close()

服务端好像没有收到客户端发送的消息,如何创建 websocket 客户端并使用 QWebSocket 发送消息?

最佳答案

这是 Qt 控制台程序的典型问题:open() 是异步的,只有在事件循环运行之后连接才会真正建立,因此您需要在 Python 构造函数 (__init__) 之外、事件循环启动之后再调用客户端的方法。

我稍微修改了你的服务器,添加了一些错误测试(没什么新东西):

from PyQt5 import QtCore, QtWebSockets, QtNetwork, QtGui
from PyQt5.QtWidgets import QApplication, QMainWindow, QMenu, QAction
from PyQt5.QtCore import QUrl

class MyServer(QtCore.QObject):
    """WebSocket server bound to localhost:1302 with diagnostic logging.

    Text messages received from any client are sent to every connected
    client; binary messages are echoed back to the client that sent them.
    Accept errors and individual text frames are printed.

    ``parent`` is a ``QWebSocketServer`` whose ``serverName()`` and
    ``secureMode()`` are used to configure the server that actually listens.
    """

    def __init__(self, parent):
        # Zero-argument super() resolves to QObject.__init__.  The previous
        # super(QtCore.QObject, self) started the lookup *after* QObject in
        # the MRO and therefore skipped QObject's own initializer.
        super().__init__(parent)
        self.clients = []
        print("server name: {}".format(parent.serverName()))
        self.server = QtWebSockets.QWebSocketServer(parent.serverName(), parent.secureMode(), parent)
        if self.server.listen(QtNetwork.QHostAddress.LocalHost, 1302):
            print('Listening: {}:{}:{}'.format(
                self.server.serverName(), self.server.serverAddress().toString(),
                str(self.server.serverPort())))
        else:
            print('error')
        self.server.acceptError.connect(self.onAcceptError)
        self.server.newConnection.connect(self.onNewConnection)
        # Most recently accepted socket; None until a client connects.
        self.clientConnection = None
        print(self.server.isListening())

    def onAcceptError(self, accept_error):
        """Print the error reported by the server's acceptError signal."""
        # `self` was missing from the signature, so the bound slot would
        # have raised TypeError when the signal delivered its argument.
        print("Accept Error: {}".format(accept_error))

    def onNewConnection(self):
        """Accept the pending connection and wire up its signals."""
        print("onNewConnection")
        self.clientConnection = self.server.nextPendingConnection()
        self.clientConnection.textMessageReceived.connect(self.processTextMessage)

        self.clientConnection.textFrameReceived.connect(self.processTextFrame)

        self.clientConnection.binaryMessageReceived.connect(self.processBinaryMessage)
        self.clientConnection.disconnected.connect(self.socketDisconnected)

        print("newClient")
        self.clients.append(self.clientConnection)

    def _signalSocket(self):
        """Return the socket whose signal triggered the current slot.

        Falls back to the most recently accepted socket when the slot is
        called directly (``sender()`` is then None).
        """
        socket = self.sender()
        return self.clientConnection if socket is None else socket

    def processTextFrame(self, frame, is_last_frame):
        """Print a received text frame and whether it is the last one."""
        print("in processTextFrame")
        print("\tFrame: {} ; is_last_frame: {}".format(frame, is_last_frame))

    def processTextMessage(self, message):
        """Print a received text message and send it to every client."""
        print("processTextMessage - message: {}".format(message))
        for client in self.clients:
            client.sendTextMessage(message)

    def processBinaryMessage(self, message):
        """Print a received binary message and echo it to its sender."""
        print("b:",message)
        # Reply on the socket that emitted the signal, not on whichever
        # client happened to connect last.
        socket = self._signalSocket()
        if socket is not None:
            socket.sendBinaryMessage(message)

    def socketDisconnected(self):
        """Forget the socket that disconnected and schedule its deletion."""
        print("socketDisconnected")
        socket = self._signalSocket()
        if socket is None:
            return
        # Guarded removal: the same socket must not be removed twice.
        if socket in self.clients:
            self.clients.remove(socket)
        if socket is self.clientConnection:
            self.clientConnection = None
        socket.deleteLater()

if __name__ == '__main__':
    import sys

    # The application object provides the event loop started by exec_().
    app = QApplication(sys.argv)

    # MyServer reads the server name and secure mode from this object.
    secure_mode = QtWebSockets.QWebSocketServer.NonSecureMode
    serverObject = QtWebSockets.QWebSocketServer('My Socket', secure_mode)
    server = MyServer(serverObject)

    # Leave the event loop when serverObject emits closed().
    serverObject.closed.connect(app.quit)
    app.exec_()

客户端使用 QTimer 在 __init__ 方法之外调用所需的方法。我还添加了 ping/pong 方法来检查连接:

import sys

from PyQt5 import QtCore, QtWebSockets, QtNetwork
from PyQt5.QtCore import QUrl, QCoreApplication, QTimer
from PyQt5.QtWidgets import QApplication


class Client(QtCore.QObject):
    """Small WebSocket client wrapping a ``QWebSocket``.

    The constructor only starts opening the connection; ``do_ping`` and
    ``send_message`` are meant to be called later (the script below
    triggers them from timers).

    Args:
        parent: Parent ``QObject`` passed to ``QObject.__init__``.
        url: WebSocket URL to open.  Defaults to the address the example
            server listens on.
    """

    def __init__(self, parent, url="ws://127.0.0.1:1302"):
        super().__init__(parent)

        self.client = QtWebSockets.QWebSocket("",QtWebSockets.QWebSocketProtocol.Version13,None)
        # Connect every handler before opening so none is wired up late.
        self.client.error.connect(self.error)
        self.client.pong.connect(self.onPong)

        self.client.open(QUrl(url))

    def do_ping(self):
        """Send a ping frame carrying the payload ``b"foo"``."""
        print("client: do_ping")
        self.client.ping(b"foo")

    def send_message(self):
        """Send the text message ``"asd"``."""
        print("client: send_message")
        self.client.sendTextMessage("asd")

    def onPong(self, elapsedTime, payload):
        """Print the elapsed time and payload delivered by the pong signal."""
        print("onPong - time: {} ; payload: {}".format(elapsedTime, payload))

    def error(self, error_code):
        """Print the error code and the socket's error string."""
        print("error code: {}".format(error_code))
        print(self.client.errorString())

    def close(self):
        """Close the underlying socket."""
        self.client.close()

def quit_app():
    """Timer callback: announce the timeout and quit the application."""
    print("timer timeout - exiting")
    QCoreApplication.quit()

def ping():
    """Timer callback: ask the module-level ``client`` to send a ping."""
    client.do_ping()

def send_message():
    """Timer callback: ask the module-level ``client`` to send its message."""
    client.send_message()

if __name__ == '__main__':
    # No `global` statement is needed: names assigned here are already
    # module-level, which is where ping() and send_message() look up `client`.
    app = QApplication(sys.argv)

    # Create the client first so it exists before any timer callback runs.
    client = Client(app)

    # The callbacks fire from the event loop started by exec_():
    # ping after 2 s, send a message after 3 s, quit after 5 s.
    QTimer.singleShot(2000, ping)
    QTimer.singleShot(3000, send_message)
    QTimer.singleShot(5000, quit_app)

    app.exec_()

服务器输出:

G:\Qt\QtTests>python so_qwebsocket_server.py
server name: My Socket
Listening: My Socket:127.0.0.1:1302
True
onNewConnection
newClient
in processTextFrame
Frame: asd ; is_last_frame: True
processTextMessage - message: asd
socketDisconnected

客户端输出:

G:\Qt\QtTests>python so_qwebsocket_client.py
client: do_ping
onPong - time: 0 ; payload: b'foo'
client: send_message
timer timeout - exiting

总而言之,如果您在一个简单的 GUI 中使用您的客户端(例如,将 client.sendTextMessage() 移到 __init__ 之外并连接一个按钮,单击以实际发送消息),由于其异步性质,它应该可以正常工作!

关于python - 如何在 PyQt5 中使用 QWebSocket 创建 websocket 客户端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35237245/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com