gpt4 book ai didi

python - 带有 Gui 的 Websocket

转载 作者:行者123 更新时间:2023-12-03 23:06:26 35 4
gpt4 key购买 nike

我的第一个类允许我连接到 websocket,第二堂课允许我尝试用图表来显示数据。

在代码中,我只是尝试在 weboscket 使用 pyqtgraph 运行时显示图形,但该窗口完全有问题。

谢谢

import asyncio
import json
import websockets
from asyncqt import asyncSlot, QtCore
import pyqtgraph as pg

class Ws(QtCore.QObject):
dataChanged = QtCore.pyqtSignal(dict)

def __init__(self, parent=None):
super().__init__(parent)
self._websocket = None

@property
def websocket(self):
return self._websocket

async def connect(self, server,):
self._websocket = await websockets.connect(server)
await self.on_message()

@asyncSlot(dict)
async def on_message(self):
while True:
message = await self.websocket.recv()
message = json.loads(message)
self.dataChanged.emit(message)


@asyncSlot(dict)
async def send_message(self, message):
while self.websocket is None:
await asyncio.sleep(0.2)
data = json.dumps(message)
await self.websocket.send(data)

def run(self):
loop = asyncio.get_event_loop()
loop.create_task(self.connect(server="wss://www.bitmex.com/realtime"))
loop.run_forever()

class TestUi(object):
def __init__(self):

self.plt = pg.plot()

self.ws = Ws()
self.ws.send_message(
{"op": "subscribe", "args": ["instrument:XBTUSD"]})
self.ws.dataChanged.connect(self.update_data)
self.ws.run()

def update_data(self, data):
print(data)


if __name__ == "__main__":
u = TestUi()

最佳答案

您必须设置 asyncqt 的 eventloop:

import asyncio
import json
import sys
import websockets


from PyQt5 import QtCore, QtWidgets

import pyqtgraph as pg

from asyncqt import QEventLoop, asyncSlot


class Ws(QtCore.QObject):
dataChanged = QtCore.pyqtSignal(dict)

def __init__(self, parent=None):
super().__init__(parent)
self._websocket = None

@property
def websocket(self):
return self._websocket

async def connect(
self, server,
):
self._websocket = await websockets.connect(server)
await self.on_message()

@asyncSlot(dict)
async def on_message(self):
while True:
message = await self.websocket.recv()
message = json.loads(message)
self.dataChanged.emit(message)

@asyncSlot(dict)
async def send_message(self, message):
while self.websocket is None:
await asyncio.sleep(0.2)
data = json.dumps(message)
await self.websocket.send(data)

def run(self):
loop = asyncio.get_event_loop()
loop.create_task(self.connect(server="wss://www.bitmex.com/realtime"))
loop.run_forever()


class TestUi(object):
def __init__(self):

app = QtWidgets.QApplication(sys.argv)
loop = QEventLoop(app)
asyncio.set_event_loop(loop)

self.plt = pg.plot()

self.ws = Ws()
self.ws.send_message({"op": "subscribe", "args": ["instrument:XBTUSD"]})
self.ws.dataChanged.connect(self.update_data)
self.ws.run()

def update_data(self, data):
print(data)


if __name__ == "__main__":
u = TestUi()

关于python - 带有 Gui 的 Websocket,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62431935/

35 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com