gpt4 book ai didi

python - 如何允许同时进行更多套接字连接?

转载 作者:太空宇宙 更新时间:2023-11-03 12:07:57 25 4
gpt4 key购买 nike

我正在尝试实现 Reactor pattern在 Python 中。我想我已经开始使用 multiprocessingselect.select 了。但是,我正在尝试对我的服务器进行压力测试,因此我编写了一个简单的 DoS 客户端来用连接淹没它。但是我得到一个有趣的错误:

[WinError 10061] No connection could be made because the target machine actively refused it

有趣的是,我正在为 backlog amount 执行 socket.listen(5)在服务器上。在我通过 select.select 让读者准备好后,我显示计数,但我只有 1 或 2 - 而不是我期望的 5。

对于少量线程(~20),我没有注意到它阻塞,但对于大量线程(50+),它确实倾向于拒绝连接。

我的问题是在服务器端还是在客户端(或者只是在操作系统/套接字级别)?这是我可以解决的问题吗?如果是这样,怎么做到的?

这是我的代码:

客户端

import threading
import time
import socket
from contextlib import contextmanager

IP = '127.0.0.1'
PORT = 4200

@contextmanager
def open_socket(ip, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((ip, port))
yield sock
finally:
sock.close()


class Flood(threading.Thread):
def __init__(self, id):
super(Flood, self).__init__()
self.id = id
self.failed = False


def run(self):
try:
with open_socket(IP, PORT) as sock:
msg = "Hello this is some data from %d" % self.id
sock.send(msg.encode())
except Exception as e:
print(e)
self.failed = True


def make_threads(count):
return [Flood(_) for _ in range(count)]


threads = make_threads(5000)

start = time.time()
for t in threads:
t.start()

for t in threads:
t.join()

print("Failed: ", sum(1 if x.failed else 0 for x in threads))
print("Done in %f seconds" % (time.time() - start))

服务器

import sys
import logging
import socket
import select
import time
import queue
from multiprocessing import Process, Queue, Value
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
log.addHandler(logging.StreamHandler())

IP = '127.0.0.1'
PORT = 4200

keep_running = True

def dispatcher(q, keeprunning):
try:
while keeprunning:
val = None
try:
val = q.get(True, 5)
if val:
log.debug(val[0].recv(1024).decode())
val[0].shutdown(socket.SHUT_RDWR)
val[0].close()
except queue.Empty:
pass
log.debug("Dispatcher quitting")
except KeyboardInterrupt:
log.debug("^C caught, dispatcher quitting")


def mainloop(sock):
readers, writers, errors = [sock], [], []
timeout = 5
while True:
readers, writers, errors = select.select(readers,
writers,
errors,
timeout)
incoming = yield readers, writers, errors
if incoming and len(incoming) == 3:
readers, writers, errors = incoming
if not readers:
readers.append(sock)


def run_server():
keeprunning = Value('b', True)
q = Queue()
p = Process(target=dispatcher, args=(q, keep_running))
try:
p.start()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((IP, PORT))
sock.listen(50)
sock.setblocking(0)
loop = mainloop(sock)
for readers, writers, errors in loop:
if readers:
client, addr = readers[0].accept()
q.put((client, addr))
log.debug('*'*50)
log.debug('%d Readers', len(readers))
log.debug('%d Writers', len(writers))
log.debug('%d Errors', len(errors))


except KeyboardInterrupt:
log.info("^C caught, shutting down...")
finally:
keeprunning.value = False
sock.close()
p.join()

if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: test.py (client|server)")
elif sys.argv[1] == 'client':
run_client()
elif sys.argv[1] == 'server':
run_server()

最佳答案

我尝试测试您的代码,但在 import queue 上失败。

不过,可能是这样

  • 您的操作系统作为指定的 listen() 函数运行:“实现可能会对积压施加限制并默默地减少指定值。”
  • 一旦有足够多的未完成连接,您的操作系统就会停止接受连接,这可能不会根据请求显示。

这些只是对可能原因的猜测;也许我完全错了。

关于python - 如何允许同时进行更多套接字连接?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21194732/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com