gpt4 book ai didi

python - 如何创建只有服务器对客户端进行身份验证的 Python SSL 客户端/服务器对

转载 作者:行者123 更新时间:2023-12-04 22:36:51 26 4
gpt4 key购买 nike

我一直在尝试让一个简单的 Python SSL 示例运行一天,但没有成功。我想创建一个 SSL 服务器和 SSL 客户端。服务器应该对客户端进行身份验证。 Python 文档对 SSL 模块的示例非常简单,通常我找不到很多工作示例。我正在使用的代码如下;

客户:

import socket
import ssl


class SSLClient:
def __init__(self, server_host, server_port, client_cert, client_key):
self.server_host = server_host
self.server_port = server_port
self._context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
self._context.load_cert_chain(client_cert, client_key)
self._sock = None
self._ssock = None

def __del__(self):
self.close()

def connect(self):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._ssock = self._context.wrap_socket(
self._sock, server_hostname=self.server_host
)
self._ssock.connect((self.server_host, self.server_port))

def send(self, msg):
self._ssock.send(msg.encode())

def close(self):
self._ssock.close()

服务器:

import socket
import ssl
from threading import Thread


class SSLServer:
def __init__(self, host, port, cafile, chunk_size=1024):
self.host = host
self.port = port
self.chunk_size = chunk_size
self._context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
self._context.load_verify_locations(cafile)
self._ssock = None

def __del__(self):
self.close()

def connect(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as sock:
sock.bind((self.host, self.port))
sock.listen(5)
with self._context.wrap_socket(sock, server_side=True) as self._ssock:
conn, _ = self._ssock.accept()

while True:
data = conn.recv(self.chunk_size).decode()
print(data)
if data is None:
break

def close(self):
self._ssock.close()


class SSLServerThread(Thread):
def __init__(self, server):
super().__init__()
self._server = server
self.daemon = True

def run(self):
self._server.connect()

def stop(self):
self._server.close()

测试脚本:

import client, server
from os import path
from time import sleep

server_host = "localhost"
server_port = 11234
client_cert = path.join(path.dirname(__file__), "client.crt")
client_key = path.join(path.dirname(__file__), "client.key")

s = server.SSLServer(server_host, server_port, client_cert)
s_thread = server.SSLServerThread(s)
s_thread.start()
sleep(2)
c = client.SSLClient(server_host, server_port, client_cert, client_key)
c.connect()

c.send("This is a test message!")

c.close()
s.close()

我使用以下命令生成了我的客户端证书和 key :

openssl req -newkey rsa:2048 \
-x509 \
-sha256 \
-days 3650 \
-nodes \
-out client.crt \
-keyout client.key \
-subj "/C=UK/ST=Scotland/L=Glasgow/O=Company A/OU=Testing/CN=MyName"

测试脚本似乎启动了服务器并允许客户端连接,但是当我尝试发送测试消息时收到 BrokenPipeError。

令人恼火的是,我一直收到各种不同的错误消息,所以这很可能是多种因素的结合。这是我创建的一个简单示例,旨在尝试让某些东西正常工作。在我更复杂的示例中,当客户端尝试连接到服务器时,我得到“NO_SHARED_CIPHERS”。令人恼火的是,我不明白为什么这个简单的示例似乎比更复杂的示例走得更远(即连接似乎已成功建立),即使它们的设置几乎相同。

如果有人想测试它,我已经在 git@github.com:stevengillies87/python-ssl-client-auth-example.git 上传了一个 repo。


我意识到第一个错误来自复制粘贴和示例,但没有意识到它在设置方面与我的代码有何不同。它使用 socket.socket() 创建套接字,而我的示例使用 socket.create_connection(),它也连接套接字。这就是我收到 BrokenPipeError 的原因。现在,我的简单示例和我正在编写的实际代码都出现了一致的 NO_SHARED_CIPHER 错误。我在源代码中添加了一行以在套接字被包装后连接客户端。

最佳答案

所以,正如预期的那样,这是多种因素的结合。

在我将 SSL 层添加到我的代码之前,它使用 TCP 套接字。我在客户端中使用 socket.create_connection() 在一次调用中创建和连接一个套接字。当我添加 SSL 时,我继续这样做,但是因为我试图通过 TCP 套接字连接到 SSL 服务器,所以我收到了 NO_SHARED_CIPHER 错误。

这个问题的解决方案是只用 sock = socket.socket() 创建 TCP 套接字,用 ssock = ssl_context.wrap_context(sock) 包装它,然后然后调用 SSL 层上的连接,ssock.connect((host, port))

但是,我在连接时仍然遇到握手错误。我找到了这个链接,https://www.electricmonk.nl/log/2018/06/02/ssl-tls-client-certificate-verification-with-python-v3-4-sslcontext/ ,其中提供了如何创建相互验证的 SSL 客户端/服务器的详细示例。至关重要的是,作者指出用于服务器身份验证的主机名必须与创建 server.crt 和 server.key 文件时输入的“通用名称”相匹配。以前我一直在使用与我连接的同一主机,在本例中为“localhost”。他们还指出,对于客户端身份验证,ssl_context 验证模式应设置为 verify_mode = ssl.CERT_REQUIRED

示例运行后,我开始着手删除服务器的客户端身份验证。这是通过将客户端 SSL 上下文从 ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) 更改为 ssl.SSLContext() 来完成的。客户端现在不需要 server.crt 文件即可成功连接。

令人沮丧的是,我仍然需要创建服务器证书/ key 文件并使用 ssl_context.load_cert_chain() 将它们加载到服务器中,即使我不需要对服务器进行身份验证。如果我尝试从服务器中删除此步骤,我会再次收到 NO_SHARED_CIPHER 错误。如果有人知道如何避免这种情况,请告诉我,或解释为什么有必要这样做。

下面的工作代码,并在问题中的 github 链接上更新。

客户:

import socket
import ssl

class SSLClient:
def __init__(
self, server_host, server_port, sni_hostname, client_cert, client_key,
):
self.server_host = server_host
self.server_port = server_port
self.sni_hostname = sni_hostname
self._context = ssl.SSLContext()
self._context.load_cert_chain(client_cert, client_key)
self._sock = None
self._ssock = None

def __del__(self):
self.close()

def connect(self):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._ssock = self._context.wrap_socket(self._sock,)
self._ssock.connect((self.server_host, self.server_port))

def send(self, msg):
self._ssock.send(msg.encode())

def close(self):
self._ssock.close()

服务器:

import socket
import ssl
from threading import Thread


class SSLServer:
def __init__(
self, host, port, server_cert, server_key, client_cert, chunk_size=1024
):
self.host = host
self.port = port
self.chunk_size = chunk_size
self._context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
self._context.verify_mode = ssl.CERT_REQUIRED
self._context.load_cert_chain(server_cert, server_key)
self._context.load_verify_locations(client_cert)

def connect(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as sock:
sock.bind((self.host, self.port))
sock.listen(5)
while True:
conn, _ = sock.accept()
with self._context.wrap_socket(conn, server_side=True) as sconn:
self._recv(sconn)

def _recv(self, sock):
while True:
data = sock.recv(self.chunk_size)
if data:
print(data.decode())
else:
break


class SSLServerThread(Thread):
def __init__(self, server):
super().__init__()
self._server = server
self.daemon = True

def run(self):
self._server.connect()

测试:

import client, server
from os import path
from time import sleep

server_host = "127.0.0.1"
server_port = 35689
server_sni_hostname = "www.company-b.com"
client_cert = path.join(path.dirname(__file__), "client.crt")
client_key = path.join(path.dirname(__file__), "client.key")
server_cert = path.join(path.dirname(__file__), "server.crt")
server_key = path.join(path.dirname(__file__), "server.key")

s = server.SSLServer(server_host, server_port, server_cert, server_key, client_cert)
s_thread = server.SSLServerThread(s)
s_thread.start()
sleep(2)
c = client.SSLClient(
server_host, server_port, server_sni_hostname, client_cert, client_key
)
c.connect()

c.send("This is a test message!")

c.close()

关于python - 如何创建只有服务器对客户端进行身份验证的 Python SSL 客户端/服务器对,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63980192/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com