gpt4 book ai didi

使用 ketama 的 Python memcache 一致性哈希

转载 作者:行者123 更新时间:2023-11-30 23:32:51 25 4
gpt4 key购买 nike

我有一个代码可以在运行时添加额外的 memcache 实例,但这会使我的 key 丢失。我知道有几个可用的库,例如 consistent_hash、hash_ring,但我无法在我的代码中使用它们。我知道有 ketama 可用,但找不到它的 Python 代码示例。

import random
import string
import memcache


class MemcacheClient(memcache.Client):
    """A ``memcache.Client`` subclass that can grow its server pool at run time.

    NOTE: the parent client maps keys to servers with a plain modulo over
    the bucket list, so adding a server remaps almost every key -- this is
    the "cache wipe" effect described in the surrounding text.  A consistent
    hashing scheme (e.g. ketama) is required to avoid it.
    """

    def _get_server(self, key):
        """Return the (server, key) pair the stock client selects for `key`."""
        return super(MemcacheClient, self)._get_server(key)

    def add_server(self, server):
        """Add a memcached host (an ``'ip:port'`` string) to the client at run time.

        Bug fix vs. the original: the host is appended to ``buckets`` once
        per unit of weight, matching how ``memcache.Client.set_servers``
        builds its bucket list, so weighted servers keep their share of keys.
        """
        # Wrap the address string in a _Host entry that inherits this
        # client's connection settings.
        server = memcache._Host(
            server, self.debug, dead_retry=self.dead_retry,
            socket_timeout=self.socket_timeout,
            flush_on_reconnect=self.flush_on_reconnect
        )
        # Register the host with the client...
        self.servers.append(server)
        # ...and give it `weight` bucket slots, as the parent constructor
        # does (the original appended it exactly once, ignoring weight).
        for _ in range(server.weight):
            self.buckets.append(server)


def random_key(size):
    """Return a random string of ASCII letters of length `size`.

    Uses ``string.ascii_letters`` instead of the Python-2-only and
    locale-dependent ``string.letters``, so the function also runs on
    Python 3.
    """
    return ''.join(random.choice(string.ascii_letters) for _ in range(size))


if __name__ == '__main__':
    # Seven memcached servers are assumed to be running locally on
    # ports 11211-11217.
    servers = ['127.0.0.1:1121%d' % i for i in range(1, 8)]
    # 100 random keys to spread across the pool.
    keys = [random_key(10) for i in range(100)]
    # Init our subclass.
    client = MemcacheClient(servers=servers)
    # Store every key so each lands on the server the client picks for it.
    for key in keys:
        client.set(key, 1)

    # Check how many keys are still routed to the server that holds them.
    # print is called in function form so the script runs on Python 3 as
    # well (the original used Python-2-only print statements).
    valid_keys = client.get_multi(keys)
    print('%s percent of keys matched' % ((len(valid_keys) / float(len(keys))) * 100))

    # Adding a server remaps most keys under the default modulo hashing.
    client.add_server('127.0.0.1:11219')
    print('Added new server')

    valid_keys = client.get_multi(keys)
    print('%s percent of keys still matched' % ((len(valid_keys) / float(len(keys))) * 100))

最佳答案

嗯,基本上你必须重写 _get_server() 方法来更改服务器分发算法。

我在互联网上进行了一些搜索,并在谷歌上找到了这篇文章 amix.dk/blog/post/19367,这是 Amir Salihefendic 写的非常好的资料,对理解 ketama 一致性哈希算法的工作原理有很大帮助,其中还有他编写的一个名为 HashRing 的 Python 类,实现了 ketama 算法。

所以我基本上使用了他的类并对其进行了一些更改以满足 Memcached 客户端的需求。这些修改是对已弃用的 md5 模块的更改,以及用于为服务器生成 key 的字符串的更改:

key = self.gen_key('%s:%s' % (node, i))

至:

key = self.gen_key(
'%s:%s:%s:%s' % (node.address[0],
node.address[1], i, node.weight)
)

我还修复了一个错误,当算法在第一个循环中未找到服务器时,该错误会导致 get_nodes() 方法无限循环。

旧的 get_nodes() 方法(如果没有产生服务器,将进入无限循环)。

def get_nodes(self, string_key):
    """Given a string key it returns the nodes as a generator that can hold the key.

    The generator is never ending and iterates through the ring
    starting at the correct position.

    NOTE(review): this is the ORIGINAL (buggy) version, kept for
    reference.  If a ring entry is removed while iterating,
    ``self.ring[key]`` raises ``KeyError``; worse, when no usable
    server is ever yielded the trailing ``while True`` loop re-yields
    the same entries forever, so a caller scanning for a live server
    never terminates.
    """
    if not self.ring:
        # Empty ring: signal "no node" once (callers must handle it).
        yield None, None

    node, pos = self.get_node_pos(string_key)
    # Walk the ring clockwise starting at the key's position...
    for key in self._sorted_keys[pos:]:
        yield self.ring[key]

    # ...then keep cycling over the whole ring indefinitely.
    while True:
        for key in self._sorted_keys:
            yield self.ring[key]

新的 get_nodes() 方法:

def get_nodes(self, string_key):
    """Yield the ring nodes that may hold `string_key`, nearest first.

    Fixed version: instead of cycling forever, it makes exactly one
    pass around the ring (from the key's position, then wrapping back
    to the start) and skips hashes no longer present in ``self.ring``,
    so the generator always terminates.
    """
    if not self.ring:
        yield None, None

    node, pos = self.get_node_pos(string_key)
    # One pass from the key's slot to the end of the ring...
    for key in self._sorted_keys[pos:]:
        if key in self.ring:
            yield self.ring[key]

    # ...then wrap around and cover the slots before the key.
    for key in self._sorted_keys[:pos]:
        if key in self.ring:
            yield self.ring[key]

我在 add_node() 和 remove_node() 方法中添加了一个新的 for 循环,以便根据服务器的权重在环上为其放置更多副本。

旧方法:

for i in xrange(0, self.replicas):
key = self.gen_key('%s:%s' % (node, i))
self.ring[key] = node
self._sorted_keys.append(key)

新方法:

for i in xrange(0, self.replicas):
for x in range(0, node.weight):
key = self.gen_key(
'%s:%s:%s:%s' % (node.address[0],
node.address[1], i, node.weight)
)

if key not in self.ring:
self.ring[key] = node
self._sorted_keys.append(key)

上面的代码涉及 add_node() 方法,但同样的思路也适用于 remove_node()。

好吧,也许我还做了一些其他的改变,我只是暂时不记得还有其他的了。这是合适的 HashRing 类:

import bisect
from hashlib import md5


class HashRing(object):
    """Ketama-style consistent-hash ring mapping string keys to nodes.

    Nodes are expected to expose ``address`` (a ``(host, port)`` pair)
    and an integer ``weight``.  Each node is placed on the ring
    ``replicas * weight`` times, so heavier nodes receive
    proportionally more keys.
    """

    def __init__(self, nodes=None, replicas=3):
        """Manages a hash ring.

        `nodes` is a list of objects with ``address`` and ``weight``.
        `replicas` indicates how many virtual points should be used per
        node; replicas are required to improve the distribution.
        """
        self.replicas = replicas

        # hash value -> node object
        self.ring = dict()
        # sorted list of the hash values present in `ring`
        self._sorted_keys = []

        if nodes:
            for node in nodes:
                self.add_node(node)

    def _node_keys(self, node):
        """Yield every ring position (hash) belonging to `node`.

        Bug fix: the copy counter `x` is part of the hashed string.  The
        original hashed only (host, port, replica, weight), so all
        `weight` copies of a replica collided on the same hash and the
        weight had no effect on the distribution.
        """
        for i in range(self.replicas):
            for x in range(node.weight):
                yield self.gen_key(
                    '%s:%s:%s:%s:%s' % (node.address[0], node.address[1],
                                        i, x, node.weight)
                )

    def add_node(self, node):
        """Adds `node` to the hash ring (replicas * weight points)."""
        for key in self._node_keys(node):
            # Guard against (unlikely) hash collisions between nodes.
            if key not in self.ring:
                self.ring[key] = node
                self._sorted_keys.append(key)

        self._sorted_keys.sort()

    def remove_node(self, node):
        """Removes `node` and all of its replicas from the hash ring."""
        for key in self._node_keys(node):
            if key in self.ring:
                del self.ring[key]
                self._sorted_keys.remove(key)

    def get_node(self, string_key):
        """Return the node responsible for `string_key` (None if the ring is empty)."""
        return self.get_node_pos(string_key)[0]

    def get_node_pos(self, string_key):
        """Return ``(node, position)`` for `string_key`.

        The node owning a key is the one at the first ring position whose
        hash is >= the key's hash, wrapping to position 0 past the end.
        If the hash ring is empty, ``(None, None)`` is returned.  Uses
        binary search instead of the original linear scan.
        """
        if not self.ring:
            return None, None

        key = self.gen_key(string_key)

        pos = bisect.bisect_left(self._sorted_keys, key)
        if pos == len(self._sorted_keys):
            pos = 0
        return self.ring[self._sorted_keys[pos]], pos

    def get_nodes(self, string_key):
        """Yield candidate nodes for `string_key`, nearest position first.

        Makes a single pass around the ring starting at the key's
        position; hashes removed since `_sorted_keys` was read are
        skipped, so the generator always terminates.
        """
        if not self.ring:
            yield None, None
            return

        node, pos = self.get_node_pos(string_key)
        for key in self._sorted_keys[pos:]:
            if key in self.ring:
                yield self.ring[key]

        for key in self._sorted_keys[:pos]:
            if key in self.ring:
                yield self.ring[key]

    @staticmethod
    def gen_key(key):
        """Hash `key` to an integer position on the ring.

        md5 is used because it mixes well.  The key is encoded first so
        the same code runs on Python 3, where md5 requires bytes.
        """
        m = md5()
        m.update(key.encode('utf-8'))
        return int(m.hexdigest(), 16)

我对你的类做了一些更改,以便更灵活地决定何时使用 ketama 算法或默认的模数。

我注意到,在编写 add_server() 方法时,您忘记在将服务器附加到存储桶列表时考虑服务器的权重。

这就是新的 MemcacheClient 的样子:

from consistent_hash import HashRing


class MemcacheClient(memcache.Client):
    """ A memcache subclass. It currently allows you to add a new host at run
    time and can route keys with either ketama consistent hashing or the
    parent client's default modulo scheme.
    """
    available_algorithms = ['ketama', 'modulo']
    hash_algorithm_index = 0

    def __init__(self, hash_algorithm='ketama', *args, **kwargs):
        """`hash_algorithm` is ``'ketama'`` (consistent hashing) or
        ``'modulo'`` (the stock memcache.Client behaviour).  Raises
        ``Exception`` for any other value.
        """
        super(MemcacheClient, self).__init__(*args, **kwargs)

        if hash_algorithm in self.available_algorithms:
            self.hash_algorithm_index = self.available_algorithms.index(
                hash_algorithm)
            # Bug fix: the chosen algorithm name was never stored on the
            # instance, so _get_server() (and any external reader of
            # `self.hash_algorithm`) crashed with AttributeError.
            self.hash_algorithm = hash_algorithm

            if hash_algorithm == 'ketama':
                self.consistent_hash_manager = HashRing(nodes=self.servers)
            else:
                self.consistent_hash_manager = None
        else:
            raise Exception(
                "The algorithm \"%s\" is not implemented for this client. The "
                "options are \"%s\""
                "" % (hash_algorithm, " or ".join(self.available_algorithms))
            )

    def _get_server(self, key):
        """ Returns the most likely server to hold the key
        """
        if self.hash_algorithm == 'ketama':
            # Ketama: e.g. ring = {100: s1, 110: s2, 120: s3, 140: s4}.
            # A key hashing to 105 belongs to the next bigger position,
            # 110 (s2).  Adding a server at 108 takes the key over;
            # removing 110 hands it to the server at 120.  A hash past
            # the last position wraps around to the first server.
            #
            # Walk the ring from the key's position and return the first
            # server that accepts a connection.
            servers_generator = self.consistent_hash_manager.get_nodes(key)
            for server in servers_generator:
                if server.connect():
                    return server, key
            return None, None
        else:
            # Modulo: fall back to the parent client's bucket scheme.
            return super(MemcacheClient, self)._get_server(key)

    def add_server(self, server):
        """ Adds a host (an ``'ip:port'`` string) at runtime to the client.
        """
        # NOTE(review): consider raising here when only the modulo
        # scheme is active, since adding a server under modulo hashing
        # remaps most existing keys.

        # Create a new host entry with this client's connection settings.
        server = memcache._Host(
            server, self.debug, dead_retry=self.dead_retry,
            socket_timeout=self.socket_timeout,
            flush_on_reconnect=self.flush_on_reconnect
        )
        # Add this to our server choices.
        self.servers.append(server)

        # One bucket slot per unit of weight, so a heavier server is
        # proportionally more likely to be picked by the modulo scheme.
        for i in range(server.weight):
            self.buckets.append(server)

        # Place the node (and its replicas) on the consistent-hash ring.
        if self.consistent_hash_manager:
            self.consistent_hash_manager.add_node(server)

def random_key(size):
    """Generate a random key of `size` ASCII letters.

    ``string.ascii_letters`` replaces the Python-2-only, locale-dependent
    ``string.letters`` so the helper is portable to Python 3.
    """
    return ''.join(random.choice(string.ascii_letters) for _ in range(size))


def run_consistent_hash_test(client_obj):
    """Exercise `client_obj`: distribute 500 random keys, report the hit
    rate, add five extra servers, then report how many keys still resolve.
    """
    # 500 random 100-character keys to spread over the pool.
    keys = [random_key(100) for _ in range(500)]

    print(
        "\n/////////// CONSISTENT HASH ALGORITHM \"%s\" //////////////"
        "" % client_obj.hash_algorithm.upper()
    )

    print("\n->These are the %s servers:" % len(client_obj.servers))
    str_servers = "".join(
        "%s:%s, " % (srv.address[0], srv.address[1])
        for srv in client_obj.servers
    )
    print("******************************************************************")
    print(str_servers)
    print("******************************************************************")

    # Start from an empty cache.
    client_obj.flush_all()

    # Write every key so each is placed on its server.
    for key in keys:
        client_obj.set(key, 1)

    print(
        "\n%d keys distributed for %d server(s)\n"
        "" % (len(keys), len(client_obj.servers))
    )

    # How many keys still resolve before the pool grows?
    matched = client_obj.get_multi(keys)
    print(
        "%s percent of keys matched, before adding extra servers.\n"
        "" % ((len(matched) / float(len(keys))) * 100)
    )

    # Bring five more servers (ports 11219-11223) into the pool.
    interval_extra_servers = range(19, 24)
    for address in ['127.0.0.1:112%d' % i for i in interval_extra_servers]:
        client_obj.add_server(address)

    # ...and how many still resolve afterwards?
    matched = client_obj.get_multi(keys)
    print(
        "Added %d new server(s).\n%s percent of keys still matched"
        "" % (len(interval_extra_servers),
              (len(matched) / float(len(keys))) * 100)
    )

    print("\n***************************************************************"
          "****\n")
if __name__ == '__main__':
    # Eight memcached servers are assumed to be listening locally on
    # ports 11211-11218.
    interval_servers = range(11, 19)
    servers = ['127.0.0.1:112%d' % port for port in interval_servers]
    # Build the client; hash_algorithm may be "modulo" (the library's
    # default scheme) or "ketama" (the consistent-hash one).
    client = MemcacheClient(servers=servers, hash_algorithm='ketama')
    run_consistent_hash_test(client)

如果您直接在终端上运行此类,它将显示正确的输出

关于使用 ketama 的 Python memcache 一致性哈希,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19193934/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com