gpt4 book ai didi

python - 具有中央中继服务器的多线程套接字

转载 作者:太空宇宙 更新时间:2023-11-04 02:09:36 26 4
gpt4 key购买 nike

我以前设法实现了一个客户端服务器套接字脚本,该脚本在单个客户端和服务器之间中继消息,现在我正在尝试实现一个多客户端系统。

更具体地说,我想将服务器用作两个客户端之间的某种媒介,它从一个客户端检索信息并将其中继到另一个客户端。我曾尝试附加并发送接收客户端的端口号,然后从服务器端的消息中提取它。之后,我会尝试将其发送到具有该端口号的任何套接字,但遇到了一些麻烦(因为端口号是在我确定的发送点确定的?),所以现在我只是在尝试将发送的消息中继回去给所有客户。但是,问题在于该消息仅被发送到服务器,而没有被中继到所需的客户端。

我以前曾尝试实现对等系统,但遇到麻烦,因此我决定退后一步,改为这样做。

Server.py:

import socket, _thread, threading
import tkinter as tk

SERVERPORT = 8600
HOST = 'localhost'

class Server():
def __init__(self):
self.Connected = True
self.ServerSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.ServerSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
self.ServerSocket.bind((HOST, SERVERPORT))
self.ServerSocket.listen(2)
self.Clients = []

def Listen(self):
print('Server is now running')
while self.Connected:
ClientSocket, Address = self.ServerSocket.accept()
self.Clients.append(Address)
print('\nNew user connected', Address)
t = threading.Thread(target=self.NewClient, args=(ClientSocket,
Address))
t.daemon = True
t.start()
self.Socket.close()

def NewClient(self, ClientSocket, Address):
while self.Connected:
if ClientSocket:
try:
ReceivedMsg = ClientSocket.recv(4096)
print('Message received from', Address, ':', ReceivedMsg)
self.Acknowledge(ClientSocket, Address)
if ReceivedMsg.decode('utf8').split()[-1] != 'message':
ReceiverPort = self.GetSendPort(ReceivedMsg)
self.SendToClient(ClientSocket,ReceivedMsg,ReceiverPort)
except:
print('Connection closed')
raise Exception
ClientSocket.close()

def Acknowledge(self, Socket, Address):
Socket.sendto(b'The server received your message', Address)

def GetSendPort(self, Msg):
MsgDigest = Msg.decode('utf8').split()
return int(MsgDigest[-1])

def SendToClient(self, Socket, Msg, Port):
Addr = (HOST, Msg)
for Client in self.Clients:
Socket.sendto(Msg, Client)

def NewThread(Func, *args):
if len(args) == 1:
t = threading.Thread(target=Func, args=(args,))
elif len(args) > 1:
t = threading.Thread(target=Func, args=args)
else:
t = threading.Thread(target=Func)
t.daemon = True
t.start()
t.join()

Host = Server()
NewThread(Host.Listen)

和客户端(.py):
import socket, threading
import tkinter as tk

Username = 'Ernest'
PORT = 8601
OtherPORT = 8602
SERVERPORT = 8600
HOST = '127.0.0.1'

class Client():
def __init__(self, Username):
self.Connected, self.Username = False, Username
self.Socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

def Connect(self):
print('Trying to connect')
try:
self.Socket.connect((HOST, SERVERPORT))
self.Connected = True
print(self.Username, 'connected to server')
Msg = MsgUI(self.Username)
Msg.Display()
except Exception:
print('Could not connect to server')
raise Exception

def SendMsg(self):
if self.Connected:
Msg = '{} sent you a message {}'.format(self.Username, OtherPORT)
self.Socket.sendall(bytes(Msg, encoding='utf8'))
self.GetResponse()

def GetResponse(self, *args):
AckMsg = '\n{} received the message'.format(self.Username)
NMsg = '\n{} did not receive the message'.format(self.Username)
if self.Connected:
Msg = self.Socket.recv(4096)
print(Msg)
if Msg:
self.Socket.sendall(bytes(AckMsg, encoding='utf8'))
else:
self.Socket.sendall(bytes(NMsg, encoding='utf8'))

class MsgUI():
def __init__(self, Username):
self.Username = Username
self.entry = tk.Entry(win)
self.sendbtn = tk.Button(win, text='send', command=Peer.SendMsg)

def Display(self):
self.entry.grid()
self.sendbtn.grid()
win.mainloop()

win = tk.Tk()
Peer = Client(Username)
Peer.Connect()

我希望每当用户在tkinter窗口中按下发送按钮时就发送一条消息,但是与此同时,它一直在“监听”以查看是否收到任何消息。

我以前也尝试过在客户端的另一个线程中运行 GetResponse方法,而不是 if self.Connected,我使用 while self.Connected,但它仍然无法正常工作。

更新

经过一些有用的评论后,我对两个文件进行了如下编辑:
现在,服务器为每个首先运行的客户端保留两个套接字。服务器文件作为模块导入到客户端文件中。然后运行每个客户端文件,并且每个客户端在服务器文件中运行一个功能,请求使用套接字。如果允许该请求(即未引发任何错误),则连接套接字,将其添加到存储在服务器文件中的一组客户端中,然后返回到客户端文件中。然后,客户端使用此套接字发送和接收消息。

Server.py
import socket, _thread, threading
import tkinter as tk

SERVERPORT = 8600
HOST = 'localhost'

class Server():
def __init__(self):
self.Connected = True
self.ServerSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.ServerSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
self.ServerSocket.bind((HOST, SERVERPORT))
self.ServerSocket.listen(2)
self.Clients = {}

def ConnectClient(self, Username, Port):
Socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.Clients[Username] = [Socket, Port, False]
try:
self.Clients[Username][0].connect((HOST, SERVERPORT))
self.Clients[Username][2] = True
print('Opened port for user', Username)
return Socket
except Exception:
print('Could not open port for user', Username)
raise Exception

def Listen(self):
print('Server is now running')
while self.Connected:
ClientSocket, Address = self.ServerSocket.accept()
print('\nNew user connected', Address)
t = threading.Thread(target=self.NewClient, args=(ClientSocket,
Address))
t.daemon = True
t.start()
self.Socket.close()

def NewClient(self, ClientSocket, Address):
while self.Connected:
if ClientSocket:
try:
ReceivedMsg = ClientSocket.recv(4096)
if b'attempting to connect to the server' in ReceivedMsg:
ClientSocket.send(b'You are now connected to the server')
else:
print('Message received from', Address, ':',ReceivedMsg)
#self.Acknowledge(ClientSocket, Address)
ReceiverPort = self.GetSendPort(ReceivedMsg)
if ReceiverPort != None:
self.SendToClient(ClientSocket,ReceivedMsg,
ReceiverPort)
except:
print('Connection closed')
raise Exception
ClientSocket.close()

def Acknowledge(self, Socket, Address):
Socket.sendto(b'The server received your message', Address)

def GetSendPort(self, Msg):
MsgDigest = Msg.decode('utf8').split()
try:
Port = int(MsgDigest[-1])
except ValueError:
Port = None
return Port

def SendToClient(self, Socket, Msg, Port):
Addr = (HOST, Port)
Receiver = None
for Client, Vars in self.Clients.items():
if Vars[1] == Port:
Receiver = Client
self.Clients[Receiver][0].sendto(Msg, Addr)

def NewThread(Func, *args):
if len(args) == 1:
t = threading.Thread(target=Func, args=(args,))
elif len(args) > 1:
t = threading.Thread(target=Func, args=args)
else:
t = threading.Thread(target=Func)
t.daemon = True
t.start()
t.join()

Host = Server()
if __name__ == '__main__':
NewThread(Host.Listen)

和Client.py
import socket, threading, Server
import tkinter as tk

Username = 'Ernest'
PORT = 8601
OtherPORT = 8602
SERVERPORT = 8600
HOST = '127.0.0.1'

class Client():
def __init__(self, Username):
self.Connected, self.Username = False, Username

def Connect(self):
print('Requesting to connect to server')
try:
self.Socket = Server.Host.ConnectClient(self.Username, PORT)
self.Connected = Server.Host.Clients[self.Username][2]
Msg = '{} is attempting to connect to the server'.format(self.Username)
self.Socket.sendall(bytes(Msg, encoding='utf8'))
ReceivedMsg = self.Socket.recv(4096)
print(ReceivedMsg)
Msg = MsgUI(self.Username)
Msg.Display()
except Exception:
print('Could not connect to server')
raise Exception

def SendMsg(self):
try:
if self.Connected:
Msg = '{} sent you a message {}'.format(self.Username,OtherPORT)
self.Socket.sendall(bytes(Msg, encoding='utf8'))
self.GetResponse()
except Exception:
print('Connection closed')
raise Exception

def GetResponse(self, *args):
AckMsg = '\n{} received the message'.format(self.Username)
NMsg = '\n{} did not receive the message'.format(self.Username)
if self.Connected:
Msg = self.Socket.recv(4096)
print(Msg)
if Msg:
self.Socket.sendall(bytes(AckMsg, encoding='utf8'))
else:
self.Socket.sendall(bytes(NMsg, encoding='utf8'))

class MsgUI():
def __init__(self, Username):
self.Username = Username
self.entry = tk.Entry(win)
self.sendbtn = tk.Button(win, text='send', command=Peer.SendMsg)

def Display(self):
self.entry.grid()
self.sendbtn.grid()
win.mainloop()

win = tk.Tk()
Peer = Client(Username)
Peer.Connect()

现在的问题更多是python和范围问题。尝试将消息中继回客户端时,由于 KeyError字典仍然为空,因此我收到了 Clients。当在客户端文件中对服务器进行函数调用时,很明显,对字典的更新发生在客户端文件中,而不是服务器文件中-这是在另一个实例中。我需要一种更改 Clients词典内容的方法,该方法由客户端文件调用以起作用,但在服务器文件中生效。

最佳答案

您致力于多线程吗?线程不能在python中同时运行(由于GIL),虽然它们是处理并发操作的一种方法,但它们不是唯一的方法,通常不是最佳的方法,除非它们是唯一的方法。考虑一下这段代码,它不能很好地处理失败案例,但似乎可以作为一个起点。

import socket, select, Queue

svrsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
svrsock.setblocking(0)
svrsock.bind(('', 17654))
svrsock.listen(16)
client_queues = {}
write_ready=[] # we'll update this for clients only that have things in the queue
while client_queues.keys() + [svrsock] :
readable, writable, exceptional = select.select(client_queues.keys() + [svrsock] , write_ready, [])
for rd in readable:
if rd is svrsock: # reading listening socket == accepting connection
conn, addr = svrsock.accept()
print("Connection from {}".format(addr))
conn.setblocking(0)
client_queues[conn] = Queue.Queue()
else:
data = rd.recv(1024)
if data:
# TODO: send to all queues
print("Message from {}".format(rd.getpeername()))
for sock, q in client_queues.iteritems():
q.put("From {}: {}".format( rd.getpeername(), data))
if sock not in write_ready:
write_ready.append(sock)
for rw in writable:
try:
data = client_queues[rw].get_nowait()
rw.send(data)
except Queue.Empty:
write_ready.remove(rw)
continue

这个概念很简单。服务器接受连接;每个连接(套接字)都与待处理消息队列相关联。读取每个准备读取的套接字,并将其消息添加到每个客户端的队列中。如果接收方客户端尚不在其中,则将其添加到客户端的 write_ready列表中,其中有待处理的数据。然后,每个准备好写入的套接字都将其下一个排队的消息写入其中。如果没有更多消息,则将收件人从 write_ready列表中删除。

如果不使用多线程,这很容易进行编排,因为所有协调都是按应用程序顺序固有的。使用线程会更困难,代码也会更多,但由于gil可能不会带来更多性能。

无需多线程同时处理多个I/O流的秘诀就是 select。原则上讲,这很容易。我们向 select()传递了一个可能的套接字读取列表,另一个可能的套接字写入列表以及最终列表,对于这个简化的演示,我完全忽略了它们。 select调用的结果将包括一个或多个实际上已准备好进行读取或写入的套接字,这使我可以阻塞直到一个或多个套接字准备好进行 Activity 为止。然后,我每遍都处理所有准备好进行 Activity 的套接字(但是它们已经被筛选成不会阻塞的套接字)。

这里还有很多事情要做。我自己不清理,不跟踪关闭的连接,不处理任何异常等等。但是不必担心线程和并发保证,开始解决这些缺陷就很容易了。

这里是“行动中”。在这里,对于客户端,我使用netcat,它非常适合没有第4+层协议(protocol)(换句话说,原始tcp)的第3层测试。它只是打开到给定目标和端口的套接字,并通过该套接字发送其stdin并将其套接字数据发送到stdout,这使其非常适合演示此服务器应用程序!

我还想指出,服务器和客户端之间的耦合代码是不可取的,因为您将无法在不破坏另一个的情况下将更改扩展到任何一个。最好有一个“契约(Contract)”,以便在服务器和客户端之间进行对话并进行维护。即使您在同一代码库中实现服务器和客户端的行为,也应使用tcp通信协定来驱动实现,而不是代码共享。只是我的2美分,但是一旦您开始共享代码,您通常会以意想不到的方式开始耦合服务器/客户端版本。

服务器:
$ python ./svr.py
Connection from ('127.0.0.1', 52059)
Connection from ('127.0.0.1', 52061)
Message from ('127.0.0.1', 52061)
Message from ('127.0.0.1', 52059)
Message from ('127.0.0.1', 52059)

第一客户(52059):
$ nc localhost 17654
hello
From ('127.0.0.1', 52061): hello
From ('127.0.0.1', 52059): hello
From ('127.0.0.1', 52059): hello

第二位客户:
$ nc localhost 17654
From ('127.0.0.1', 52061): hello
hello
From ('127.0.0.1', 52059): hello
hello
From ('127.0.0.1', 52059): hello

如果您需要更令人信服的 select为什么比并发执行更具吸引力,请考虑以下因素:Apache基于线程模型,换句话说,每个连接都有一个工作线程。 nginx基于 select模型,因此您可以看到它可能有多快。更不用说nginx本质上更好,因为Apache受益于线程模型,因为它大量使用模块来扩展功能(例如mod_php),而nginx没有此限制,并且可以处理来自任何线程的所有请求。但是通常认为nginx的原始性能要高得多,效率也要高得多,主要原因是它避免了apache固有的几乎所有cpu上下文切换。这是一种有效的方法!

一言以蔽之。显然,这不会永远扩展。线程模型也不会;最终您用完了线程。更加分散和高吞吐量的系统可能会使用某种Pub/Sub机制,将客户端连接跟踪和消息队列从服务器转移到pub/sub数据层,并允许恢复连接并发送排队的数据,以及在负载均衡器后面添加多个服务器。只是把它扔在那里。您可能会惊讶地发现 select可以很好地扩展(cpu的速度比网络快得多,因此可能不是瓶颈)。

关于python - 具有中央中继服务器的多线程套接字,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53937597/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com