- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我搜索了很长时间才找到标题中提到的实现,但找不到任何东西。所以我自己实现了它。我将代码发布在这里,这样其他人可能会觉得它有用。
如果有人发现错误,我愿意改正。这是代码:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from hashlib import sha256
import hmac
import uuid
import time
from datetime import datetime
import ssl
import socket
import SocketServer
import BaseHTTPServer
import SimpleHTTPServer
import SimpleXMLRPCServer
from xmlrpclib import Fault
# Configure below
LISTEN_HOST = '127.0.0.1' # You should not use '' here, unless you have a real FQDN.
LISTEN_PORT = 2048
KEYFILE = os.path.join('certs', 'server.key') # Replace with your PEM formatted key file
CERTFILE = os.path.join('certs', 'server.crt') # Replace with your PEM formatted certificate file
# 2011/01/01 in UTC
EPOCH = 1293840000
def require_login(decorated_function):
"""
Decorator that prevents access to action if not logged in.
If the login check failed a xmlrpclib.Fault exception is raised
"""
def wrapper(self, session_id, *args, **kwargs):
""" Decorated methods must always have self and session_id """
# check if a valid session is available
if not self.sessions.has_key(session_id):
self._clear_expired_sessions() # clean the session dict
raise Fault("Session ID invalid", "Call login(user, pass) to aquire a valid session")
last_visit = self.sessions[session_id]["last_visit"]
# check if timestamp is valid
if is_timestamp_expired(last_visit):
self._clear_expired_sessions() # clean the session dict
raise Fault("Session ID expired", "Call login(user, pass) to aquire a valid session")
self.sessions[session_id]["last_visit"] = get_timestamp()
return decorated_function(self, session_id, *args, **kwargs)
return wrapper
def timestamp_to_datetime(timestamp):
"""
Convert a timestamp from 'get_timestamp' into a datetime object
Args:
ts: An integer timestamp
Returns:
A datetime object
"""
return datetime.utcfromtimestamp(timestamp + EPOCH)
def get_timestamp():
"""
Returns the seconds since 1/1/2011.
Returns:
A integer timestamp
"""
return int(time.time() - EPOCH)
def is_timestamp_expired(timestamp, max_age = 2700): # maxage in seconds (here: 2700 = 45 min)
"""
Checks if the given timestamp is expired
Args:
timestamp: An integer timestamp
max_age : The maximal allowd age of the timestamp in seconds
Returns:
True if the timestamp is expired or False if the timestamp is valid
"""
age = get_timestamp() - timestamp
if age > max_age:
return True
return False
class SecureXMLRPCServer(BaseHTTPServer.HTTPServer,SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
def __init__(self, server_address, HandlerClass, logRequests=True, allow_none=False):
"""
Secure XML-RPC server.
It it very similar to SimpleXMLRPCServer but it uses HTTPS for transporting XML data.
"""
self.logRequests = logRequests
self.allow_none = True
SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, self.allow_none, None)
SocketServer.BaseServer.__init__(self, server_address, HandlerClass)
self.socket = ssl.wrap_socket(socket.socket(), server_side=True, certfile=CERTFILE,
keyfile=KEYFILE, ssl_version=ssl.PROTOCOL_SSLv23)
self.server_bind()
self.server_activate()
class SecureXMLRpcRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
"""
Secure XML-RPC request handler class.
It it very similar to SimpleXMLRPCRequestHandler but it uses HTTPS for transporting XML data.
"""
def setup(self):
self.connection = self.request
self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
def do_POST(self):
"""Handles the HTTPS POST request.
It was copied out from SimpleXMLRPCServer.py and modified to shutdown the socket cleanly.
"""
try:
# get arguments
data = self.rfile.read(int(self.headers["content-length"]))
# In previous versions of SimpleXMLRPCServer, _dispatch
# could be overridden in this class, instead of in
# SimpleXMLRPCDispatcher. To maintain backwards compatibility,
# check to see if a subclass implements _dispatch and dispatch
# using that method if present.
response = self.server._marshaled_dispatch(
data, getattr(self, '_dispatch', None)
)
except Exception as e: # This should only happen if the module is buggy
# internal error, report as HTTP server error
self.send_response(500)
self.end_headers()
else:
# got a valid XML RPC response
self.send_response(200)
self.send_header("Content-type", "text/xml")
self.send_header("Content-length", str(len(response)))
self.end_headers()
self.wfile.write(response)
# shut down the connection
self.wfile.flush()
#modified as of http://docs.python.org/library/ssl.html
self.connection.shutdown(socket.SHUT_RDWR)
self.connection.close()
class XMLRPCHandler:
"""
Example implementation for login handling
"""
def __init__(self):
self.users = {"test": "test", "foo": "bar"} # replace with your own authentication
self.sessions = dict()
self.session_key = os.urandom(32)
def _find_session_by_username(self, username):
"""
Try to find a valid session by username.
Args:
username: The username to search for
Returns:
If a session is found it is returned otherwise None is returned
"""
for session in self.sessions.itervalues():
if session["username"] == username:
return session
def _invalidate_session_id(self, session_id):
"""
Remove a session.
Args:
session_id: The session which should be removed
"""
try:
del self.sessions[session_id]
except KeyError:
pass
def _clear_expired_sessions(self):
"""
Clear all expired sessions
"""
for session_id in self.sessions.keys():
last_visit = self.sessions[session_id]["last_visit"]
if is_timestamp_expired(last_visit):
self._invalidate_session_id(session_id)
def _generate_session_id(self, username):
"""
Generates a new session id
Returns:
A new unique session_id
"""
return hmac.new(self.session_key, username + str(uuid.uuid4()), sha256).hexdigest()
def login(self, username, password):
"""
Handle the login procedure. If the login is successfull the session id is returned
otherwise a xmlrpclib.Fault exception is raised.
Args:
username: The username
password: The password
Returns:
A valid session id
Raises:
A xmlrpclib.Fault exception is raised
"""
# check username and password
if self.users.has_key(username):
if self.users[username] == password:
# Check if a session with the username exists
session = self._find_session_by_username(username)
if session:
if is_timestamp_expired(session["last_visit"]):
self._invalidate_session_id(session["session_id"])
else:
return session["session_id"]
# generate session id and save it
session_id = self._generate_session_id(username)
self.sessions[session_id] = {"username" : username,
"session_id": session_id,
"last_visit": get_timestamp()}
return session_id
raise Fault("unknown username or password", "Please check your username and password")
@require_login
def hello(self, session_id, name):
"""
Example method which requires a login
"""
if not name:
raise Fault("unknown recipient", "I need someone to greet!")
return "Hello, %s!" % name
def test():
server_address = (LISTEN_HOST, LISTEN_PORT)
server = SecureXMLRPCServer(server_address, SecureXMLRpcRequestHandler)
server.register_introspection_functions()
server.register_instance(XMLRPCHandler())
sa = server.socket.getsockname()
print "Serving HTTPS on", sa[0], "port", sa[1]
server.serve_forever()
if __name__ == "__main__":
test()
""" Testcode for a example client """
import time
def continue_xmlrpc_call(func, *args):
try:
ret = func(*args)
print ret
return ret
except xmlrpclib.Fault as e:
print e
server = xmlrpclib.ServerProxy("https://localhost:2048")
print server
print server.system.listMethods()
sid = continue_xmlrpc_call(server.login, "foo", "bar")
sid = continue_xmlrpc_call(server.login, "foo", "bar")
continue_xmlrpc_call(server.hello, sid, "World")
time.sleep(2)
continue_xmlrpc_call(server.hello, sid, "Invalid")
continue_xmlrpc_call(server.hello, "193", "")
最佳答案
@菲利普不错的设置。但是我发现登录功能有点奇怪。似乎任何知道登录用户名的人都可以获取 session ID,然后执行操作。如果您移动 session 代码以便仅在输入正确的用户名和密码时才返回现有 session ID,我看起来会更好。
# check username and password
if self.users.has_key(username):
if self.users[username] == password:
# Check if a session with the username exists
session = self._find_session_by_username(username)
if session:
if is_timestamp_expired(session["last_visit"]):
self._invalidate_session_id(session["session_id"])
else:
return session["session_id"]
# generate session id and save it
session_id = self._generate_session_id(username)
self.sessions[session_id] = {"username" : username,
"session_id": session_id,
"last_visit": get_timestamp()}
return session_id
关于session - 具有 session 处理功能的 Python 2 SSL xmlrpc 服务器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17828637/
我在 Cloudflare 的域名服务器上有一个域名 example.com。该域指向我的专用服务器的 IP 地址,该服务器运行 CentOS/WHM/cPanel。该站点可访问 - 一切都很好。 我
我正在努力将 SSL 支持添加到我们现有的应用程序中,并已开始考虑向后兼容性。 与我读过的其他帖子不同的一个特殊情况是服务器可能不一定使用 SSL 代码更新。所以我将有一个 SSL 客户端连接到一个对
我有几个 https://*.rest-service.mydomain.com。随着服务数量的增加,我觉得管理 SSL 证书的成本很高。我为 *.mydomain.com 购买了通配符证书。 新添加
我的客户要求我在他的网站上做反向 ssl。但我是这个学期的新手。谁能帮我解决这个问题。 请描述或引用如何做。 最佳答案 查看 this wiki article . In the case of se
关闭。这个问题是opinion-based .它目前不接受答案。 想改进这个问题?更新问题,以便 editing this post 可以用事实和引用来回答它. 去年关闭。 Improve this
我连接到我的网络服务器上的存储库,但是当我尝试推送我的更改时,它显示:“错误 403:需要 ssl”,但在我的存储库设置中我已经激活了 ssl 选项。 有什么建议吗? 最佳答案 当您连接到存储库时,您
抱歉,如果这听起来像是转储问题,我已经阅读了很多关于 SSL 握手和 SSL 工作原理的文章和文档。我对一件事感到困惑,如果有人能澄清我就太好了。 我知道私钥要保密。但是我已经看到通过在请求中指定私钥
随着物联网越来越主流,越来越需要从硬件发送http请求。 一个主要问题是硬件微 Controller 无法发送 ssl 请求,但大多数服务器/网站/服务都在使用 ssl。 所以,问题是,有没有桥(一个
我有一个 ssl 页面,它还从非 ssl 站点下载头像。我能做些什么来隔离该内容,以便浏览器不会警告用户混合内容吗? 最佳答案 只是一个想法 - 或者: 尝试在头像网站上使用 ssl url,如有必要
我在 Digital Ocean droplet(使用 nginx)上设置了两个域。我已经在其中一个(domain1)中安装了一个 SSL 证书,并且那个证书一切正常。第二个域 (domain2) 不
我收到这个错误: Error frontend: 502 Bad gateway 99.110.244:443 2017/09/28 13:03:51 [error] 34080#34080: *10
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 这个问题似乎与 help center 中定义的范围内的编程无关。 . 关闭 6 年前。 Improve
我遇到了一个问题,我正在构建一个 nginx 反向代理以定向到不同 url 路径上的多个微服务。 该系统完全基于 docker,因此开发和生产使用相同的环境。这在安装 SSL 时给我带来了问题,因为
所以我知道要求 SSL 证书和接受之间的根本区别,一个意味着您必须拥有 SSL 证书,另一个意味着您不需要。 在某个网页的 IIS 管理器中,我有以下设置: 我遇到的问题是,当我设置需要 SSL 证书
我今天才发现 .app 域名需要 SSL 证书。我购买它是为了将 DNS 重定向到已经设置了 SSL 证书的站点,所以我的问题是是否可以设置它? 我正在使用 Google Domains,在将合成临时
堆栈 : react ,NGINX 1.14.0,GUnicorn,Django 2.2.8,Python 3.6.9 错误 : 在浏览器:当 React 调用 Django API(当然是在请求头中
假设我在计算机上编辑主机文件以使 google.com 指向我的 VPS 服务器 IP,并且服务器具有通过 Apache 或 Nginx 配置的 google.com 的虚拟主机/服务器 block
我有一个场景,我正在处理用于 URL 路由的 IIS 网站配置。我已添加网站并在服务器上导入所需的证书。 我的情况是(我有多个网站 URL 和两个 SSL 证书 - 如下所示): qatest1.ab
我知道服务器发送的证书无法伪造(仍然存在 MD5 冲突,但成本高昂),但是伪造客户端又如何呢?在中间人攻击中:我们不能告诉服务器我们是合法客户端并从该服务器获取数据并对其进行操作,然后使用合法客户端公
我已通读相关问题,但无法完全找到我要查找的内容。我设置了一个名为“domain.com”的域,并创建了两个子域“client.domain.com”和“client-intern.domain.com
我是一名优秀的程序员,十分优秀!