Redis系列(八):数据结构List双向链表中阻塞版本之BLPOP、BRPOP和LINDEX、LINSERT、LRANGE命令详解
1.BRPOP、BLPOP
BLPOP:
BLPOP 是阻塞式列表的弹出原语。 它是命令 LPOP 的阻塞版本,这是因为当给定列表内没有任何元素可供弹出的时候,
连接将被 BLPOP 命令阻塞。 当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。
BRPOP:
BRPOP
是一个阻塞的列表弹出原语。 它是 RPOP 的阻塞版本,因为这个命令会在给定list无法弹出任何元素的时候阻塞连接。 该命令会按照给出的 key 顺序查看 list,并在找到的第一个非空 list 的尾部弹出一个元素。
请在 BLPOP 文档 中查看该命令的准确语义,因为 BRPOP
和 BLPOP 基本是完全一样的,除了它们一个是从尾部弹出元素,而另一个是从头部弹出元素。
时间复杂度 :O(1)
- 127.0.0.1:6379> rpush order 10001
- (integer) 1
- 127.0.0.1:6379> rpush order 10002
- (integer) 2
- 127.0.0.1:6379> rpush order 10003
- (integer) 3
- 127.0.0.1:6379> rpush order 10004
- (integer) 4
- 127.0.0.1:6379> rpush order 10005
- (integer) 5
- (注:下面这条 rpush 是在列表被弹空、最后一个 brpop 处于阻塞状态之后,从另一个客户端执行的,所以返回值是 1)
- 127.0.0.1:6379> rpush order 10006
- (integer) 1
- 127.0.0.1:6379>
- 127.0.0.1:6379> brpop order 0
- 1) "order"
- 2) "10005"
- 127.0.0.1:6379> brpop order 0
- 1) "order"
- 2) "10004"
- 127.0.0.1:6379> brpop order 0
- 1) "order"
- 2) "10003"
- 127.0.0.1:6379> brpop order 0
- 1) "order"
- 2) "10002"
- 127.0.0.1:6379> brpop order 0
- 1) "order"
- 2) "10001"
- 127.0.0.1:6379> brpop order 0
- 1) "order"
- 2) "10006"
- (2765.54s)
- 127.0.0.1:6379>
源码解析
- void blpopCommand(client *c) { /* BLPOP entry point: delegates to the generic blocking pop, taking elements from the list head. */
- blockingPopGenericCommand(c,LIST_HEAD);
- }
- void brpopCommand(client *c) { /* BRPOP entry point: delegates to the generic blocking pop, taking elements from the list tail. */
- blockingPopGenericCommand(c,LIST_TAIL);
- }
- blockingPopGenericCommand
- /* Blocking RPOP/LPOP.
- *
- * Shared implementation called by blpopCommand() and brpopCommand().
- * 'where' is LIST_HEAD or LIST_TAIL and selects the end that is popped.
- * The last argument is parsed as the timeout (UNIT_SECONDS); arguments
- * 1..argc-2 are the keys and are checked in order. The first key that
- * holds a non-empty list is popped and answered immediately with a
- * two-element array (key name, value). A key holding a non-list value
- * ends the command with a wrong-type error. If no key can be served the
- * client is blocked on all keys through blockForKeys(), except when the
- * client has CLIENT_MULTI set, in which case a null array is replied. */
- void blockingPopGenericCommand(client *c, int where) {
- robj *o;
- mstime_t timeout;
- int j;
- if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS) /* last argument is the timeout */
- != C_OK) return;
- for (j = 1; j < c->argc-1; j++) { /* arguments 1..argc-2 are the candidate keys, in order */
- o = lookupKeyWrite(c->db,c->argv[j]);
- if (o != NULL) {
- if (o->type != OBJ_LIST) { /* key exists but is not a list: reply with the error and stop */
- addReply(c,shared.wrongtypeerr);
- return;
- } else {
- if (listTypeLength(o) != 0) {
- /* Non-empty list: serve it right away, like a regular
- * (non-blocking) [LR]POP. */
- char *event = (where == LIST_HEAD) ? "lpop" : "rpop"; /* keyspace event name */
- robj *value = listTypePop(o,where);
- serverAssert(value != NULL);
- addReplyArrayLen(c,2); /* reply is [key name, popped value] */
- addReplyBulk(c,c->argv[j]);
- addReplyBulk(c,value);
- decrRefCount(value);
- notifyKeyspaceEvent(NOTIFY_LIST,event,
- c->argv[j],c->db->id);
- if (listTypeLength(o) == 0) { /* the pop emptied the list: delete the key too */
- dbDelete(c->db,c->argv[j]);
- notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
- c->argv[j],c->db->id);
- }
- signalModifiedKey(c,c->db,c->argv[j]);
- server.dirty++;
- /* Replicate it as an [LR]POP instead of B[LR]POP. */
- rewriteClientCommandVector(c,2,
- (where == LIST_HEAD) ? shared.lpop : shared.rpop,
- c->argv[j]);
- return;
- }
- }
- }
- }
- /* If we are inside a MULTI/EXEC and the list is empty the only thing
- * we can do is treating it as a timeout (even with timeout 0). */
- if (c->flags & CLIENT_MULTI) {
- addReplyNullArray(c);
- return;
- }
- /* If the list is empty or the key does not exist we must block.
- * The key array passed on skips argv[0] and the trailing timeout. */
- blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
- }
- blockForKeys
- void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) { /* Register 'c' as waiting on each of the 'numkeys' entries of 'keys', then mark it blocked with type 'btype'. */
- dictEntry *de;
- list *l;
- int j;
- c->bpop.timeout = timeout; /* stored on the client; read later by addClientToTimeoutTable() */
- c->bpop.target = target;
- if (target != NULL) incrRefCount(target); /* the client now holds a reference to 'target' */
- for (j = 0; j < numkeys; j++) {
- /* Allocate our bkinfo structure, associated to each key the client
- * is blocked for. */
- bkinfo *bki = zmalloc(sizeof(*bki));
- if (btype == BLOCKED_STREAM) /* 'ids' is only read for stream blocking */
- bki->stream_id = ids[j];
- /* If the key already exists in the dictionary ignore it: the
- * bkinfo just allocated is released and the key is skipped. */
- if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) {
- zfree(bki);
- continue;
- }
- incrRefCount(keys[j]); /* reference taken for the entry added to c->bpop.keys */
- /* And in the other "side", to map keys -> clients */
- de = dictFind(c->db->blocking_keys,keys[j]);
- if (de == NULL) {
- int retval;
- /* For every key we take a list of clients blocked for it */
- l = listCreate();
- retval = dictAdd(c->db->blocking_keys,keys[j],l);
- incrRefCount(keys[j]); /* reference taken for the entry added to db->blocking_keys */
- serverAssertWithInfo(c,keys[j],retval == DICT_OK);
- } else {
- l = dictGetVal(de); /* reuse the existing list of clients for this key */
- }
- listAddNodeTail(l,c); /* append this client at the tail of the key's client list */
- bki->listnode = listLast(l); /* remember the node that was just appended */
- }
- blockClient(c,btype);
- }
- blockClient
- void blockClient(client *c, int btype) { /* Flag 'c' as blocked with type 'btype' and update the server-wide counters. */
- c->flags |= CLIENT_BLOCKED;
- c->btype = btype;
- server.blocked_clients++; /* total number of blocked clients */
- server.blocked_clients_by_type[btype]++; /* per-type counter, indexed by 'btype' */
- addClientToTimeoutTable(c); /* returns without inserting when c->bpop.timeout is 0 */
- }
- addClientToTimeoutTable
- void addClientToTimeoutTable(client *c) { /* Insert 'c' into server.clients_timeout_table, keyed by its timeout and the client. */
- if (c->bpop.timeout == 0) return; /* timeout 0: nothing is inserted (presumably "block with no timeout" - confirm against callers) */
- uint64_t timeout = c->bpop.timeout;
- unsigned char buf[CLIENT_ST_KEYLEN];
- encodeTimeoutKey(buf,timeout,c); /* fills 'buf' from the timeout and the client */
- if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL))
- c->flags |= CLIENT_IN_TO_TABLE; /* set only when raxTryInsert() returned non-zero */
- }
2.LINDEX
返回存储在 key 的列表中下标为 index 的元素。 下标是从0开始索引的,所以 0 表示第一个元素,
1 表示第二个元素,并以此类推。 负数索引用于指定从列表尾部开始索引的元素。在这种方法下,-1 表示最后一个元素,-2 表示倒数第二个元素,并以此往前推。
当 key 对应的值不是一个列表的时候,会返回一个error;当 index 超出列表范围时,返回 nil。
时间复杂度:O(N)
- 127.0.0.1:6379> lpush mylist "World" "Hello"
- (integer) 2
- 127.0.0.1:6379> lindex mylist 0
- "Hello"
- 127.0.0.1:6379> lindex mylist 1
- "World"
- 127.0.0.1:6379>
源码解析
- void lindexCommand(client *c) { /* LINDEX key index: reply with the element at 'index', or with null when quicklistIndex() finds none. */
- robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]); /* shared.null[c->resp] is passed as the reply object for the lookup */
- if (o == NULL || checkType(c,o,OBJ_LIST)) return; /* nothing more to do when the lookup gave no object or the type check is non-zero */
- long index;
- robj *value = NULL;
- if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK)) /* argv[2] is parsed as the index */
- return;
- if (o->encoding == OBJ_ENCODING_QUICKLIST) {
- quicklistEntry entry;
- if (quicklistIndex(o->ptr, index, &entry)) { /* non-zero: 'entry' was populated */
- if (entry.value) { /* entry carries a byte buffer of length entry.sz */
- value = createStringObject((char*)entry.value,entry.sz);
- } else { /* no buffer: the entry is carried as the integer entry.longval */
- value = createStringObjectFromLongLong(entry.longval);
- }
- addReplyBulk(c,value);
- decrRefCount(value); /* drop our reference to the temporary object */
- } else {
- addReplyNull(c); /* quicklistIndex() returned 0: element not found */
- }
- } else {
- serverPanic("Unknown list encoding");
- }
- }
- /* Populate 'entry' with the element at the specified zero-based index
- * where 0 is the head, 1 is the element next to head
- * and so on. Negative integers are used in order to count
- * from the tail, -1 is the last element, -2 the penultimate
- * and so on. If the index is out of range 0 is returned.
- *
- * The search walks the node chain from the head (idx >= 0) or from the
- * tail (idx < 0), summing each node's 'count' until the node containing
- * the requested element is reached.
- *
- * Returns 1 if element found
- * Returns 0 if element not found */
- int quicklistIndex(const quicklist *quicklist, const long long idx,
- quicklistEntry *entry) {
- quicklistNode *n;
- unsigned long long accum = 0; /* sum of 'count' over the nodes skipped so far */
- unsigned long long index; /* non-negative distance from the end we start at */
- int forward = idx < 0 ? 0 : 1; /* < 0 -> reverse, 0+ -> forward */
- initEntry(entry);
- entry->quicklist = quicklist;
- if (!forward) {
- index = (-idx) - 1; /* -1 -> 0, -2 -> 1, ... */
- n = quicklist->tail;
- } else {
- index = idx;
- n = quicklist->head;
- }
- if (index >= quicklist->count) /* out of range with respect to quicklist->count */
- return 0;
- while (likely(n)) {
- if ((accum + n->count) > index) { /* the wanted element lives in this node */
- break;
- } else {
- D("Skipping over (%p) %u at accum %lld", (void *)n, n->count,
- accum);
- accum += n->count;
- n = forward ? n->next : n->prev; /* advance in the direction of travel */
- }
- }
- if (!n) /* ran off the end of the node chain */
- return 0;
- D("Found node: %p at accum %llu, idx %llu, sub+ %llu, sub- %llu", (void *)n,
- accum, index, index - accum, (-index) - 1 + accum);
- entry->node = n;
- if (forward) {
- /* forward = normal head-to-tail offset. */
- entry->offset = index - accum;
- } else {
- /* reverse = need negative offset for tail-to-head, so undo
- * the result of the original if (index < 0) above.
- * NOTE(review): 'index' is unsigned here, so (-index) relies on
- * wrap-around and on the conversion to the type of entry->offset
- * (not visible in this excerpt) - confirm. */
- entry->offset = (-index) - 1 + accum;
- }
- quicklistDecompressNodeForUse(entry->node);
- entry->zi = ziplistIndex(entry->node->zl, entry->offset); /* locate the element inside the node's ziplist */
- ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval); /* fills value/sz or longval */
- /* The caller will use our result, so we don't re-compress here.
- * The caller can recompress or delete the node as needed. */
- return 1;
- }
3.LINSERT
把 value 插入存于 key 的列表中在基准值 pivot 的前面或后面。
当 key 不存在时,这个list会被看作是空list,任何操作都不会发生。
当 key 存在,但保存的不是一个list的时候,会返回error。
时间复杂度:O(N)
- 127.0.0.1:6379> linsert mylist before "World" "There"
- (integer) 3
- 127.0.0.1:6379> lrange mylist 0 -1
- 1) "Hello"
- 2) "There"
- 3) "World"
- 127.0.0.1:6379>
4.LRANGE
返回存储在 key 的列表里指定范围内的元素。 start 和 end 偏移量都是基于0的下标,即list的第一个元素下标是0(list的表头),第二个元素下标是1,以此类推。
偏移量也可以是负数,表示偏移量是从list尾部开始计数。 例如, -1 表示列表的最后一个元素,-2 是倒数第二个,以此类推。
时间复杂度:O(S+N),其中 S 是 start 偏移量到表头(对于较大的列表,则是到较近一端)的距离,N 是指定范围内的元素个数。
- 127.0.0.1:6379> lrange mylist 0 -1
- 1) "Hello"
- 2) "There"
- 3) "World"
- 127.0.0.1:6379> lrange mylist 0 1
- 1) "Hello"
- 2) "There"
- 127.0.0.1:6379>
源码解析
- void lrangeCommand(client *c) { /* LRANGE key start end: reply with the elements whose indexes fall in [start, end] after normalization. */
- robj *o;
- long start, end, llen, rangelen;
- if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) || /* argv[2] = start, argv[3] = end */
- (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL /* shared.emptyarray is passed as the reply object for the lookup */
- || checkType(c,o,OBJ_LIST)) return;
- llen = listTypeLength(o);
- /* convert negative indexes: they count back from the list length */
- if (start < 0) start = llen+start;
- if (end < 0) end = llen+end;
- if (start < 0) start = 0; /* still negative after conversion: clamp to the first element */
- /* Invariant: start >= 0, so this test will be true when end < 0.
- * The range is empty when start > end or start >= length. */
- if (start > end || start >= llen) {
- addReply(c,shared.emptyarray);
- return;
- }
- if (end >= llen) end = llen-1; /* clamp 'end' to the last element */
- rangelen = (end-start)+1; /* both bounds are inclusive */
- /* Return the result in form of a multi-bulk reply */
- addReplyArrayLen(c,rangelen);
- if (o->encoding == OBJ_ENCODING_QUICKLIST) {
- listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL); /* start at index 'start', iterating in the LIST_TAIL direction */
- while(rangelen--) { /* emit exactly 'rangelen' elements */
- listTypeEntry entry;
- listTypeNext(iter, &entry);
- quicklistEntry *qe = &entry.entry;
- if (qe->value) { /* entry carries a byte buffer of length qe->sz */
- addReplyBulkCBuffer(c,qe->value,qe->sz);
- } else { /* no buffer: the entry is carried as the integer qe->longval */
- addReplyBulkLongLong(c,qe->longval);
- }
- }
- listTypeReleaseIterator(iter);
- } else {
- serverPanic("List encoding is not QUICKLIST!");
- }
- }