文章目录
- 助记提要
- 10章 扩展Redis
- 10.1 扩展读性能
- 通过复制特性添加从服务器
- 10.2 扩展写性能和内存容量
- 扩展之前降低内存占用和优化性能
- 对分片连接的配置和使用
- 10.3 扩展搜索型查询
- 使从服务器可写
- 将搜索索引分片
- 基于SORT实现索引的分片搜索操作
- 基于有序集合实现索引的分片搜索操作
- 10.4 扩展社交网站
- 已发布状态消息的扩展
- 时间线的扩展
- 扩展关注和粉丝列表
助记提要
- 如何处理从服务器重同步问题;
- Redis故障转移的原理;
- 降低内存占用和性能优化的方法总结;
- 基于分片的连接装饰器;
- 搜索型查询如何扩展;
- 分片搜索操作的过程 2步;
- 使用分片扩展社交网站
10章 扩展Redis
主从复制 水平分片
10.1 扩展读性能
通过复制特性添加从服务器
在配置中加slaveof host port
,或者向服务器发送SLAVEOF host port
命令,就可以把服务器变为指定主服务器的从服务器。
- 注意:
- 从服务器的写入配置一般是关闭状态,对从服务器写入会引发错误。
- 服务器成为从服务器后,原本的数据将被清空。
- 从服务器重同步问题
主服务器同时向多个从服务器发送副本时,可能会把主服务器的带宽消耗尽。
可以使用主从复制树减少主服务器需要直接同步的从服务器数。
也能对网络连接进行压缩,减少每次需要传送的数据量。
使用带压缩的SSH隧道进行连接,其加密开销不大。过高的压缩级别会给低处理器的从服务器带来麻烦,最好控制在5级以下。
- 故障转移
Redis Sentinel服务器,是特殊模式的Redis服务器,可以监视一系列主服务器和它们的从服务器。
通过向主服务器发送PUBLISH、SUBCRIBE命令,向主、从服务器发送PING命令,Sentinel进程可以自主识别可用的从服务器和其他Sentinel。
主服务器失效的时候,监视该主服务器的所有Sentinel会基于彼此公有的信息选出一个Sentinel,并从现有从服务器中选出一个作为新的主服务器。被选中的Sentinel会让剩余的从服务器去复制这个新的主服务器。
10.2 扩展写性能和内存容量
达到服务器性能瓶颈时,需要把数据分片到多台机器组成的群组里面。
扩展之前降低内存占用和优化性能
- 优化程序,降低需要读取的数据量;
- 把不相关的功能迁移到其他服务器;
- 一些统计型的数据,在写入之前,先做聚合计算;
- 使用锁代替可能限制速度的WATCH/MULTI/EXEC事务,或者使用Lua及脚本;
- AOF持久化时,使用合适的配置,避免频繁的硬盘写入;
对分片连接的配置和使用
根据名字进行创建或重用Redis连接
def get_redis_connection(component, wait=1):key = 'config:redis:' + componentold_config = CONFIGS.get(key, object())# 获取新配置,config_connection是放有配置信息的Redis的连接config = get_config(config_connection, 'redis', component, wait)# 如果新旧配置不同,就创建一个新的连接if config != old_config:REDIS_CONNECTIONS[key] = redis.Redis(**config)return REDIS_CONNECTIONS.get(key)
根据分片信息获取该分片的连接
def get_sharded_connection(component, key, shard_count, wait=1):# shard格式:组件名:分片id,分片id由key计算得到shard = shard_key(component, 'x'+str(key), shard_count, 2)return get_redis_connection(shard, wait)
支持分片功能的连接装饰器
def sharded_connection(component, shard_count, wait=1):# 装饰器接收组件名和预期分片数为参数def wrapper(function):@functools.wraps(function)def call(key, *args, **kwargs):# 获取分片连接conn = get_sharded_connection(component, key, shard_count, wait)# 实际调用被装饰的函数return function(conn, key, *args, **kwargs)return callreturn wrapper
在分片环境下统计唯一访客数
# 把函数分在16个机器上执行,判定访客是否唯一的集合在每台机器的多个数据库键下
@sharded_connection('unique', 16)
def count_visit(conn, session_id):today = date.today()key = 'unique:%s' % today.isoformat()conn2, expected = get_expected(key, today)id = int(session_id.replace('-', '')[:15], 16)if shard_sadd(conn, key, id, expected, SHARD_SIZE):# 非分片连接对唯一计数器执行自增conn2.incr(key)# get_expected函数使用非分片的连接
@redis_connection('unique')
def get_excepted(conn, key, today):...# 返回非分片连接,使count_visit可以对唯一计数器执行自增操作retrun conn, EXPECTED[key]
使用SETBIT、BITCOUNT和BITOP对二进制位数组进行索引查找,可以在不分片的情况下实现唯一访问计数器。
Redis系统是单线程设计,在多核高带宽的机器上,可以运行多个Redis服务器。只要对这些服务器进行配置,分配不同的端口、不同的快照配置即可。
10.3 扩展搜索型查询
Redis执行的查询不只是取值和写入,对于复杂的查询操作,仅仅对数据进行分片达不到扩展的目的。
搜索型查询就是这样的复杂查询。
使从服务器可写
搜索型查询需要执行SUNIONSTORE、SINTERSTORE、SDIFFSTORE、ZINTERSTORE和ZUNIONSTORE等命令,这些命令都需要对Redis写入。而从服务器默认是只读状态,无法写入。
Redis配置中有个选项slave-read-only
,默认为yes,改为no就能在从服务器上执行搜索查询了。不过需要注意重同步的问题。
搜索查询的结果只是被缓存在了执行过查询的从服务器上,如果需要重用被缓存的结果,就需要执行“定期持久化”操作,比如让客户端向相同的Web服务器发请求、Web服务器又向相同的Redis服务器发请求。
将搜索索引分片
搜索索引的大小总会随着时间不断地增长,
搜索查询分片的第一步,需要先对搜索索引进行分片,并且使被索引的每个文档,同一个文档的所有数据都分在同一个分片里。
前面使用的建索引的方法index_document(),会接收连接对象。可以直接传入分片连接,使其把索引建在分片上。或者使用自动分片装饰器操作。
基于SORT实现索引的分片搜索操作
索引分片后,只需要对分片进行查询就可以取得搜索结果。
分片搜索操作可以分为两步:在所有分片上执行搜索查询操作;然后将各个分片的搜索结果进行整合。
执行查询的方法,对基于SORT实现的索引和基于有序集合实现的索引不一样。SORT实现的索引需要统一排序的数据类型。
在单分片上执行的搜索函数
# 函数的参数和之前非分片的search_and_sort相同
def search_get_values(conn, query, id=None, ttl=300, sort="-updated", start=0, num=20):# 搜索和排序,取从0开始到start+num位的排序结果count, docids, id = search_and_sort(conn, query, id, ttl, sort, 0, start+num)key = 'kb:doc:%s'sort = sort.lstrip('-')# 按排序的顺序取文档的更新时间pipe = conn.pipeline(False)for docid in docids:pipe.hget(key % docid, sort)sort_column = pipe.execute()# 文档id对应文档的更新时间data_pairs = zip(docids, sort_column)return count, data_pairs, id
在取后面页的结果时,如取第91-100条结果,由于程序只知道需要比它第90条大的数据,并不知道整合后的这条数据在各个分片上的起始位置start,因此程序在每个分片上都必须取前面的全部100条数据。
在所有分片上执行上述函数
def get_shard_results(component, shards, query, ids=None, ttl=300, sort="-updated", start=0, num=20, wait=1):# component是组件名称,shards是分片数count = 0data = []# 如果传入已被缓存的ids,就使用,否则重新执行查询ids = ids or shards * [None]for shard in range(shards):# 获取或创建一个分片连接conn = get_redis_connection('%s:%s' % (component, shard), wait)# 获取搜索结果,包括选到的文档的更新时间,用于合并c, d, i = search_get_values(conn, query, ids[shard], ttl, sort, start, num)# 合并计算结果count += cdata.extend(d)ids[shard] = ireturn count, data, ids
这个操作可以利用Python的线程并行执行。
对分片搜索结果整合和选取
def to_numeric_key(data):try:# Decimal类型可以合理地转换整数和浮点数,对缺失值和非数值返回0return Decimal(data[1] or '0')except:return Decimal('0')def to_string_key(data):# 总是返回字符串return data[1] or ''def search_shards(component, shards, query, ids=None, ttl=300, sort="-updated", start=0, num=20, wait=1):# 获取所有分片的搜索结果count, data, ids = get_shard_results(component, shards, query, ids, ttl, sort, start, num, wait)# 整理排序所需的参数reversed = sort.startswith("-")sort = sort.strip('-')key = to_numeric_keyif sort not in ('updated', 'id', 'created'):key = to_string_key# 对搜索结果做排序data.sort(key=key, reversed=reversed)results = []# 仅取指定的页for docid, score in data[start:start+num]:results.append(docid)# ids内是各个分片查询的缓存idreturn count, results, ids
Decimal可以用且更少的代码得到相同的排序,且能正确处理无限大小的数字。
排序时需要考虑缺失值,并把类型统一。
基于有序集合实现索引的分片搜索操作
在单个分片上执行的搜索函数
def search_get_zset_values(conn, query, id=None, ttl=300, update=-1, vote=0, start=0, num=20, desc=True):# 取得搜索结果的文档数和idcount, r, id = search_and_zsort(conn, query, id, ttl, update, vote, 0, start + num - 1, desc)# 获取搜索结果的分值if desc:data = conn.zrevrange(id, 0, start + num - 1, withscores=True)else:data = conn.zrange(id, 0, start + num - 1, withscores=True)return count, data, id
这里有序集合的分值是复合索引,取后续页的时候也无法确定起始位置,因此zrevrange也需返回前面全部搜索结果。可以使用zrevrangebyscore,直接用分值比较,但是函数需要多加参数,变得复杂。
整合分片结果
def search_shards_zset(component, shards, query, ids=None, ttl=300, update=-1, vote=0, start=0, num=20, desc=True, wait=1):count = 0data = []ids = ids or shards * [None]for shard in range(shards):# 获取分片连接conn = get_redis_connection('%s:%s' % (component, shard), wait)c, d, i = search_get_zset_values(conn, query, ids[shard], ttl, update, vote, start, num, desc)# 合并结果count += cdata.extend(d)ids[shard] = idef key(result):# 仅返回与分值有关的信息return result[1]# 排序结果data.sort(key=key, reversed=desc)results = []# 从结果里取出文档id,把分值丢弃for docid, score in data[start:start+num]:results.append(docid)return count, results, ids
10.4 扩展社交网站
如何通过分片对社交网站进行扩展。
扩展的第一步就是找出经常被读取和写入的数据,并且考虑把常用和不常用数据分开。
社交网站的数据主要三种:状态消息、时间线、关注和粉丝列表。
已发布状态消息的扩展
状态消息基于散列存储,所以可以基于散列所在的键,将其分片到由多个散列组成的集群里面。
也可以在Redis中仅存储最新发布的消息,而旧的较少读取的消息存在硬盘型数据库中。
添加从服务器只是读写分离,减轻主服务器的读压力。而将数据分片到多个服务器上面,可以降低每个服务器的写压力。
时间线的扩展
主页时间线和列表时间线都比较短,所以不需要对时间线中的内容做分片,而是根据时间线的键名把它们分别存到不同的分片上。
个人时间线,大部分人发布的消息很少,但是少数用户会发布极多的消息。实用的做法是限制每个用户的时间线最多只存2万消息,旧的信息会删除或隐藏。
sharded_timelines = KeyShardedConnection('timelines', 8)def follow_user(conn, uid, other_uid):fkey1 = 'following:%s' % uidfkey2 = 'followers:%s' % other_uidif conn.zscore(fkey1, other_uid):print('already followed', uid, other_uid)return Nonenow = time.time()pipeline = conn.pipeline(True)pipeline.zadd(fkey1, other_uid, now)pipeline.zadd(fkey2, uid, now)pipeline.zcard(fkey1)pipeline.zcard(fkey2)following, followers = pipeline.execute()[-2:]pipeline.hset('user:%s' % uid, 'following', following)pipeline.hset('user:%s' % other_uid, 'followers', followers)pipeline.execute()# 从被关注者的个人时间线取最新的动态pkey = 'profile:%s' % other_uidstatus_and_score = sharded_timelines[pkey].zrevrange(pkey, 0, HOME_TIMELINE_SIZE-1, withscores=True)if status_and_score:hkey = 'home:%s' % uid# 根据被分片的键获取连接pipe = sharded_timelines[hkey].pipeline(True)# 将关注者的动态加到分片的主页时间线,并修剪pipe.zadd(hkey, **dict(status_and_score))pipe.zremrangebyrank(hkey, 0, -HOME_TIMELINE_SIZE-1)pipe.execute()return True
通用的分片连接类,需要指定组件和分片数,就可以通过被访问的键创建分片连接。
class KeyShardedConnection(object):def __init__(self, component, shards):# 组件名和分片数self.component = componentself.shards = shardsdef __getitem__(self, key):# 根据要查找的键,获取其所在分片的连接return get_sharded_connection(self.component, key, self.shards)
扩展关注和粉丝列表
用户关注人数可以设人数限制,但是用户的粉丝数无法设限。粉丝列表过大的话,还是需要把它们所在的有序集合分到多个分片上。
前面实现的关注动作是按数据的键做的分片,但是现在需要按关注数据来分片。
同时,粉丝和被关注者放在同一个分片上,可以显著减少创建和调用的连接数。程序使用粉丝和被关注者的两个id一起当做查找分片的参数。
对粉丝有序集合和被关注者有序集合做分片后的关注操作
sharded_timelines = KeyShardedConnection('timelines', 8)
sharded_followers = KeyShardedConnection('followers', 16)def follow_user(conn, uid, other_uid):fkey1 = 'following:%s' % uidfkey2 = 'followers:%s' % other_uid# 根据两个用户的id获取连接对象sconn = sharded_followers[uid, other_uid]if sconn.zscore(fkey1, other_uid):return Nonenow = time.time()spipe = sconn.pipeline(True)spipe.zadd(fkey1, other_uid, now)spipe.zadd(fkey2, uid, now)following, followers = spipe.execute()# 更新关注数和粉丝数pipeline = conn.pipeline(True)pipeline.hincrby('user:%s' % uid, 'following', int(following))pipeline.hincrby('user:%s' % other_uid, 'followers', int(followers))pipeline.execute()pkey = 'profile:%s' % other_uidstatus_and_score = sharded_timelines[pkey].zrevrange(pkey, 0, HOME_TIMELINE_SIZE-1, withscores=True)if status_and_score:hkey = 'home:%s' % uidpipe = sharded_timelines[hkey].pipeline(True)pipe.zadd(hkey, **dict(status_and_score))pipe.zremrangebyrank(hkey, 0, -HOME_TIMELINE_SIZE-1)pipe.execute()return True
前面的通过键获取分片连接的类,改为通过id对获取连接。
calss KeyDataShardedConnection(object):def __init__(self, component, shards):...def __getitem__(self, ids):# 确保传入的id都是整数id1, id2 = map(int, ids)# 确保第一个id较小if id2 < id1:id1, id2 = id2, id1# 两个id构建一个键key = "%s:%s" % (id1, id2)# 使用构建的键获取分片连接return get_sharded_connection(self.component, key, self.shards)
较小的id放在前面,这样用户无论何时以什么顺序访问两个id,得到的分片总是一样的。
通过分片连接器,可以对其他的有序集合操作进行更新。其他操作的好多处都用到了ZRANGEBYSCORE命令获取粉丝,因此先做一个分片版本的ZRANGEBYSCORE。
def sharded_zrangebyscore(component, shards, key, min, max, num):data = []for shard in range(shards):# 获取分片连接conn = get_redis_connection("%s:%s" % (component, shard))# 从分片上取出数据data.extend(conn.zrangebyscore(key, min, max, start=0, num=num, withscores=True))def key(pair):return pair[1], pair[0]# 先基于分值排序,再基于成员排序data.sort(key=key)return data[:num]
可以根据固定分值进行排序的时候,直接从分片上取大于某个分值的一页数据就可以。
对动态广播函数做更新
def syndicate_status(uid, post, start=0, on_lists=False):root = 'followers'key = 'followers:%s' % uidbase = 'home:%s'if on_lists:root = 'list:out'key = 'list:out:%s' % uidbase = 'list:statuses:%s'# 找出关注者followers = sharded_zrangebyscore(root, sharded_followers.shards, key, start, 'inf', POSTS_PER_PASS)# 对粉丝的主页时间线的键按分片进行分组to_send = defaultdict(list)for follower, start in followers:timeline = base % followershard = shard_key('timelines', timeline, sharded_timelines.shards, 2)to_send[shard].append(timeline)for timelines in to_send.itervalues():pipe = sharded_timelines[timelines[0]].pipeline(False)for timeline in timelines:# 添加新消息,移除过旧的消息pipe.zadd(timeline, **post)pipe.zremrangebyrank(timeline, 0, -HOME_TIMELINE_SIZE-1)pipe.execute()conn = redis.Redis()if len(followers) >= POSTS_PER_PASS:execute_later(conn, 'default', 'syndicate_status', [uid, post, start, on_lists])elif not on_lists:execute_later(conn, 'default', 'syndicate_status', [uid, post, 0, on_lists])