gpt4 book ai didi

python - 如何实现双向 jsonrpc + 扭曲的服务器/客户端

转载 作者:行者123 更新时间:2023-12-01 06:13:52 25 4
gpt4 key购买 nike

您好,我正在开发一个基于twisted的rpc服务器,以服务于多个微 Controller ,这些微 Controller 对twisted jsonrpc服务器进行rpc调用。但该应用程序还要求服务器随时向每个微发送信息,因此问题是如何成为一个好的实践,以防止来自微的远程 jsonrpc 调用的响应与针对其发出的服务器 jsonrpc 请求相混淆。一个用户。

我现在得到的结果是,微 Controller 收到了错误的信息,因为它们不知道从套接字传来的 netstring/json 字符串是来自先前要求的响应还是来自服务器的新请求。

这是我的代码:

from twisted.internet import reactor
from txjsonrpc.netstring import jsonrpc
import weakref

creds = {'user1':'pass1','user2':'pass2','user3':'pass3'}

class arduinoRPC(jsonrpc.JSONRPC):
def connectionMade(self):
pass

def jsonrpc_identify(self,username,password,mac):
""" Each client must be authenticated just after to be connected calling this rpc """
if creds.has_key(username):
if creds[username] == password:
authenticated = True
else:
authenticated = False
else:
authenticated = False

if authenticated:
self.factory.clients.append(self)
self.factory.references[mac] = weakref.ref(self)
return {'results':'Authenticated as %s'%username,'error':None}
else:
self.transport.loseConnection()

def jsonrpc_sync_acq(self,data,f):
"""Save into django table data acquired from sensors and send ack to gateway"""
if not (self in self.factory.clients):
self.transport.loseConnection()
print f
return {'results':'synced %s records'%len(data),'error':'null'}

def connectionLost(self, reason):
""" mac address is searched and all reference to self.factory.clientes are erased """
for mac in self.factory.references.keys():
if self.factory.references[mac]() == self:
print 'Connection closed - Mac address: %s'%mac
del self.factory.references[mac]
self.factory.clients.remove(self)


class rpcfactory(jsonrpc.RPCFactory):
protocol = arduinoRPC
def __init__(self, maxLength=1024):
self.maxLength = maxLength
self.subHandlers = {}
self.clients = []
self.references = {}

""" Asynchronous remote calling to micros, simulating random calling from server """
import threading,time,random,netstring,json
class asyncGatewayCalls(threading.Thread):
def __init__(self,rpcfactory):
threading.Thread.__init__(self)
self.rpcfactory = rpcfactory
"""identifiers of each micro/client connected"""
self.remoteMacList = ['12:23:23:23:23:23:23','167:67:67:67:67:67:67','90:90:90:90:90:90:90']
def run(self):
while True:
time.sleep(10)
while True:
""" call to any of three potential micros connected """
mac = self.remoteMacList[random.randrange(0,len(self.remoteMacList))]
if self.rpcfactory.references.has_key(mac):
print 'Calling %s'%mac
proto = self.rpcfactory.references[mac]()
""" requesting echo from selected micro"""
dataToSend = netstring.encode(json.dumps({'method':'echo_from_micro','params':['plop']}))
proto.transport.write(dataToSend)
break

factory = rpcfactory(arduinoRPC)

"""start thread caller"""
r=asyncGatewayCalls(factory)
r.start()

reactor.listenTCP(7080, factory)
print "Micros remote RPC server started"
reactor.run()

最佳答案

您需要向每条消息添加足够的信息,以便收件人可以确定如何解释它。您的要求听起来与 AMP 的要求非常相似,因此您可以改用 AMP 或使用与 AMP 相同的结构来识别您的消息。具体来说:

  • 在请求中放置特定的键 - 例如,AMP 使用“_ask”来识别请求。它还为这些提供了一个唯一的值,该值进一步标识了连接生命周期内的请求。
  • 在响应中,输入不同的键 - 例如,AMP 为此使用“_answer”。该值与响应所针对的请求中“_ask”键的值相匹配。

使用这样的方法,您只需查看是否有“_ask”键或“_answer”键即可确定您是否收到了新请求或对先前请求的响应。

在一个单独的主题中,您的 asyncGatewayCalls 类不应基于线程。它没有明显的理由使用线程,这样做也是滥用 Twisted API,从而导致未定义的行为。大多数 Twisted API 只能在您调用 reactor.run 的线程中使用。唯一的异常(exception)是reactor.callFromThread,您可以使用它从任何其他线程向 react 器线程发送消息。不过,asyncGatewayCalls 会尝试写入传输,这将导致缓冲区损坏或发送数据的任意延迟,或者可能会导致更糟糕的情况。相反,您可以像这样编写 asyncGatewayCalls:

from twisted.internet.task import LoopingCall

class asyncGatewayCalls(object):
def __init__(self, rpcfactory):
self.rpcfactory = rpcfactory
self.remoteMacList = [...]

def run():
self._call = LoopingCall(self._pokeMicro)
return self._call.start(10)

def _pokeMicro(self):
while True:
mac = self.remoteMacList[...]
if mac in self.rpcfactory.references:
proto = ...
dataToSend = ...
proto.transport.write(dataToSend)
break

factory = ...
r = asyncGatewayCalls(factory)
r.run()

reactor.listenTCP(7080, factory)
reactor.run()

这为您提供了一个单线程解决方案,该解决方案应具有与原始 asyncGatewayCalls 类相同的行为。不过,它不是为了调度调用而在线程中的循环中休眠,而是使用 react 器的调度 API(通过更高级别的 LoopingCall 类,该类调度要重复调用的内容)来确保 _pokeMicro 每十秒调用一次。

关于python - 如何实现双向 jsonrpc + 扭曲的服务器/客户端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4387477/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com