gpt4 book ai didi

python - 如何创建一个带有两个线程的 python 应用程序,每个线程都有一个高速公路应用程序

转载 作者:太空宇宙 更新时间:2023-11-03 14:20:40 24 4
gpt4 key购买 nike

我还没有找到解决我的问题的方法。我需要创建一个具有两个线程的 python 应用程序,每个线程都使用高速公路库连接到 WAMP 路由器。

按照我写的实验代码:

wampAddress = 'ws://172.17.3.139:8181/ws'
wampRealm = 's4t'

from threading import Thread
from autobahn.twisted.wamp import ApplicationRunner
from autobahn.twisted.wamp import ApplicationSession
from twisted.internet.defer import inlineCallbacks


class AutobahnMRS(ApplicationSession):
@inlineCallbacks
def onJoin(self, details):
print("Sessio attached [Connect to WAMP Router]")

def onMessage(*args):
print args
try:
yield self.subscribe(onMessage, 'test')
print ("Subscribed to topic: test")

except Exception as e:
print("Exception:" +e)

class AutobahnIM(ApplicationSession):
@inlineCallbacks
def onJoin(self, details):
print("Sessio attached [Connect to WAMP Router]")

try:
yield self.publish('test','YOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO')
print ("Subscribed to topic: test")

except Exception as e:
print("Exception:" +e)

class ManageRemoteSystem:
def __init__(self):
self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm)

def start(self):
self.runner.run(AutobahnMRS);


class InternalMessages:
def __init__(self):
self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm)

def start(self):
self.runner.run(AutobahnIM);

#class S4tServer:

if __name__ == '__main__':
server = ManageRemoteSystem()
sendMessage = InternalMessages()

thread1 = Thread(target = server.start())
thread1.start()
thread1.join()

thread2 = Thread(target = sendMessage.start())
thread2.start()
thread2.join()

当我启动这个 python 应用程序时,只有 thread1 被启动,稍后当我终止该应用程序 (ctrl-c) 时,会显示以下错误消息:

Sessio attached [Connect to WAMP Router]
Subscribed to topic: test
^CTraceback (most recent call last):
File "test_pub.py", line 71, in <module>
p2 = multiprocessing.Process(target = server.start())
File "test_pub.py", line 50, in start
self.runner.run(AutobahnMRS);
File "/usr/local/lib/python2.7/dist-packages/autobahn/twisted/wamp.py", line 175, in run
reactor.run()
File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1191, in run
self.startRunning(installSignalHandlers=installSignalHandlers)
File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1171, in startRunning
ReactorBase.startRunning(self)
File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 683, in startRunning
raise error.ReactorNotRestartable()
twisted.internet.error.ReactorNotRestartable

我需要在一个具有其功能的应用程序中实现,而且它必须有一个系统来与带有高速公路 python 库的 WAMP 路由器通信。

换句话说,我需要一个能够与 WAMP 路由器通信的解决方案,但同时这个应用程序不必被高速公路部分阻塞(我认为解决方案是启动两个线程,一个线程管理一些功能,第二个线程管理高速公路部分)。

使用我之前提出的架构,还有另一个问题,需要在 WAMP 路由器上的特定主题中从“无高速公路线程”中的应用程序部分发送消息,此功能应通过调用不阻塞其他功能的特定功能。

我希望我已经提供了所有细节。

非常感谢任何回复

--------------------------------编辑---------------- ------------------

经过一些研究,我实现了我需要的 websocket 协议(protocol),代码如下:

# ----- twisted ----------
class _WebSocketClientProtocol(WebSocketClientProtocol):
def __init__(self, factory):
self.factory = factory

def onOpen(self):
#log.debug("Client connected")
self.factory.protocol_instance = self
self.factory.base_client._connected_event.set()

class _WebSocketClientFactory(WebSocketClientFactory):
def __init__(self, *args, **kwargs):
WebSocketClientFactory.__init__(self, *args, **kwargs)
self.protocol_instance = None
self.base_client = None

def buildProtocol(self, addr):
return _WebSocketClientProtocol(self)
# ------ end twisted -------
lass BaseWBClient(object):

def __init__(self, websocket_settings):
#self.settings = websocket_settings
# instance to be set by the own factory
self.factory = None
# this event will be triggered on onOpen()
self._connected_event = threading.Event()
# queue to hold not yet dispatched messages
self._send_queue = Queue.Queue()
self._reactor_thread = None

def connect(self):

log.msg("Connecting to host:port")
self.factory = _WebSocketClientFactory(
"ws://host:port",
debug=True)
self.factory.base_client = self

c = connectWS(self.factory)

self._reactor_thread = threading.Thread(target=reactor.run,
args=(False,))
self._reactor_thread.daemon = True
self._reactor_thread.start()

def send_message(self, body):
if not self._check_connection():
return
log.msg("Queing send")
self._send_queue.put(body)
reactor.callFromThread(self._dispatch)

def _check_connection(self):
if not self._connected_event.wait(timeout=10):
log.err("Unable to connect to server")
self.close()
return False
return True

def _dispatch(self):
log.msg("Dispatching")
while True:
try:
body = self._send_queue.get(block=False)
except Queue.Empty:
break
self.factory.protocol_instance.sendMessage(body)

def close(self):
reactor.callFromThread(reactor.stop)

import time
def Ppippo(coda):
while True:
coda.send_message('YOOOOOOOO')
time.sleep(5)

if __name__ == '__main__':

ws_setting = {'host':'', 'port':}

client = BaseWBClient(ws_setting)

t1 = threading.Thread(client.connect())
t11 = threading.Thread(Ppippo(client))
t11.start()
t1.start()

之前的代码工作正常,但我需要将其转换为在 WAMP 协议(protocol) insted websocket 上运行。

有谁知道我是怎么解决的吗?

最佳答案

坏消息是 Autobahn 正在使用 Twisted 主循环,因此您不能同时在两个线程中运行它。

好消息是您不需要在两个线程中运行它来运行两个东西,无论如何两个线程会更复杂。

启动多个应用程序的 API 有点困惑,因为你有两个 ApplicationRunner 对象,乍一看,你在高速公路上运行应用程序的方式是调用 ApplicationRunner.run.

但是,ApplicationRunner 只是一种便利,它包装了设置应用程序的内容和运行主循环的内容;真正的工作发生在 WampWebSocketClientFactory 中。

为了实现你想要的,你只需要摆脱线程,自己运行主循环,让 ApplicationRunner 实例简单地设置它们的应用程序。

为了实现这一点,您需要更改程序的最后一部分来执行此操作:

class ManageRemoteSystem:
def __init__(self):
self.runner = ApplicationRunner(url=wampAddress, realm=wampRealm)

def start(self):
# Pass start_reactor=False to all runner.run() calls
self.runner.run(AutobahnMRS, start_reactor=False)


class InternalMessages:
def __init__(self):
self.runner = ApplicationRunner(url=wampAddress, realm=wampRealm)

def start(self):
# Same as above
self.runner.run(AutobahnIM, start_reactor=False)


if __name__ == '__main__':
server = ManageRemoteSystem()
sendMessage = InternalMessages()
server.start()
sendMessage.start()

from twisted.internet import reactor
reactor.run()

关于python - 如何创建一个带有两个线程的 python 应用程序,每个线程都有一个高速公路应用程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28414302/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com