gpt4 book ai didi

session - 具有 session 处理功能的 Python 2 SSL xmlrpc 服务器

转载 作者:太空宇宙 更新时间:2023-11-03 13:28:09 25 4
gpt4 key购买 nike

我搜索了很长时间才找到标题中提到的实现,但找不到任何东西。所以我自己实现了它。我将代码发布在这里,这样其他人可能会觉得它有用。

如果有人发现错误,我愿意改正。这是代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os

from hashlib import sha256
import hmac
import uuid
import time
from datetime import datetime

import ssl
import socket
import SocketServer
import BaseHTTPServer
import SimpleHTTPServer
import SimpleXMLRPCServer
from xmlrpclib import Fault


# Configure below
LISTEN_HOST = '127.0.0.1' # You should not use '' here, unless you have a real FQDN.
LISTEN_PORT = 2048

KEYFILE = os.path.join('certs', 'server.key') # Replace with your PEM formatted key file
CERTFILE = os.path.join('certs', 'server.crt') # Replace with your PEM formatted certificate file

# 2011/01/01 in UTC
EPOCH = 1293840000

def require_login(decorated_function):
"""
Decorator that prevents access to action if not logged in.

If the login check failed a xmlrpclib.Fault exception is raised
"""

def wrapper(self, session_id, *args, **kwargs):
""" Decorated methods must always have self and session_id """

# check if a valid session is available
if not self.sessions.has_key(session_id):
self._clear_expired_sessions() # clean the session dict
raise Fault("Session ID invalid", "Call login(user, pass) to aquire a valid session")

last_visit = self.sessions[session_id]["last_visit"]

# check if timestamp is valid
if is_timestamp_expired(last_visit):
self._clear_expired_sessions() # clean the session dict
raise Fault("Session ID expired", "Call login(user, pass) to aquire a valid session")

self.sessions[session_id]["last_visit"] = get_timestamp()
return decorated_function(self, session_id, *args, **kwargs)

return wrapper

def timestamp_to_datetime(timestamp):
"""
Convert a timestamp from 'get_timestamp' into a datetime object

Args:
ts: An integer timestamp

Returns:
A datetime object
"""

return datetime.utcfromtimestamp(timestamp + EPOCH)

def get_timestamp():
"""
Returns the seconds since 1/1/2011.

Returns:
A integer timestamp
"""

return int(time.time() - EPOCH)

def is_timestamp_expired(timestamp, max_age = 2700): # maxage in seconds (here: 2700 = 45 min)
"""
Checks if the given timestamp is expired

Args:
timestamp: An integer timestamp
max_age : The maximal allowd age of the timestamp in seconds

Returns:
True if the timestamp is expired or False if the timestamp is valid
"""

age = get_timestamp() - timestamp
if age > max_age:
return True
return False


class SecureXMLRPCServer(BaseHTTPServer.HTTPServer,SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
def __init__(self, server_address, HandlerClass, logRequests=True, allow_none=False):
"""
Secure XML-RPC server.
It it very similar to SimpleXMLRPCServer but it uses HTTPS for transporting XML data.
"""
self.logRequests = logRequests
self.allow_none = True

SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, self.allow_none, None)
SocketServer.BaseServer.__init__(self, server_address, HandlerClass)

self.socket = ssl.wrap_socket(socket.socket(), server_side=True, certfile=CERTFILE,
keyfile=KEYFILE, ssl_version=ssl.PROTOCOL_SSLv23)

self.server_bind()
self.server_activate()

class SecureXMLRpcRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
"""
Secure XML-RPC request handler class.
It it very similar to SimpleXMLRPCRequestHandler but it uses HTTPS for transporting XML data.
"""

def setup(self):
self.connection = self.request
self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)

def do_POST(self):
"""Handles the HTTPS POST request.

It was copied out from SimpleXMLRPCServer.py and modified to shutdown the socket cleanly.
"""

try:
# get arguments
data = self.rfile.read(int(self.headers["content-length"]))
# In previous versions of SimpleXMLRPCServer, _dispatch
# could be overridden in this class, instead of in
# SimpleXMLRPCDispatcher. To maintain backwards compatibility,
# check to see if a subclass implements _dispatch and dispatch
# using that method if present.
response = self.server._marshaled_dispatch(
data, getattr(self, '_dispatch', None)
)
except Exception as e: # This should only happen if the module is buggy
# internal error, report as HTTP server error
self.send_response(500)
self.end_headers()
else:
# got a valid XML RPC response
self.send_response(200)
self.send_header("Content-type", "text/xml")
self.send_header("Content-length", str(len(response)))
self.end_headers()
self.wfile.write(response)

# shut down the connection
self.wfile.flush()

#modified as of http://docs.python.org/library/ssl.html
self.connection.shutdown(socket.SHUT_RDWR)
self.connection.close()

class XMLRPCHandler:
"""
Example implementation for login handling
"""

def __init__(self):
self.users = {"test": "test", "foo": "bar"} # replace with your own authentication
self.sessions = dict()
self.session_key = os.urandom(32)

def _find_session_by_username(self, username):
"""
Try to find a valid session by username.

Args:
username: The username to search for

Returns:
If a session is found it is returned otherwise None is returned
"""

for session in self.sessions.itervalues():
if session["username"] == username:
return session

def _invalidate_session_id(self, session_id):
"""
Remove a session.

Args:
session_id: The session which should be removed
"""

try:
del self.sessions[session_id]
except KeyError:
pass

def _clear_expired_sessions(self):
"""
Clear all expired sessions
"""

for session_id in self.sessions.keys():
last_visit = self.sessions[session_id]["last_visit"]
if is_timestamp_expired(last_visit):
self._invalidate_session_id(session_id)

def _generate_session_id(self, username):
"""
Generates a new session id

Returns:
A new unique session_id
"""

return hmac.new(self.session_key, username + str(uuid.uuid4()), sha256).hexdigest()

def login(self, username, password):
"""
Handle the login procedure. If the login is successfull the session id is returned
otherwise a xmlrpclib.Fault exception is raised.

Args:
username: The username
password: The password

Returns:
A valid session id

Raises:
A xmlrpclib.Fault exception is raised
"""
# check username and password
if self.users.has_key(username):
if self.users[username] == password:

# Check if a session with the username exists
session = self._find_session_by_username(username)
if session:
if is_timestamp_expired(session["last_visit"]):
self._invalidate_session_id(session["session_id"])
else:
return session["session_id"]

# generate session id and save it
session_id = self._generate_session_id(username)
self.sessions[session_id] = {"username" : username,
"session_id": session_id,
"last_visit": get_timestamp()}

return session_id

raise Fault("unknown username or password", "Please check your username and password")

@require_login
def hello(self, session_id, name):
"""
Example method which requires a login
"""
if not name:
raise Fault("unknown recipient", "I need someone to greet!")
return "Hello, %s!" % name

def test():
server_address = (LISTEN_HOST, LISTEN_PORT)
server = SecureXMLRPCServer(server_address, SecureXMLRpcRequestHandler)
server.register_introspection_functions()
server.register_instance(XMLRPCHandler())

sa = server.socket.getsockname()
print "Serving HTTPS on", sa[0], "port", sa[1]
server.serve_forever()

if __name__ == "__main__":
test()

""" Testcode for a example client """
import time
def continue_xmlrpc_call(func, *args):
try:
ret = func(*args)
print ret
return ret
except xmlrpclib.Fault as e:
print e

server = xmlrpclib.ServerProxy("https://localhost:2048")

print server
print server.system.listMethods()
sid = continue_xmlrpc_call(server.login, "foo", "bar")
sid = continue_xmlrpc_call(server.login, "foo", "bar")
continue_xmlrpc_call(server.hello, sid, "World")
time.sleep(2)
continue_xmlrpc_call(server.hello, sid, "Invalid")
continue_xmlrpc_call(server.hello, "193", "")

最佳答案

@菲利普不错的设置。但是我发现登录功能有点奇怪。似乎任何知道登录用户名的人都可以获取 session ID,然后执行操作。如果您移动 session 代码以便仅在输入正确的用户名和密码时才返回现有 session ID,我看起来会更好。

    # check username and password
if self.users.has_key(username):
if self.users[username] == password:
# Check if a session with the username exists
session = self._find_session_by_username(username)
if session:
if is_timestamp_expired(session["last_visit"]):
self._invalidate_session_id(session["session_id"])
else:
return session["session_id"]

# generate session id and save it
session_id = self._generate_session_id(username)
self.sessions[session_id] = {"username" : username,
"session_id": session_id,
"last_visit": get_timestamp()}

return session_id

关于session - 具有 session 处理功能的 Python 2 SSL xmlrpc 服务器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17828637/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com