
[Database] Master Redis in 15 Days, Part 5: The Set Object Type

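In this post we look at the fourth of Redis's five major types: the set. Sets are interesting for two reasons. First, a set is essentially a simplified Dictionary that uses only the keys, so it avoids the memory a Dictionary spends on values. Second, it is the direct counterpart of HashSet in C#. Without further ado, let's start with the Redis manual, which lists every command available for the set type.

As always, only about four of those commands are in everyday use (CRUD).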

 

I: Common commands

1. SADD

This command simply adds members to a set. Below I add my favorite fruits to the fruits set.

127.0.0.1:6379> sadd fruits apple
(integer) 1
127.0.0.1:6379> sadd fruits banana
(integer) 1
127.0.0.1:6379> smembers fruits
1) "banana"
2) "apple"

 

As you can see, sadd added two members to the set. You may not be satisfied with something this simple and want to know how Redis implements a set internally. object encoding will tell you:

127.0.0.1:6379> object encoding fruits
"hashtable"
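The fruits set reports hashtable because its members are plain strings. Judging from setTypeCreate in the t_set.c source quoted later in this post, a new set starts out as an intset when its first member can be represented as an integer, and as a hash table otherwise. Here is a rough sketch of that decision. The names is_integer_encodable and initial_set_encoding are made up for illustration, and the strtoll-based check is only an approximation of Redis's own isObjectRepresentableAsLongLong.

```c
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <stdlib.h>

/* Hypothetical names, not Redis identifiers. */
enum toy_encoding { TOY_ENC_INTSET, TOY_ENC_HASHTABLE };

/* Returns 1 when s is entirely a base-10 integer that fits in a long long.
 * Assumption: this is looser than the real Redis check (it accepts a
 * leading '+', for example). */
int is_integer_encodable(const char *s, long long *out) {
    char *end;
    long long v;

    if (s == NULL || *s == '\0' || isspace((unsigned char)*s)) return 0;
    errno = 0;
    v = strtoll(s, &end, 10);
    if (errno == ERANGE || *end != '\0') return 0;
    if (out != NULL) *out = v;
    return 1;
}

/* Picks the encoding a brand-new set would start with, based on the
 * first member added to it. */
enum toy_encoding initial_set_encoding(const char *first_member) {
    assert(first_member != NULL);
    return is_integer_encodable(first_member, NULL) ? TOY_ENC_INTSET
                                                    : TOY_ENC_HASHTABLE;
}
```

With this sketch, initial_set_encoding("apple") yields the hash table encoding, while a first member such as "42" would yield the intset encoding.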

 

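There it is: hashtable. You can probably guess with your eyes closed that this is a dictionary that uses only its keys. If you still have doubts, I can show you the underlying source code.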

 

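Notice the dictAdd call: its third argument is NULL, which corresponds to the *val parameter. Pretty neat. Next, let's look at the definition of dictAdd.

I covered the theory behind the hashtable implementation in the previous post, so I won't repeat it here.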
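To make the "dictionary that only uses its keys" idea concrete, here is a minimal sketch of a chained hash table whose entries have a key and a value, plus a set-style add that always passes NULL as the value. Everything here (toy_dict, toy_dict_add, toy_set_add, the fixed 16 buckets) is hypothetical and far simpler than Redis's real dict, which also rehashes incrementally.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define TOY_BUCKETS 16

typedef struct toy_entry {
    char *key;
    void *val;                /* stays NULL when the dict backs a set */
    struct toy_entry *next;   /* chaining for colliding keys */
} toy_entry;

typedef struct toy_dict {
    toy_entry *buckets[TOY_BUCKETS];
    size_t used;
} toy_dict;

/* djb2 string hash, reduced to a bucket index. */
unsigned long toy_hash(const char *s) {
    unsigned long h = 5381;
    while (*s != '\0') h = h * 33 + (unsigned char)*s++;
    return h % TOY_BUCKETS;
}

toy_entry *toy_dict_find(toy_dict *d, const char *key) {
    toy_entry *e = d->buckets[toy_hash(key)];
    while (e != NULL && strcmp(e->key, key) != 0) e = e->next;
    return e;
}

/* Returns 1 if the key was added, 0 if it already existed or on
 * allocation failure. */
int toy_dict_add(toy_dict *d, const char *key, void *val) {
    unsigned long idx = toy_hash(key);
    size_t len = strlen(key) + 1;
    toy_entry *e;

    if (toy_dict_find(d, key) != NULL) return 0;
    e = malloc(sizeof(*e));
    if (e == NULL) return 0;
    e->key = malloc(len);
    if (e->key == NULL) { free(e); return 0; }
    memcpy(e->key, key, len);
    e->val = val;
    e->next = d->buckets[idx];
    d->buckets[idx] = e;
    d->used++;
    return 1;
}

/* A set add is just a dict add with no value. */
int toy_set_add(toy_dict *set, const char *member) {
    assert(set != NULL && member != NULL);
    return toy_dict_add(set, member, NULL);
}
```

Adding "apple" twice returns 1 the first time and 0 the second, which matches what sadd reports as the number of newly added members.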

 

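2. SPOP, SMEMBERS

Once members are in, they need a way out. The first command, SPOP, removes and returns a random member of the set; the second, SMEMBERS, returns every member, as the sessions in this post show. Oddly enough, C#'s HashSet offers no good way to do what SPOP does: the "random" part is the annoying bit. Below is the approach I came up with.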
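The original snippet is not reproduced here, so as a stand-in, here is one way the "remove and return a random member" idea could look. This is my own sketch in C rather than C#, not the code the post originally showed: members live in a small fixed-size array, a random index is picked with rand(), and the last member is swapped into the hole. The type toy_set and the function toy_spop are hypothetical, and the caller is assumed to keep the members unique.

```c
#include <assert.h>
#include <stdlib.h>

#define TOY_CAPACITY 8

/* Hypothetical array-backed set; uniqueness is the caller's job. */
typedef struct {
    const char *items[TOY_CAPACITY];
    size_t len;
} toy_set;

/* Removes and returns a random member, or NULL when the set is empty.
 * Swapping the last member into the freed slot keeps removal O(1). */
const char *toy_spop(toy_set *s) {
    size_t i;
    const char *picked;

    assert(s != NULL);
    if (s->len == 0) return NULL;
    i = (size_t)rand() % s->len;
    picked = s->items[i];
    s->items[i] = s->items[s->len - 1];
    s->len--;
    return picked;
}
```

Note that rand() % len is slightly biased and is not thread safe, which is part of why having the server do this atomically is convenient.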

 

That was a brief aside. Let's continue: SADD a few more members, then SPOP some of them.

127.0.0.1:6379> sadd fruits pear
(integer) 1
127.0.0.1:6379> sadd fruits grape
(integer) 1
127.0.0.1:6379> sadd fruits chestnut
(integer) 1
127.0.0.1:6379> smembers fruits
1) "grape"
2) "pear"
3) "banana"
4) "apple"
5) "chestnut"
127.0.0.1:6379> spop fruits
"apple"
127.0.0.1:6379> spop fruits
"chestnut"
127.0.0.1:6379> smembers fruits
1) "grape"
2) "pear"
3) "banana"

This command is genuinely handy. For one thing it is an atomic operation, and implementing it myself would take at least ten or so lines of code.

 

3. SREM

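Since we mentioned CRUD, we can't leave out the D. SREM removes one or more members from the set stored at key; members that do not exist are ignored. As a quick example, let's remove pear from fruits.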

127.0.0.1:6379> smembers fruits
1) "grape"
2) "pear"
3) "banana"
127.0.0.1:6379> srem fruits pear
(integer) 1
127.0.0.1:6379> smembers fruits
1) "grape"
2) "banana"
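The "missing members are ignored" rule means the reply counts only the members that were actually removed. A small sketch of that counting logic, loosely modeled on the loop in sremCommand from the source quoted in this post, might look like this. The array-backed toy_set and the function names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_CAPACITY 8

/* Hypothetical array-backed set of strings. */
typedef struct {
    const char *items[TOY_CAPACITY];
    size_t len;
} toy_set;

/* Removes one member; returns 1 if it was present, 0 otherwise. */
int toy_remove_one(toy_set *s, const char *member) {
    size_t i;
    for (i = 0; i < s->len; i++) {
        if (strcmp(s->items[i], member) == 0) {
            s->items[i] = s->items[s->len - 1];
            s->len--;
            return 1;
        }
    }
    return 0;
}

/* Removes every listed member that exists and returns how many were
 * removed. Stops early once the set is empty, as the Redis loop does. */
int toy_srem(toy_set *s, const char **members, size_t n) {
    int deleted = 0;
    size_t j;

    assert(s != NULL && members != NULL);
    for (j = 0; j < n; j++) {
        if (toy_remove_one(s, members[j])) {
            deleted++;
            if (s->len == 0) break;
        }
    }
    return deleted;
}
```

Removing "pear" and a member that was never added from a three-member set returns 1 and leaves two members behind.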

 

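Those are the common operations. Do they feel almost trivially simple? They should. The commands themselves are easy; what matters is understanding how they are implemented underneath, so that you know what to expect from them. The source code for the set commands all lives in "t_set.c".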

/*
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "redis.h"

/*-----------------------------------------------------------------------------
 * Set Commands
 *----------------------------------------------------------------------------*/

void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op);

/* Factory method to return a set that *can* hold "value". When the object has
 * an integer-encodable value, an intset will be returned. Otherwise a regular
 * hash table. */
robj *setTypeCreate(robj *value) {
    if (isObjectRepresentableAsLongLong(value,NULL) == REDIS_OK)
        return createIntsetObject();
    return createSetObject();
}

int setTypeAdd(robj *subject, robj *value) {
    long long llval;
    if (subject->encoding == REDIS_ENCODING_HT) {
        if (dictAdd(subject->ptr,value,NULL) == DICT_OK) {
            incrRefCount(value);
            return 1;
        }
    } else if (subject->encoding == REDIS_ENCODING_INTSET) {
        if (isObjectRepresentableAsLongLong(value,&llval) == REDIS_OK) {
            uint8_t success = 0;
            subject->ptr = intsetAdd(subject->ptr,llval,&success);
            if (success) {
                /* Convert to regular set when the intset contains
                 * too many entries. */
                if (intsetLen(subject->ptr) > server.set_max_intset_entries)
                    setTypeConvert(subject,REDIS_ENCODING_HT);
                return 1;
            }
        } else {
            /* Failed to get integer from object, convert to regular set. */
            setTypeConvert(subject,REDIS_ENCODING_HT);

            /* The set *was* an intset and this value is not integer
             * encodable, so dictAdd should always work. */
            redisAssertWithInfo(NULL,value,dictAdd(subject->ptr,value,NULL) == DICT_OK);
            incrRefCount(value);
            return 1;
        }
    } else {
        redisPanic("Unknown set encoding");
    }
    return 0;
}

int setTypeRemove(robj *setobj, robj *value) {
    long long llval;
    if (setobj->encoding == REDIS_ENCODING_HT) {
        if (dictDelete(setobj->ptr,value) == DICT_OK) {
            if (htNeedsResize(setobj->ptr)) dictResize(setobj->ptr);
            return 1;
        }
    } else if (setobj->encoding == REDIS_ENCODING_INTSET) {
        if (isObjectRepresentableAsLongLong(value,&llval) == REDIS_OK) {
            int success;
            setobj->ptr = intsetRemove(setobj->ptr,llval,&success);
            if (success) return 1;
        }
    } else {
        redisPanic("Unknown set encoding");
    }
    return 0;
}

int setTypeIsMember(robj *subject, robj *value) {
    long long llval;
    if (subject->encoding == REDIS_ENCODING_HT) {
        return dictFind((dict*)subject->ptr,value) != NULL;
    } else if (subject->encoding == REDIS_ENCODING_INTSET) {
        if (isObjectRepresentableAsLongLong(value,&llval) == REDIS_OK) {
            return intsetFind((intset*)subject->ptr,llval);
        }
    } else {
        redisPanic("Unknown set encoding");
    }
    return 0;
}

setTypeIterator *setTypeInitIterator(robj *subject) {
    setTypeIterator *si = zmalloc(sizeof(setTypeIterator));
    si->subject = subject;
    si->encoding = subject->encoding;
    if (si->encoding == REDIS_ENCODING_HT) {
        si->di = dictGetIterator(subject->ptr);
    } else if (si->encoding == REDIS_ENCODING_INTSET) {
        si->ii = 0;
    } else {
        redisPanic("Unknown set encoding");
    }
    return si;
}

void setTypeReleaseIterator(setTypeIterator *si) {
    if (si->encoding == REDIS_ENCODING_HT)
        dictReleaseIterator(si->di);
    zfree(si);
}

/* Move to the next entry in the set. Returns the object at the current
 * position.
 *
 * Since set elements can be internally be stored as redis objects or
 * simple arrays of integers, setTypeNext returns the encoding of the
 * set object you are iterating, and will populate the appropriate pointer
 * (eobj) or (llobj) accordingly.
 *
 * When there are no longer elements -1 is returned.
 * Returned objects ref count is not incremented, so this function is
 * copy on write friendly. */
int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele) {
    if (si->encoding == REDIS_ENCODING_HT) {
        dictEntry *de = dictNext(si->di);
        if (de == NULL) return -1;
        *objele = dictGetKey(de);
    } else if (si->encoding == REDIS_ENCODING_INTSET) {
        if (!intsetGet(si->subject->ptr,si->ii++,llele))
            return -1;
    }
    return si->encoding;
}

/* The not copy on write friendly version but easy to use version
 * of setTypeNext() is setTypeNextObject(), returning new objects
 * or incrementing the ref count of returned objects. So if you don't
 * retain a pointer to this object you should call decrRefCount() against it.
 *
 * This function is the way to go for write operations where COW is not
 * an issue as the result will be anyway of incrementing the ref count. */
robj *setTypeNextObject(setTypeIterator *si) {
    int64_t intele;
    robj *objele;
    int encoding;

    encoding = setTypeNext(si,&objele,&intele);
    switch(encoding) {
        case -1:    return NULL;
        case REDIS_ENCODING_INTSET:
            return createStringObjectFromLongLong(intele);
        case REDIS_ENCODING_HT:
            incrRefCount(objele);
            return objele;
        default:
            redisPanic("Unsupported encoding");
    }
    return NULL; /* just to suppress warnings */
}

/* Return random element from a non empty set.
 * The returned element can be a int64_t value if the set is encoded
 * as an "intset" blob of integers, or a redis object if the set
 * is a regular set.
 *
 * The caller provides both pointers to be populated with the right
 * object. The return value of the function is the object->encoding
 * field of the object and is used by the caller to check if the
 * int64_t pointer or the redis object pointer was populated.
 *
 * When an object is returned (the set was a real set) the ref count
 * of the object is not incremented so this function can be considered
 * copy on write friendly. */
int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele) {
    if (setobj->encoding == REDIS_ENCODING_HT) {
        dictEntry *de = dictGetRandomKey(setobj->ptr);
        *objele = dictGetKey(de);
    } else if (setobj->encoding == REDIS_ENCODING_INTSET) {
        *llele = intsetRandom(setobj->ptr);
    } else {
        redisPanic("Unknown set encoding");
    }
    return setobj->encoding;
}

unsigned long setTypeSize(robj *subject) {
    if (subject->encoding == REDIS_ENCODING_HT) {
        return dictSize((dict*)subject->ptr);
    } else if (subject->encoding == REDIS_ENCODING_INTSET) {
        return intsetLen((intset*)subject->ptr);
    } else {
        redisPanic("Unknown set encoding");
    }
}

/* Convert the set to specified encoding. The resulting dict (when converting
 * to a hash table) is presized to hold the number of elements in the original
 * set. */
void setTypeConvert(robj *setobj, int enc) {
    setTypeIterator *si;
    redisAssertWithInfo(NULL,setobj,setobj->type == REDIS_SET &&
                             setobj->encoding == REDIS_ENCODING_INTSET);

    if (enc == REDIS_ENCODING_HT) {
        int64_t intele;
        dict *d = dictCreate(&setDictType,NULL);
        robj *element;

        /* Presize the dict to avoid rehashing */
        dictExpand(d,intsetLen(setobj->ptr));

        /* To add the elements we extract integers and create redis objects */
        si = setTypeInitIterator(setobj);
        while (setTypeNext(si,NULL,&intele) != -1) {
            element = createStringObjectFromLongLong(intele);
            redisAssertWithInfo(NULL,element,dictAdd(d,element,NULL) == DICT_OK);
        }
        setTypeReleaseIterator(si);

        setobj->encoding = REDIS_ENCODING_HT;
        zfree(setobj->ptr);
        setobj->ptr = d;
    } else {
        redisPanic("Unsupported set conversion");
    }
}

void saddCommand(redisClient *c) {
    robj *set;
    int j, added = 0;

    set = lookupKeyWrite(c->db,c->argv[1]);
    if (set == NULL) {
        set = setTypeCreate(c->argv[2]);
        dbAdd(c->db,c->argv[1],set);
    } else {
        if (set->type != REDIS_SET) {
            addReply(c,shared.wrongtypeerr);
            return;
        }
    }

    for (j = 2; j < c->argc; j++) {
        c->argv[j] = tryObjectEncoding(c->argv[j]);
        if (setTypeAdd(set,c->argv[j])) added++;
    }
    if (added) {
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_SET,"sadd",c->argv[1],c->db->id);
    }
    server.dirty += added;
    addReplyLongLong(c,added);
}

void sremCommand(redisClient *c) {
    robj *set;
    int j, deleted = 0, keyremoved = 0;

    if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
        checkType(c,set,REDIS_SET)) return;

    for (j = 2; j < c->argc; j++) {
        if (setTypeRemove(set,c->argv[j])) {
            deleted++;
            if (setTypeSize(set) == 0) {
                dbDelete(c->db,c->argv[1]);
                keyremoved = 1;
                break;
            }
        }
    }
    if (deleted) {
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_SET,"srem",c->argv[1],c->db->id);
        if (keyremoved)
            notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],
                                c->db->id);
        server.dirty += deleted;
    }
    addReplyLongLong(c,deleted);
}

void smoveCommand(redisClient *c) {
    robj *srcset, *dstset, *ele;
    srcset = lookupKeyWrite(c->db,c->argv[1]);
    dstset = lookupKeyWrite(c->db,c->argv[2]);
    ele = c->argv[3] = tryObjectEncoding(c->argv[3]);

    /* If the source key does not exist return 0 */
    if (srcset == NULL) {
        addReply(c,shared.czero);
        return;
    }

    /* If the source key has the wrong type, or the destination key
     * is set and has the wrong type, return with an error. */
    if (checkType(c,srcset,REDIS_SET) ||
        (dstset && checkType(c,dstset,REDIS_SET))) return;

    /* If srcset and dstset are equal, SMOVE is a no-op */
    if (srcset == dstset) {
        addReply(c,setTypeIsMember(srcset,ele) ? shared.cone : shared.czero);
        return;
    }

    /* If the element cannot be removed from the src set, return 0. */
    if (!setTypeRemove(srcset,ele)) {
        addReply(c,shared.czero);
        return;
    }
    notifyKeyspaceEvent(REDIS_NOTIFY_SET,"srem",c->argv[1],c->db->id);

    /* Remove the src set from the database when empty */
    if (setTypeSize(srcset) == 0) {
        dbDelete(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
    }
    signalModifiedKey(c->db,c->argv[1]);
    signalModifiedKey(c->db,c->argv[2]);
    server.dirty++;

    /* Create the destination set when it doesn't exist */
    if (!dstset) {
        dstset = setTypeCreate(ele);
        dbAdd(c->db,c->argv[2],dstset);
    }

    /* An extra key has changed when ele was successfully added to dstset */
    if (setTypeAdd(dstset,ele)) {
        server.dirty++;
        notifyKeyspaceEvent(REDIS_NOTIFY_SET,"sadd",c->argv[2],c->db->id);
    }
    addReply(c,shared.cone);
}

void sismemberCommand(redisClient *c) {
    robj *set;

    if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
        checkType(c,set,REDIS_SET)) return;

    c->argv[2] = tryObjectEncoding(c->argv[2]);
    if (setTypeIsMember(set,c->argv[2]))
        addReply(c,shared.cone);
    else
        addReply(c,shared.czero);
}

void scardCommand(redisClient *c) {
    robj *o;

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
        checkType(c,o,REDIS_SET)) return;

    addReplyLongLong(c,setTypeSize(o));
}

void spopCommand(redisClient *c) {
    robj *set, *ele, *aux;
    int64_t llele;
    int encoding;

    if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
        checkType(c,set,REDIS_SET)) return;

    encoding = setTypeRandomElement(set,&ele,&llele);
    if (encoding == REDIS_ENCODING_INTSET) {
        ele = createStringObjectFromLongLong(llele);
        set->ptr = intsetRemove(set->ptr,llele,NULL);
    } else {
        incrRefCount(ele);
        setTypeRemove(set,ele);
    }
    notifyKeyspaceEvent(REDIS_NOTIFY_SET,"spop",c->argv[1],c->db->id);

    /* Replicate/AOF this command as an SREM operation */
    aux = createStringObject("SREM",4);
    rewriteClientCommandVector(c,3,aux,c->argv[1],ele);
    decrRefCount(ele);
    decrRefCount(aux);

    addReplyBulk(c,ele);
    if (setTypeSize(set) == 0) {
        dbDelete(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
    }
    signalModifiedKey(c->db,c->argv[1]);
    server.dirty++;
}

/* handle the "SRANDMEMBER key <count>" variant. The normal version of the
 * command is handled by the srandmemberCommand() function itself. */

/* How many times bigger should be the set compared to the requested size
 * for us to don't use the "remove elements" strategy? Read later in the
 * implementation for more info. */
#define SRANDMEMBER_SUB_STRATEGY_MUL 3

void srandmemberWithCountCommand(redisClient *c) {
    long l;
    unsigned long count, size;
    int uniq = 1;
    robj *set, *ele;
    int64_t llele;
    int encoding;

    dict *d;

    if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != REDIS_OK) return;
    if (l >= 0) {
        count = (unsigned) l;
    } else {
        /* A negative count means: return the same elements multiple times
         * (i.e. don't remove the extracted element after every extraction). */
        count = -l;
        uniq = 0;
    }

    if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk))
        == NULL || checkType(c,set,REDIS_SET)) return;
    size = setTypeSize(set);

    /* If count is zero, serve it ASAP to avoid special cases later. */
    if (count == 0) {
        addReply(c,shared.emptymultibulk);
        return;
    }

    /* CASE 1: The count was negative, so the extraction method is just:
     * "return N random elements" sampling the whole set every time.
     * This case is trivial and can be served without auxiliary data
     * structures. */
    if (!uniq) {
        addReplyMultiBulkLen(c,count);
        while(count--) {
            encoding = setTypeRandomElement(set,&ele,&llele);
            if (encoding == REDIS_ENCODING_INTSET) {
                addReplyBulkLongLong(c,llele);
            } else {
                addReplyBulk(c,ele);
            }
        }
        return;
    }

    /* CASE 2:
     * The number of requested elements is greater than the number of
     * elements inside the set: simply return the whole set. */
    if (count >= size) {
        sunionDiffGenericCommand(c,c->argv+1,1,NULL,REDIS_OP_UNION);
        return;
    }

    /* For CASE 3 and CASE 4 we need an auxiliary dictionary. */
    d = dictCreate(&setDictType,NULL);

    /* CASE 3:
     * The number of elements inside the set is not greater than
     * SRANDMEMBER_SUB_STRATEGY_MUL times the number of requested elements.
     * In this case we create a set from scratch with all the elements, and
     * subtract random elements to reach the requested number of elements.
     *
     * This is done because if the number of requsted elements is just
     * a bit less than the number of elements in the set, the natural approach
     * used into CASE 3 is highly inefficient. */
    if (count*SRANDMEMBER_SUB_STRATEGY_MUL > size) {
        setTypeIterator *si;

        /* Add all the elements into the temporary dictionary. */
        si = setTypeInitIterator(set);
        while((encoding = setTypeNext(si,&ele,&llele)) != -1) {
            int retval = DICT_ERR;

            if (encoding == REDIS_ENCODING_INTSET) {
                retval = dictAdd(d,createStringObjectFromLongLong(llele),NULL);
            } else {
                retval = dictAdd(d,dupStringObject(ele),NULL);
            }
            redisAssert(retval == DICT_OK);
        }
        setTypeReleaseIterator(si);
        redisAssert(dictSize(d) == size);

        /* Remove random elements to reach the right count. */
        while(size > count) {
            dictEntry *de;

            de = dictGetRandomKey(d);
            dictDelete(d,dictGetKey(de));
            size--;
        }
    }

    /* CASE 4: We have a big set compared to the requested number of elements.
     * In this case we can simply get random elements from the set and add
     * to the temporary set, trying to eventually get enough unique elements
     * to reach the specified count. */
    else {
        unsigned long added = 0;

        while(added < count) {
            encoding = setTypeRandomElement(set,&ele,&llele);
            if (encoding == REDIS_ENCODING_INTSET) {
                ele = createStringObjectFromLongLong(llele);
            } else {
                ele = dupStringObject(ele);
            }
            /* Try to add the object to the dictionary. If it already exists
             * free it, otherwise increment the number of objects we have
             * in the result dictionary. */
            if (dictAdd(d,ele,NULL) == DICT_OK)
                added++;
            else
                decrRefCount(ele);
        }
    }

    /* CASE 3 & 4: send the result to the user. */
    {
        dictIterator *di;
        dictEntry *de;

        addReplyMultiBulkLen(c,count);
        di = dictGetIterator(d);
        while((de = dictNext(di)) != NULL)
            addReplyBulk(c,dictGetKey(de));
        dictReleaseIterator(di);
        dictRelease(d);
    }
}

void srandmemberCommand(redisClient *c) {
    robj *set, *ele;
    int64_t llele;
    int encoding;

    if (c->argc == 3) {
        srandmemberWithCountCommand(c);
        return;
    } else if (c->argc > 3) {
        addReply(c,shared.syntaxerr);
        return;
    }

    if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
        checkType(c,set,REDIS_SET)) return;

    encoding = setTypeRandomElement(set,&ele,&llele);
    if (encoding == REDIS_ENCODING_INTSET) {
        addReplyBulkLongLong(c,llele);
    } else {
        addReplyBulk(c,ele);
    }
}

int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
    return setTypeSize(*(robj**)s1)-setTypeSize(*(robj**)s2);
}

/* This is used by SDIFF and in this case we can receive NULL that should
 * be handled as empty sets. */
int qsortCompareSetsByRevCardinality(const void *s1, const void *s2) {
    robj *o1 = *(robj**)s1, *o2 = *(robj**)s2;

    return (o2 ? setTypeSize(o2) : 0) - (o1 ? setTypeSize(o1) : 0);
}

void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) {
    robj **sets = zmalloc(sizeof(robj*)*setnum);
    setTypeIterator *si;
    robj *eleobj, *dstset = NULL;
    int64_t intobj;
    void *replylen = NULL;
    unsigned long j, cardinality = 0;
    int encoding;

    for (j = 0; j < setnum; j++) {
        robj *setobj = dstkey ?
            lookupKeyWrite(c->db,setkeys[j]) :
            lookupKeyRead(c->db,setkeys[j]);
        if (!setobj) {
            zfree(sets);
            if (dstkey) {
                if (dbDelete(c->db,dstkey)) {
                    signalModifiedKey(c->db,dstkey);
                    server.dirty++;
                }
                addReply(c,shared.czero);
            } else {
                addReply(c,shared.emptymultibulk);
            }
            return;
        }
        if (checkType(c,setobj,REDIS_SET)) {
            zfree(sets);
            return;
        }
        sets[j] = setobj;
    }
    /* Sort sets from the smallest to largest, this will improve our
     * algorithm's performance */
    qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality);

    /* The first thing we should output is the total number of elements...
     * since this is a multi-bulk write, but at this stage we don't know
     * the intersection set size, so we use a trick, append an empty object
     * to the output list and save the pointer to later modify it with the
     * right length */
    if (!dstkey) {
        replylen = addDeferredMultiBulkLength(c);
    } else {
        /* If we have a target key where to store the resulting set
         * create this key with an empty set inside */
        dstset = createIntsetObject();
    }

    /* Iterate all the elements of the first (smallest) set, and test
     * the element against all the other sets, if at least one set does
     * not include the element it is discarded */
    si = setTypeInitIterator(sets[0]);
    while((encoding = setTypeNext(si,&eleobj,&intobj)) != -1) {
        for (j = 1; j < setnum; j++) {
            if (sets[j] == sets[0]) continue;
            if (encoding == REDIS_ENCODING_INTSET) {
                /* intset with intset is simple... and fast */
                if (sets[j]->encoding == REDIS_ENCODING_INTSET &&
                    !intsetFind((intset*)sets[j]->ptr,intobj))
                {
                    break;
                /* in order to compare an integer with an object we
                 * have to use the generic function, creating an object
                 * for this */
                } else if (sets[j]->encoding == REDIS_ENCODING_HT) {
                    eleobj = createStringObjectFromLongLong(intobj);
                    if (!setTypeIsMember(sets[j],eleobj)) {
                        decrRefCount(eleobj);
                        break;
                    }
                    decrRefCount(eleobj);
                }
            } else if (encoding == REDIS_ENCODING_HT) {
                /* Optimization... if the source object is integer
                 * encoded AND the target set is an intset, we can get
                 * a much faster path. */
                if (eleobj->encoding == REDIS_ENCODING_INT &&
                    sets[j]->encoding == REDIS_ENCODING_INTSET &&
                    !intsetFind((intset*)sets[j]->ptr,(long)eleobj->ptr))
                {
                    break;
                /* else... object to object check is easy as we use the
                 * type agnostic API here. */
                } else if (!setTypeIsMember(sets[j],eleobj)) {
                    break;
                }
            }
        }

        /* Only take action when all sets contain the member */
        if (j == setnum) {
            if (!dstkey) {
                if (encoding == REDIS_ENCODING_HT)
                    addReplyBulk(c,eleobj);
                else
                    addReplyBulkLongLong(c,intobj);
                cardinality++;
            } else {
                if (encoding == REDIS_ENCODING_INTSET) {
                    eleobj = createStringObjectFromLongLong(intobj);
                    setTypeAdd(dstset,eleobj);
                    decrRefCount(eleobj);
                } else {
                    setTypeAdd(dstset,eleobj);
                }
            }
        }
    }
    setTypeReleaseIterator(si);

    if (dstkey) {
        /* Store the resulting set into the target, if the intersection
         * is not an empty set. */
        int deleted = dbDelete(c->db,dstkey);
        if (setTypeSize(dstset) > 0) {
            dbAdd(c->db,dstkey,dstset);
            addReplyLongLong(c,setTypeSize(dstset));
            notifyKeyspaceEvent(REDIS_NOTIFY_SET,"sinterstore",
                dstkey,c->db->id);
        } else {
            decrRefCount(dstset);
            addReply(c,shared.czero);
            if (deleted)
                notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
                    dstkey,c->db->id);
        }
        signalModifiedKey(c->db,dstkey);
        server.dirty++;
    } else {
        setDeferredMultiBulkLength(c,replylen,cardinality);
    }
    zfree(sets);
}

void sinterCommand(redisClient *c) {
    sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
}

void sinterstoreCommand(redisClient *c) {
    sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
}

#define REDIS_OP_UNION 0
#define REDIS_OP_DIFF 1
#define REDIS_OP_INTER 2

void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op) {
    robj **sets = zmalloc(sizeof(robj*)*setnum);
    setTypeIterator *si;
    robj *ele, *dstset = NULL;
    int j, cardinality = 0;
    int diff_algo = 1;

    for (j = 0; j < setnum; j++) {
        robj *setobj = dstkey ?
            lookupKeyWrite(c->db,setkeys[j]) :
            lookupKeyRead(c->db,setkeys[j]);
        if (!setobj) {
            sets[j] = NULL;
            continue;
        }
        if (checkType(c,setobj,REDIS_SET)) {
            zfree(sets);
            return;
        }
        sets[j] = setobj;
    }

    /* Select what DIFF algorithm to use.
     *
     * Algorithm 1 is O(N*M) where N is the size of the element first set
     * and M the total number of sets.
     *
     * Algorithm 2 is O(N) where N is the total number of elements in all
     * the sets.
     *
     * We compute what is the best bet with the current input here. */
    if (op == REDIS_OP_DIFF && sets[0]) {
        long long algo_one_work = 0, algo_two_work = 0;

        for (j = 0; j < setnum; j++) {
            if (sets[j] == NULL) continue;

            algo_one_work += setTypeSize(sets[0]);
            algo_two_work += setTypeSize(sets[j]);
        }

        /* Algorithm 1 has better constant times and performs less operations
         * if there are elements in common. Give it some advantage. */
        algo_one_work /= 2;
        diff_algo = (algo_one_work <= algo_two_work) ? 1 : 2;

        if (diff_algo == 1 && setnum > 1) {
            /* With algorithm 1 it is better to order the sets to subtract
             * by decreasing size, so that we are more likely to find
             * duplicated elements ASAP. */
            qsort(sets+1,setnum-1,sizeof(robj*),
                qsortCompareSetsByRevCardinality);
        }
    }

    /* We need a temp set object to store our union. If the dstkey
     * is not NULL (that is, we are inside an SUNIONSTORE operation) then
     * this set object will be the resulting object to set into the target key*/
    dstset = createIntsetObject();

    if (op == REDIS_OP_UNION) {
        /* Union is trivial, just add every element of every set to the
         * temporary set. */
        for (j = 0; j < setnum; j++) {
            if (!sets[j]) continue; /* non existing keys are like empty sets */

            si = setTypeInitIterator(sets[j]);
            while((ele = setTypeNextObject(si)) != NULL) {
                if (setTypeAdd(dstset,ele)) cardinality++;
                decrRefCount(ele);
            }
            setTypeReleaseIterator(si);
        }
    } else if (op == REDIS_OP_DIFF && sets[0] && diff_algo == 1) {
        /* DIFF Algorithm 1:
         *
         * We perform the diff by iterating all the elements of the first set,
         * and only adding it to the target set if the element does not exist
         * into all the other sets.
         *
         * This way we perform at max N*M operations, where N is the size of
         * the first set, and M the number of sets. */
        si = setTypeInitIterator(sets[0]);
        while((ele = setTypeNextObject(si)) != NULL) {
            for (j = 1; j < setnum; j++) {
                if (!sets[j]) continue; /* no key is an empty set. */
                if (sets[j] == sets[0]) break; /* same set! */
                if (setTypeIsMember(sets[j],ele)) break;
            }
            if (j == setnum) {
                /* There is no other set with this element. Add it. */
                setTypeAdd(dstset,ele);
                cardinality++;
            }
            decrRefCount(ele);
        }
        setTypeReleaseIterator(si);
    } else if (op == REDIS_OP_DIFF && sets[0] && diff_algo == 2) {
        /* DIFF Algorithm 2:
         *
         * Add all the elements of the first set to the auxiliary set.
         * Then remove all the elements of all the next sets from it.
         *
         * This is O(N) where N is the sum of all the elements in every
         * set. */
        for (j = 0; j < setnum; j++) {
            if (!sets[j]) continue; /* non existing keys are like empty sets */

            si = setTypeInitIterator(sets[j]);
            while((ele = setTypeNextObject(si)) != NULL) {
                if (j == 0) {
                    if (setTypeAdd(dstset,ele)) cardinality++;
                } else {
                    if (setTypeRemove(dstset,ele)) cardinality--;
                }
                decrRefCount(ele);
            }
            setTypeReleaseIterator(si);

            /* Exit if result set is empty as any additional removal
             * of elements will have no effect. */
            if (cardinality == 0) break;
        }
    }

    /* Output the content of the resulting set, if not in STORE mode */
    if (!dstkey) {
        addReplyMultiBulkLen(c,cardinality);
        si = setTypeInitIterator(dstset);
        while((ele = setTypeNextObject(si)) != NULL) {
            addReplyBulk(c,ele);
            decrRefCount(ele);
        }
        setTypeReleaseIterator(si);
        decrRefCount(dstset);
    } else {
        /* If we have a target key where to store the resulting set
         * create this key with the result set inside */
        int deleted = dbDelete(c->db,dstkey);
        if (setTypeSize(dstset) > 0) {
            dbAdd(c->db,dstkey,dstset);
            addReplyLongLong(c,setTypeSize(dstset));
            notifyKeyspaceEvent(REDIS_NOTIFY_SET,
                op == REDIS_OP_UNION ? "sunionstore" : "sdiffstore",
                dstkey,c->db->id);
        } else {
            decrRefCount(dstset);
            addReply(c,shared.czero);
            if (deleted)
                notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
                    dstkey,c->db->id);
        }
        signalModifiedKey(c->db,dstkey);
        server.dirty++;
    }
    zfree(sets);
}

void sunionCommand(redisClient *c) {
    sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
}

void sunionstoreCommand(redisClient *c) {
    sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
}

void sdiffCommand(redisClient *c) {
    sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
}

void sdiffstoreCommand(redisClient *c) {
    sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
}

void sscanCommand(redisClient *c) {
    robj *set;
    unsigned long cursor;

    if (parseScanCursorOrReply(c,c->argv[2],&cursor) == REDIS_ERR) return;
    if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
        checkType(c,set,REDIS_SET)) return;
    scanGenericCommand(c,set,cursor);
}
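One detail in the listing worth pulling out is how sunionDiffGenericCommand chooses between its two DIFF algorithms. It estimates the work for each, halves the estimate for algorithm 1 to give it an advantage, and picks the cheaper one. The sketch below isolates just that arithmetic. choose_diff_algo is a hypothetical helper of my own, and unlike the real code it assumes every input set exists (the real loop skips missing keys).

```c
#include <assert.h>
#include <stddef.h>

/* sizes[i] is the cardinality of the i-th input set; sizes[0] is the set
 * being subtracted from. Returns 1 or 2, mirroring diff_algo in the
 * listing. Assumption: no missing (NULL) sets. */
int choose_diff_algo(const size_t *sizes, size_t nsets) {
    long long algo_one_work = 0, algo_two_work = 0;
    size_t j;

    assert(sizes != NULL && nsets > 0);
    for (j = 0; j < nsets; j++) {
        algo_one_work += (long long)sizes[0];  /* N per set: O(N*M) total */
        algo_two_work += (long long)sizes[j];  /* sum of all cardinalities */
    }
    algo_one_work /= 2;  /* algorithm 1 gets a head start */
    return (algo_one_work <= algo_two_work) ? 1 : 2;
}
```

For sizes {100, 5, 5} the estimates are 150 versus 110, so algorithm 2 is chosen; for {3, 1000} they are 3 versus 1003, so algorithm 1 is chosen.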

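The comments in srandmemberWithCountCommand describe four cases, plus an early exit when the count is zero. As a reading aid, here is a sketch that only classifies which branch a given request would take; it does no sampling at all. The enum and function names are hypothetical. The multiplier 3 comes from SRANDMEMBER_SUB_STRATEGY_MUL in the listing.

```c
#include <assert.h>

#define TOY_SUB_STRATEGY_MUL 3  /* same value as SRANDMEMBER_SUB_STRATEGY_MUL */

/* Hypothetical labels for the branches described in the source comments. */
enum toy_srand_case {
    TOY_EMPTY_REPLY,       /* count == 0 */
    TOY_WITH_REPEATS,      /* CASE 1: negative count, duplicates allowed */
    TOY_WHOLE_SET,         /* CASE 2: count >= set size */
    TOY_COPY_THEN_TRIM,    /* CASE 3: copy everything, delete random members */
    TOY_SAMPLE_UNTIL_FULL  /* CASE 4: sample until enough unique members */
};

enum toy_srand_case classify_srandmember(long requested, unsigned long set_size) {
    unsigned long count;
    int uniq = 1;

    if (requested >= 0) {
        count = (unsigned long)requested;
    } else {
        /* Negate in unsigned arithmetic so LONG_MIN is handled safely. */
        count = 0UL - (unsigned long)requested;
        uniq = 0;
    }

    if (count == 0) return TOY_EMPTY_REPLY;
    if (!uniq) return TOY_WITH_REPEATS;
    if (count >= set_size) return TOY_WHOLE_SET;
    if (count * TOY_SUB_STRATEGY_MUL > set_size) return TOY_COPY_THEN_TRIM;
    return TOY_SAMPLE_UNTIL_FULL;
}
```

For a set of 10 members, asking for 4 lands in the copy-then-trim branch (4 * 3 > 10), while asking for 3 lands in the sample-until-full branch.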