源码版本 Redis 6.0.0

内存淘汰是什么?什么时候内存淘汰

我们知道,当某个key被设置了过期时间之后,客户端每次对该key的访问(读写)都会事先检测该key是否过期,如果过期就直接删除;但有一些键只访问一次,因此需要主动删除,默认情况下redis每秒检测10次,检测的对象是所有设置了过期时间的键集合,每次从这个集合中随机检测20个键查看他们是否过期,如果过期就直接删除。

内存淘汰虽然也是删除key,但是与我们设置的过期key删除不同。

内存淘汰是说,如果redis已使用内存达到预设的limit值(你可以当做内存满了),如果想要继续存储新的key,那么必然要淘汰一些key来释放内存空间。具体淘汰哪些key由不同的淘汰策来决定。

由于需要淘汰掉key,所以此时redis不适合用作key-value数据库了,开启了内存淘汰的redis一般用于缓存中,因此很多人也把他叫缓存淘汰和缓存淘汰策略。

又因为内存淘汰的触发时机是内存使用达到阈值,为了存储新的key然后淘汰旧的key,因此也有人把内存淘汰策略叫做缓存替换策略

内存淘汰策略

Redis 4.0 之前一共实现了 6 种内存淘汰策略,在 4.0 之后,又增加了 2 种策略。

我们可以按照是否会进行数据淘汰把它们分成两类:

  • 不进行数据淘汰的策略,只有 noeviction 这一种。
  • 会进行淘汰的 7 种其他策略。

会进行淘汰的 7 种策略,我们可以再进一步根据淘汰候选数据集的范围把它们分成两类:

  • 在设置了过期时间的数据中进行淘汰,包括 volatile-random、volatile-ttl、volatile-lru、volatile-lfu(Redis 4.0 后新增)四种。
  • 在所有数据范围内进行淘汰,包括 allkeys-lru、allkeys-random、allkeys-lfu(Redis 4.0 后新增)三种。我把这 8 种策略的分类,画到了一张图里:

这8种策略的简单描述都可以在redis.conf文件中找到

# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
# is reached. You can select one from the following behaviors:
#
# volatile-lru -> Evict using approximated LRU, only keys with an expire set.
# allkeys-lru -> Evict any key using approximated LRU.
# volatile-lfu -> Evict using approximated LFU, only keys with an expire set.
# allkeys-lfu -> Evict any key using approximated LFU.
# volatile-random -> Remove a random key having an expire set.
# allkeys-random -> Remove a random key, any key.
# volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
# noeviction -> Don't evict anything, just return an error on write operations.

翻译下:

# MAXMEMORY POLICY:当达到最大内存时,Redis 将如何选择要删除的内容
# 到达了。 您可以从以下行为中选择一种:
#
# volatile-lru -> Evict 使用近似 LRU,只有设置了过期时间的键。
# allkeys-lru -> 使用近似 LRU 驱逐所有键。
# volatile-lfu -> 使用近似 LFU 驱逐,只有设置了过期时间的键。
# allkeys-lfu -> 使用近似 LFU 驱逐所有键。
# volatile-random -> 随机删除设置了过期时间的key。
# allkeys-random -> 随机删除一个键,任何键。
# volatile-ttl -> 删除过期时间最近的key(次TTL)
# noeviction -> 不要驱逐任何东西,只是在写操作时返回一个错误。

通过redis.conf文件中的内容我们可以知道默认策略是noeviction

# The default is:
#
# maxmemory-policy noeviction

Redis中的LRU淘汰算法

限于篇幅,本文主要以LRU淘汰算法展开讲解,其他算法流程大同小异,可在我的redis源码中文注释项目中查看

不了解LRU算法基础的可以先去百度了解下,很简单

这里就不讲LRU算法的基本知识了,我们来看Redis的LRU算法的原理是怎么样的。

其实,LRU 算法背后的想法非常朴素:它认为刚刚被访问的数据,肯定还会被再次访问,所以就把它放在 MRU 端;长久不访问的数据,肯定就不会再被访问了,所以就让它逐渐后移到 LRU 端,在缓存满时,就优先删除它。

不过,LRU 算法在实际实现时,需要用链表管理所有的缓存数据,这会带来额外的空间开销。而且,当有数据被访问时,需要在链表上把该数据移动到 MRU 端,如果有大量数据被访问,就会带来很多链表移动操作,会很耗时,进而会降低 Redis 缓存性能。

所以,在 Redis 中,LRU 算法被做了简化,以减轻数据淘汰对缓存性能的影响。具体来说,Redis 默认会记录每个数据的最近一次访问的时间戳(由键值对数据结构 RedisObject 中的 lru 字段记录)。然后,Redis 在决定淘汰的数据时,第一次会随机选出 N 个数据,把它们作为一个候选集合。接下来,Redis 会比较这 N 个数据的 lru 字段,把 lru 字段值最小的数据从缓存中淘汰出去。

Redis 提供了一个配置参数 maxmemory-samples,这个参数就是 Redis 选出的数据个数 N。例如,我们执行如下命令,可以让 Redis 选出 100 个数据作为候选数据集:

CONFIG SET maxmemory-samples 100

redis.conf文件中默认配置是5


# The default of 5 produces good enough results. 10 Approximates very closely
# true LRU but costs more CPU. 3 is faster but not very accurate.
#
# maxmemory-samples 5

当需要再次淘汰数据时,Redis 需要挑选数据进入第一次淘汰时创建的候选集合。这儿的挑选标准是:能进入候选集合的数据的 lru 字段值必须小于候选集合中最小的 lru 值。当有新数据进入候选数据集后,如果候选数据集中的数据个数达到了 maxmemory-samples,Redis 就把候选数据集中 lru 字段值最小的数据淘汰出去。

这样一来,Redis 缓存不用为所有的数据维护一个大链表,也不用在每次数据访问时都移动链表项,提升了缓存的性能。

源码剖析

源码中文注释可以在我的 github 上 的 Redis 6.0 中文注释项目里看到,欢迎访问

第一步:什么时候开始淘汰key

配置读取
struct redisServer {
    // 可使用的最大内存字节数
    unsigned long long maxmemory;   /* Max number of memory bytes to use */
    // key 淘汰策略
    int maxmemory_policy;           /* Policy for key eviction */
    // 随机抽样精度
    int maxmemory_samples;          /* Precision of random sampling */
}
检查时机

执行内存淘汰的方法是freeMemoryIfNeeded,实现在evict.c中.

注意redis 3.0版本中该方法在redis.c

这个方法主要的作用是检查内存状态,如果内存达到阈值就进行内存淘汰,释放内存空间直至到阈值以下。

打开redis源码可以很轻易搜索到该方法是被freeMemoryIfNeededAndSafe调用的


/* 
 * 这是 freeMemoryIfNeeded() 的包装器,只有在现在有条件安全地执行时才真正调用该函数:
 * - 超时条件下不能有脚本。
 * - 现在没有加载数据。
 */
int freeMemoryIfNeededAndSafe(void) {
    if (server.lua_timedout || server.loading) return C_OK;
    return freeMemoryIfNeeded();
}

那么这个方法又是被谁调用的呢,有两个地方

  • updateMaxmemory
  • processCommand

第一个看名字是在更新Maxmemory时调用的,本文不讲,我们主要来看第二个方法

在处理命令的时候,会调用server.c中的processCommand方法

/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * 
 * 如果这个函数被调用,我们已经读取了整个命令,参数在客户端 argv/argc 字段中。
 * 
 * processCommand() execute the command or prepare the
 * server for a bulk read from the client.
 *  
 * processCommand() 执行命令或准备服务器以从客户端进行批量读取。
 */
int processCommand(client *c) {
    // 省略...
    // 如果配置了 server.maxmemory 并且 lua脚本没有在超时条件下运行
    if (server.maxmemory && !server.lua_timedout) {
        int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
        // 省略...
    }
    // 省略...
}
getMaxmemoryState

紧接着会在freeMemoryIfNeeded方法中检查内存状态来判断是否需要内存淘汰

/* 
 * 根据当前的“maxmemory”设置,定期调用此函数以查看是否有可用内存。 如果我们超过内存限制,该函数将尝试释放一些内存以返回低于限制。
 *
 * 如果我们低于内存限制或超过限制但尝试释放内存成功,该函数将返回 C_OK。
 * 否则,如果我们超过了内存限制,但没有足够的内存被释放以返回低于限制,则该函数返回 C_ERR。
 */
int freeMemoryIfNeeded(void) {
    // 默认情况下,从节点应该忽略maxmemory指令,仅仅做从节点该做的事情就好
    if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
    // 当客户端暂停时,数据集应该是静态的,不仅来自客户端的 POV 无法写入,还有来自POV过期和驱逐key也无法执行。
    if (clientsArePaused()) return C_OK;
    // 检查内存状态,有没有超出限制,如果有,会计算需要释放的内存和已使用内存
    if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
        return C_OK;
}

getMaxmemoryState 方法也在evict.c中:

/* 从maxmemory指令的角度获取内存状态:
 * 如果使用的内存低于 maxmemory 设置,则返回 C_OK。
 * 否则,如果超过内存限制,函数返回
 * C_ERR。
 *
 * 该函数可能会通过引用返回附加信息,仅当
 * 指向相应参数的指针不为 NULL。某些字段是
 * 仅在返回 C_ERR 时填充:
 *
 * 'total' 使用的总字节数。
 *(为 C_ERR 和 C_OK 填充)
 *
 * 'logical' 使用的内存量减去从/AOF缓冲区。
 *(返回 C_ERR 时填充)
 *
 * 'tofree' 为了回到内存限制应该释放的内存量
 * 
 *(返回 C_ERR 时填充)
 *
 * 'level' 这通常范围从 0 到 1,并报告数量
 * 当前使用的内存。如果我们超过了内存限制可能会 > 1。
 *(为 C_ERR 和 C_OK 填充)
 */
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level)

通过以上源码,我们能够知道什么时候会内存淘汰。

下一步我们来看看会淘汰哪些key

第二步:淘汰哪些key

freeMemoryIfNeeded

文章正文中我们提到过内存淘汰策略,但是第一步并没有涉及到具体的策略。那么不同的策略是在哪里实现的呢?这是在freeMemoryIfNeeded方法中判断的,顺便这里先带你了解下freeMemoryIfNeeded方法的骨架


int freeMemoryIfNeeded(void) {
    
    /*
     * 
     * mem_reported 已使用内存
     * mem_tofree 需要释放的内存
     * mem_freed 已释放内存
     */
    size_t mem_reported, mem_tofree, mem_freed;
    
    // 检查内存状态,有没有超出限制,如果有,会计算需要释放的内存和已使用内存
    // 这个方法不但会计算内存,还会赋值 mem_reported mem_freed
    if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
        return C_OK;
    // 初始化已释放内存的字节数为 0
    mem_freed = 0;
    
    // 不进行内存淘汰
    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        goto cant_free; /* We need to free memory, but policy forbids. */
        
    // 根据 maxmemory 策略,
    // 遍历字典,释放内存并记录被释放内存的字节数
    while (mem_freed < mem_tofree) {
        // 最佳淘汰key
        sds bestkey = NULL;
        // LRU策略或者LFU策略或者VOLATILE_TTL策略
        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {
            // 不同的策略找bestKey
        }
        /* volatile-random and allkeys-random policy */
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {
            // 不同的策略找bestKey
        }
        
        // 最后选定的要删除的key
        if (bestkey) {
            // 在这里删除key
        }
    }
    
}

上面的源码省略了很多东西,但我相信你已经知道这个方法在做什么了,也应该知道不同的策略其实就是在找不同的key。我们重点分析LRU策略下的bestKey选择。

redis索引

这个时候要去redis数据库拿东西了,查找的话还是需要索引的,因此我先向你简单罗列下Redis索引相关的数据结构,很简单,不做文字表述了

server.h

// Redis 数据库。 有多个数据库由从 0(默认数据库)到最大配置数据库的整数标识。 数据库编号是结构中的“id”字段。
typedef struct redisDb {
    /* 数据库键空间,保存着数据库中的所有键值对 */
    dict *dict;               /* The keyspace for this DB */
    // 设置超时时间的key集合
    dict *expires;              /* Timeout of keys with a timeout set */
    // 客户端等待数据的key (BLPOP)
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
    // 收到 PUSH 被阻塞的key
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    int id;                     /* Database ID */
    // 数据库的键的平均 TTL ,统计信息
    long long avg_ttl;          /* Average TTL, just for stats */
    // 
    unsigned long expires_cursor; /* Cursor of the active expire cycle. */
    // 尝试逐一进行碎片整理的键名列表。
    list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

注意 redisDb 有两个重要的dict,*dict是主字典,存储所有的键集合, *expires存储设置了超时时间的集合,后面根据不同的淘汰策略就可以从不同的数据集拿数据

dict.h

/*
 * 字典
 */
typedef struct dict {
    // 类型特定函数
    dictType *type;
    // 私有数据
    void *privdata;
    // 哈希表
    dictht ht[2];
    // rehash 索引
    // 当 rehash 不在进行时,值为 -1
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    // 当前运行的迭代器数量
    unsigned long iterators; /* number of iterators currently running */
} dict;

每个dict又存储了2个dictht,为什么是两个呢?用来实现渐进式 rehash

/* 哈希表
 * 每个字典都使用两个哈希表,从而实现渐进式 rehash 。
 */
typedef struct dictht {
    // 哈希表数组
    dictEntry **table;
    // 哈希表大小
    unsigned long size;
    // 哈希表大小掩码,用于计算索引值
    // 总是等于 size - 1
    unsigned long sizemask;
    // 该哈希表已有节点的数量
    unsigned long used;
} dictht;

到了哈希表了,这个是真正存储元素的地方了,再来看哈希表元素长什么样

/*
 * 哈希表节点
 */
typedef struct dictEntry {
    // 键
    void *key;
    // 值
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    // 指向下个哈希表节点,形成链表
    struct dictEntry *next;
} dictEntry;

以上就是redis索引相关的数据结构,我们可以看到redis索引是通过哈希表来实现的。上面说的是索引的存储,那么我们放入redis的value存储在哪呢?

server.h

typedef struct redisObject {
    // 类型
    unsigned type:4;

    // 编码
    unsigned encoding:4;

    // 对象最后一次被访问的时间
    unsigned lru:LRU_BITS;

    // 引用计数
    int refcount;

    // 指向实际值的指针
    void *ptr;
} robj;

这是我们的值对象数据结构定义,type就是具体的值的数据类型了,lru是对象最后一次被访问的时间,由于只有24位,无法记录完整的时间,因此只记录了unix time的低24位,24 bits数据要溢出的话需要194天,而缓存的数据更新非常频繁,已经足够了。详细的关于lru精度的问题可以查看源码。到这里你应该对redis的结构有了基本了解。下面在来看如何挑选bestkey更简单一些。

淘汰池
// 最佳淘汰key
sds bestkey = NULL;
// key所属db
int bestdbid;
// Redis数据库
redisDb *db;
// 字典
dict *dict;
// 哈希表节点
dictEntry *de;
// LRU策略或者LFU策略或者VOLATILE_TTL策略
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
    server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
    // 淘汰池
    struct evictionPoolEntry *pool = EvictionPoolLRU;

    while(bestkey == NULL) {
        unsigned long total_keys = 0, keys;

        // 从每个数据库抽样key填充淘汰池
        for (i = 0; i < server.dbnum; i++) {
            db = server.db+i;
            // db->dict: 数据库所有key集合
            // db->expires: 数据中设置过期时间的key集合
            // 判断淘汰策略是否是针对所有键的
            dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                    db->dict : db->expires;
            // 计算字典元素数量,不为0才可以挑选key
            if ((keys = dictSize(dict)) != 0) {
                // 填充淘汰池,四个参数分别为 dbid,候选集合,主字典集合,淘汰池
                // 填充完的淘汰池内部是有序的,按空闲时间升序
                evictionPoolPopulate(i, dict, db->dict, pool);
                // 已遍历检测过的key数量
                total_keys += keys;
            }
        }
        // 如果 total_keys = 0,没有要淘汰的key(redis没有key或者没有设置过期时间的key),break
        if (!total_keys) break; /* No keys to evict. */

        // 遍历淘汰池,从淘汰池末尾(空闲时间最长)开始向前迭代
        for (k = EVPOOL_SIZE-1; k >= 0; k--) {
            if (pool[k].key == NULL) continue;
            // 获取当前key所属的dbid
            bestdbid = pool[k].dbid;

            if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                // 如果淘汰策略针对所有key,从 redisDb.dict 中取数据,redisDb.dict 指向所有的键值集合
                de = dictFind(server.db[pool[k].dbid].dict,
                    pool[k].key);
            } else { // 如果淘汰策略不是针对所有key,从 redisDb.expires 中取数据,redisDb.expires 指向已过期键值集合
                de = dictFind(server.db[pool[k].dbid].expires,
                    pool[k].key);
            }

           
            if (pool[k].key != pool[k].cached)
                sdsfree(pool[k].key);
            // 从池中删除这个key,不管这个key还在不在
            pool[k].key = NULL;
            pool[k].idle = 0;

            // 如果这个节点存在,就跳出这个循环,否则尝试下一个元素。
            // 这个节点可能已经不存在了,比如到了过期时间被删除了
            if (de) {
                // de是key所在哈希表节点,bestkey是 key 名
                bestkey = dictGetKey(de);
                break;
            } else {
                /* Ghost... Iterate again. */
            }
        }
    }
}

上面的源码注释已经很清晰了,大概有如下几个步骤:

  1. 初始化淘汰池
  2. 遍历数据库
  3. 根据淘汰策略是所有key还是过期key,从而选择不同的数据集(redisDb.expires or redisDb.dict)
  4. 调用evictionPoolPopulate方法,传入上一步选择的数据集,填充淘汰池数据并排好序
  5. 从淘汰池中选取要淘汰的key(空闲时间最长的key),并且删除淘汰池中该key的引用。如果该key已经没了,那么选取淘汰池中次优的key,直到找到一个还存在的key

核心步骤当然是evictionPoolPopulate方法,我们来来看看淘汰池的元素结构。

evict.c

// 淘汰池大小
#define EVPOOL_SIZE 16
// 淘汰池缓存的最大sds大小
#define EVPOOL_CACHED_SDS_SIZE 255

struct evictionPoolEntry {
    // 对象空闲时间
    // 这被称为空闲只是因为代码最初处理 LRU,但实际上只是一个分数,其中更高的分数意味着更好的候选者。
    unsigned long long idle;    /* Object idle time (inverse frequency for LFU) */
    sds key;                    /* Key name. */
    // 用来存储一个sds对象留待复用,注意我们要复用的是sds的内存空间,只需关注cached的长度(决定是否可以复用),无需关注他的内容
    sds cached;                 /* Cached SDS object for key name. */
    int dbid;                   /* Key DB number. */
};

淘汰池不只可以给LRU使用,你可以在后面的源码中看到,LFU以及TTL也会使用淘汰池

知道了淘汰池长什么样子我相信下面的代码你就好理解了,无非是按idle字段升序排序

evict.c

/* 
 * 这是 freeMemoryIfNeeded() 的辅助函数,用于在每次我们想要key过期时用一些条目填充 evictionPool。
 * 
 * 添加空闲时间小于当前所有key空闲时间的key,如果池是空的则key会一直被添加
 * 
 * 我们按升序将键依次插入,因此空闲时间较小的键在左侧,而空闲时间较长的键在右侧。
 */
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    int j, k, count;
    // 初始化抽样集合,大小为 server.maxmemory_samples
    dictEntry *samples[server.maxmemory_samples];
    // 此函数对字典进行采样以从随机位置返回一些键
    count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
    for (j = 0; j < count; j++) {
        // 这被称为空闲只是因为代码最初处理 LRU,但实际上只是一个分数,其中更高的分数意味着更好的候选者。
        unsigned long long idle;
        // key
        sds key;
        // 值对象
        robj *o;
        // 哈希表节点
        dictEntry *de;

        de = samples[j];
        key = dictGetKey(de);

        /* 如果我们采样的字典不是主字典(而是过期的字典),我们需要在键字典中再次查找键以获得值对象。*/
        if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
            if (sampledict != keydict) de = dictFind(keydict, key);
            o = dictGetVal(de);
        }

        /* 根据策略计算空闲时间。 这被称为空闲只是因为代码最初处理 LRU,但实际上只是一个分数,其中更高的分数意味着更好的候选者。*/
        if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
            // 给定一个对象,使用近似的 LRU 算法返回未请求过该对象的最小毫秒数
            idle = estimateObjectIdleTime(o);
        } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { // LFU 策略也是用这个池子
            idle = 255-LFUDecrAndReturn(o);
        } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
            // 在这种情况下,越早过期越好。
            idle = ULLONG_MAX - (long)dictGetVal(de);
        } else {
            serverPanic("Unknown eviction policy in evictionPoolPopulate()");
        }

        /* 将元素插入池中*/
        k = 0;
        // 遍历淘汰池,从左边开始,找到第一个空桶或者第一个空闲时间大于等于待选元素的桶,k是该元素的坐标
        while (k < EVPOOL_SIZE &&
               pool[k].key &&
               pool[k].idle < idle) k++;
        if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
            /* 如果元素小于我们拥有的最差元素并且没有空桶,则无法插入。
             * 
             * key == 0 说明上面的while循环一次也没有进入
             * 要么第一个元素就是空的,要么所有已有元素的空闲时间都大于等于待插入元素的空闲时间(待插入元素比已有所有元素都优质)
             * 又因为数组最后一个key不为空,因为是从左边开始插入的,所以排除了第一个元素是空的
             */
            continue;
        } else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
            /* 插入空桶,插入前无需设置 */
        } else {
            /* 插入中间,现在 k 指向比要插入的元素空闲时间大的第一个元素 */
            if (pool[EVPOOL_SIZE-1].key == NULL) {
                /* 数组末尾有空桶,将所有元素从 k 向右移动到末尾。*/
                /* 覆盖前保存 SDS */
                sds cached = pool[EVPOOL_SIZE-1].cached;
                // 注意这里不设置 pool[k], 只是给 pool[k] 腾位置
                memmove(pool+k+1,pool+k,
                    sizeof(pool[0])*(EVPOOL_SIZE-k-1));
                // 转移 cached (sds对象)
                pool[k].cached = cached;
            } else {
                
                /* 右边没有可用空间? 在 k-1 处插入 */
                k--;
                /*
                 * 将k(包含)左侧的所有元素向左移动,因此我们丢弃空闲时间较小的元素。
                 */
                sds cached = pool[0].cached;
                if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
                memmove(pool,pool+1,sizeof(pool[0])*k);
                pool[k].cached = cached;
            }
        }
        /*
         * 尝试重用在池条目中分配的缓存 SDS 字符串,因为分配和释放此对象的成本很高
         * 注意真正要复用的sds内存空间,避免重新申请内存,而不是他的值
         */
        int klen = sdslen(key);
        // 判断字符串长度来决定是否复用sds
        if (klen > EVPOOL_CACHED_SDS_SIZE) {
            // 复制一个新的 sds 字符串并赋值
            pool[k].key = sdsdup(key);
        } else {
            /*
             * 内存拷贝函数,从数据源拷贝num个字节的数据到目标数组
             * 
             * destination:指向目标数组的指针 
             * source:指向数据源的指针 
             * num:要拷贝的字节数
             *
             */
            // 复用sds对象
            memcpy(pool[k].cached,key,klen+1);
            // 重新设置sds长度
            sdssetlen(pool[k].cached,klen);
            // 真正设置key
            pool[k].key = pool[k].cached;
        }
        // 设置空闲时间
        pool[k].idle = idle;
        // 设置key所在db
        pool[k].dbid = dbid;
    }
}

至此,经过以上步骤,我们能够得到一个bestkey(LRU策略下即最长空闲时间的key),还有一个有了部分数据的淘汰池。

下一步,我们就要删除这个key来释放内存空间了。

第三步:如何删除key

// 不断循环删除key,直至释放足够的内存
while (mem_freed < mem_tofree) {
    // 最佳淘汰key
    sds bestkey = NULL;
    // LRU策略或者LFU策略或者VOLATILE_TTL策略
    if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
        server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
    {
        // 不同的策略找bestKey
    }
    // 最后选定的要删除的key
    if (bestkey) {
        db = server.db+bestdbid;
        robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
        // 处理过期key到从节点和 AOF 文件
        // 当 master 中的 key 过期时,则将此 key 的 DEL 操作发送到所有 slaves 和 AOF 文件(如果启用)。
        propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
       
        // 获取已使用内存
        delta = (long long) zmalloc_used_memory();
        
        // 是否开启lazyfree机制
        // lazyfree的原理就是在删除对象时只是进行逻辑删除,然后把对象丢给后台,让后台线程去执行真正的destruct,避免由于对象体积过大而造成阻塞。
        if (server.lazyfree_lazy_eviction)
            dbAsyncDelete(db,keyobj);
        else
            dbSyncDelete(db,keyobj);
        
        // 计算删除key后的内存变化量
        delta -= (long long) zmalloc_used_memory();
        
        // 计算已释放内存
        mem_freed += delta;
        
    }
}

第四步:什么时候停止淘汰key

通过上面的源码我们能看到while循环条件是 while (mem_freed < mem_tofree),而在每一次删除key的时候,都会累加已释放key所占有的内存:

// 不断循环删除key,直至释放足够的内存
while (mem_freed < mem_tofree) {
    if (bestkey) {
        // 获取已使用内存
        delta = (long long) zmalloc_used_memory();
        
        // 是否开启lazyfree机制
        // lazyfree的原理就是在删除对象时只是进行逻辑删除,然后把对象丢给后台,让后台线程去执行真正的destruct,避免由于对象体积过大而造成阻塞。
        if (server.lazyfree_lazy_eviction)
            dbAsyncDelete(db,keyobj);
        else
            dbSyncDelete(db,keyobj);
        
        // 计算删除key后的内存变化量
        delta -= (long long) zmalloc_used_memory();
        
        // 计算已释放内存
        mem_freed += delta;
        
    }
}

除此之外,如果配置开启了server.lazyfree_lazy_eviction(异步淘汰),那么每淘汰一些key后还会检查一下内存状态,如果内存已经达到期望了,那么就可以手动满足循环终止条件。

// 不断循环删除key,直至释放足够的内存
while (mem_freed < mem_tofree) {
    if (bestkey) {
        // 获取已使用内存
        delta = (long long) zmalloc_used_memory();
        
        // 是否开启lazyfree机制
        // lazyfree的原理就是在删除对象时只是进行逻辑删除,然后把对象丢给后台,让后台线程去执行真正的destruct,避免由于对象体积过大而造成阻塞。
        if (server.lazyfree_lazy_eviction)
            dbAsyncDelete(db,keyobj);
        else
            dbSyncDelete(db,keyobj);
        
        // 计算删除key后的内存变化量
        delta -= (long long) zmalloc_used_memory();
        
        // 计算已释放内存
        mem_freed += delta;
        
        /*
         * 通常我们的停止条件是能够释放固定的、预先计算的内存量。 
         * 然而,当我们在另一个线程中删除对象时,最好不时检查我们是否已经到达我们的目标内存,因为“mem_freed”数量仅在 dbAsyncDelete() 调用中计算,而线程可以无时无刻释放内存
         */
        if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
            if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                /* Let's satisfy our stop condition. */
                // 手动满足停止条件
                mem_freed = mem_tofree;
            }
        }
        
    }
    
}

最后

限于篇幅,尽可能多的放了源码,没有连贯的文字描述,如果你觉得这样看起来比较累的话,可以去我的Github看看源码注释

版权声明:本文为lifan1998原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/lifan1998/p/15001630.html