你的位置:首页 > 数据库

[数据库]Redis学习——ae事件处理源码分析


0. 前言

  Redis在封装事件的处理采用了Reactor模式,添加了定时事件的处理。Redis处理事件是单进程单线程的,而经典Reator模式对事件是串行处理的。即如果有一个事件阻塞过久的话会导致整个Redis被阻塞。

   对于Reactor模式,可以查看维基百科。(这里推荐一本书:《面向模式的软件架构:卷2》,里面详细讲了Reactor模式,但是刚开始可能会不太 明白,耐下心来就好)。我自己也使用Reactor封装了一个事件处理,其中对于读事件加入了线程池的处理,但是还在测试(该封装主要针对公司需求来 的)。加入线程池的处理是参考了Nettty的事件处理,其也是Reactor+线程池。

  补充一下,对于Redis的源码分析:《Redis设计与实现》讲得挺好的。

  涉及的文件:ae.h/ae.c  ae_epoll.c

  涉及的知识点:epoll,Reactor

  完成ae事件代码注释在GitHud 上

1. 数据结构

   对于文件和时间的事件结构体,其中的包含了一些函数指针,当对应的事件产生的时候会调用该函数(回调机制)。对于aeEventLoop结构体,比较重 要的几个字段是events,fired,timeEventHead。  对于events,起的是分离表的作用,即通过fd直接映射找到对应的 IOEvent事件结构体(这里直接索引是因为在Linux/Unix类系统中fd是连续的,可以直接作为数组下标)。

  fired保存了由底层IO多路复用返回的触发事件(这里我只分析了用epoll的实现)。

  timeEventHead保存了定时的事件链表,Redis只用到了2个定时事件。

 1 //前置声明,避免了编译出错,因为aeEventLoop需要用到aeFileEvent结构体 2 struct aeEventLoop; 3  4 /* Types and data structures */ 5 //定义文件事件处理接口(函数指针) 6 typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); 7 //时间事件处理接口(函数指针),该函数返回定时的时长 8 typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); 9 typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);10 //aeMain中使用,在调用处理事件前调用11 typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);12 13 /* File event structure */14 //文件事件结构体15 typedef struct aeFileEvent {16   //读或者写,也用于标识该事件结构体是否正在使用17   int mask; /* one of AE_(READABLE|WRITABLE) */18   //读事件的处理函数19   aeFileProc *rfileProc;20   //写事件的处理函数21   aeFileProc *wfileProc;22   //传递给上述两个函数的数据23   void *clientData;24 } aeFileEvent;25 26 /* Time event structure */27 //时间事件28 typedef struct aeTimeEvent {29   //时间事件标识符,用于唯一标识该时间事件,并且用于删除时间事件30   long long id; /* time event identifier. */31   long when_sec; /* seconds */32   long when_ms; /* milliseconds */33   //该事件对应的处理程序34   aeTimeProc *timeProc;35   //时间事件的最后一次处理程序,若已设置,则删除时间事件时会被调用36   aeEventFinalizerProc *finalizerProc;37   void *clientData;38   struct aeTimeEvent *next;39 } aeTimeEvent;40 41 /* A fired event */42 //这里用于保存已触发的事件43 typedef struct aeFiredEvent {44   int fd;45   int mask;46 } aeFiredEvent;47 48 /* State of an event based program */49 typedef struct aeEventLoop {50   //最大文件描述符的值51   int maxfd;  /* highest file descriptor currently registered */52   //文件描述符的最大监听数53   int setsize; /* max number of file descriptors tracked */54   //用于生成时间事件的唯一标识id55   long long timeEventNextId;56   //用于检测系统时间是否变更(判断标准 now<lastTime)57   time_t lastTime;   /* Used to detect system clock skew */58   //注册要使用的文件事件,这里的分离表实现为直接索引,即通过fd来访问,实现事件的分离59   aeFileEvent *events; /* Registered events */60   //已触发的事件61   aeFiredEvent *fired; /* Fired events */62   aeTimeEvent *timeEventHead;63   //停止标志,1表示停止64   int stop;65   //这个是处理底层特定API的数据,对于epoll来说,该结构体包含了epoll fd和epoll_event66   void *apidata; /* This is used for polling API specific data */67   //在调用processEvent前(即如果没有事件则睡眠),调用该处理函数68   aeBeforeSleepProc *beforesleep;69 } aeEventLoop;

 

2. API实现

 a. aeCreateEventLoop

  底层epoll多路复用初始化,然后存放在aeEventLoop中 void * 类型的apidata,隐藏了底层的实现。

 1 typedef struct aeApiState { 2   int epfd; 3   struct epoll_event *events; 4 } aeApiState; 5  6 //ae底层的数据创建以及初始化 7 static int aeApiCreate(aeEventLoop *eventLoop) { 8   aeApiState *state = zmalloc(sizeof(aeApiState)); 9 10   if (!state) return -1;11   //创建setsize个epoll_event12   state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);13   if (!state->events) {14     zfree(state);15     return -1;16   }17   state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */18   if (state->epfd == -1) {19     zfree(state->events);20     zfree(state);21     return -1;22   }23   eventLoop->apidata = state;24   return 0;25 }

 

 1 //创建事件循环,setsize为最大事件的的个数,对于epoll来说也是epoll_event的个数 2 aeEventLoop *aeCreateEventLoop(int setsize) { 3   aeEventLoop *eventLoop; 4   int i; 5  6   //分配该结构体的内存空间 7   if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; 8   eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); 9   eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);10   if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;11 12   //初始化13   eventLoop->setsize = setsize;    //最多setsize个事件14   eventLoop->lastTime = time(NULL); 15   eventLoop->timeEventHead = NULL;16   eventLoop->timeEventNextId = 0;17   eventLoop->stop = 0;18   eventLoop->maxfd = -1;       19   eventLoop->beforesleep = NULL;   20 21   //这一步为创建底层IO处理的数据,如epoll,创建epoll_event,和epfd22   if (aeApiCreate(eventLoop) == -1) goto err;23   /* Events with mask == AE_NONE are not set. So let's initialize the24    * vector with it. */25   for (i = 0; i < setsize; i++)26     eventLoop->events[i].mask = AE_NONE;27   return eventLoop;28 29 err:30   if (eventLoop) {31     zfree(eventLoop->events);32     zfree(eventLoop->fired);33     zfree(eventLoop);34   }35   return NULL;36 }

 

 

b. aeCreateFileEvent

    对于创建文件事件,需要传入一个该事件对应的处理程序,当事件发生时,会调用对应的回调函数。这里设计的aeFileEvent结构体就是将事件源(FD),事件,事件处理程序关联起来。
 

 1 //添加监听的事件,其中如果该fd对应的事件已经存在,则为修改合并旧的事件 2 static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { 3   aeApiState *state = eventLoop->apidata; 4   struct epoll_event ee; 5   /* If the fd was already monitored for some event, we need a MOD 6    * operation. Otherwise we need an ADD operation. */ 7   //判断fd是否已经添加了事件的监听 8   int op = eventLoop->events[fd].mask == AE_NONE ? 9       EPOLL_CTL_ADD : EPOLL_CTL_MOD;10 11   ee.events = 0;12   mask |= eventLoop->events[fd].mask; /* Merge old events */13   if (mask & AE_READABLE) ee.events |= EPOLLIN;14   if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;15   ee.data.u64 = 0; /* avoid valgrind warning */16   ee.data.fd = fd;17   if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;18   return 0;19 }20 21 //删除指定事件的监听22 static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {23   aeApiState *state = eventLoop->apidata;24   struct epoll_event ee;25   int mask = eventLoop->events[fd].mask & (~delmask);26 27   ee.events = 0;28   if (mask & AE_READABLE) ee.events |= EPOLLIN;29   if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;30   ee.data.u64 = 0; /* avoid valgrind warning */31   ee.data.fd = fd;32   if (mask != AE_NONE) {33     epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);34   } else {35     /* Note, Kernel < 2.6.9 requires a non null event pointer even for36      * EPOLL_CTL_DEL. */37     epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);38   }39 }

 

 1 //创建文件事件,并将该事件注册到eventLoop中 2 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, 3     aeFileProc *proc, void *clientData) 4 { 5   if (fd >= eventLoop->setsize) { 6     errno = ERANGE; 7     return AE_ERR; 8   } 9   //直接使用fd来获取FileEvent,来后面分离事件时也采用这种方法(直接索引)10   aeFileEvent *fe = &eventLoop->events[fd];11 12   //该该事件添加eventLoop中或者修改原来的已有的(保留旧的)13   if (aeApiAddEvent(eventLoop, fd, mask) == -1)14     return AE_ERR;15 16   fe->mask |= mask;17   //将该事件的处理程序放到对应的位置18   if (mask & AE_READABLE) fe->rfileProc = proc;19   if (mask & AE_WRITABLE) fe->wfileProc = proc;20   //设置将要传递给该事件处理程序的数据21   fe->clientData = clientData;22 23   if (fd > eventLoop->maxfd)24     eventLoop->maxfd = fd;25   return AE_OK;26 }

 

 

c. aeProcessEventsz

   这个是核心部分,通过epoll_wait将事件分离出来,从而保存到fired中,对于语句 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; 通过触发事件的 fd 在events中直接映射找到与事件关联的结构体,从而实现事件分派。Reactor的核心是实现了事件的分离分派。

如下流程图:

 

 1 static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { 2   aeApiState *state = eventLoop->apidata; 3   int retval, numevents = 0; 4  5   //等待事件产生 6   retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, 7       tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); 8   if (retval > 0) { 9     int j;10 11     numevents = retval;12     for (j = 0; j < numevents; j++) {13       int mask = 0;14       struct epoll_event *e = state->events+j;15 16       if (e->events & EPOLLIN) mask |= AE_READABLE;17       if (e->events & EPOLLOUT) mask |= AE_WRITABLE;18       if (e->events & EPOLLERR) mask |= AE_WRITABLE;19       if (e->events & EPOLLHUP) mask |= AE_WRITABLE;20       eventLoop->fired[j].fd = e->data.fd;21       eventLoop->fired[j].mask = mask;22     }23   }24   return numevents;25 }

 

 1 //事件处理程序 2 int aeProcessEvents(aeEventLoop *eventLoop, int flags) 3 { 4   int processed = 0, numevents; 5  6   /* Nothing to do? return ASAP */ 7   //若什么都没有设置,则直接返回 8   if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; 9 10   /* Note that we want call select() even if there are no11    * file events to process as long as we want to process time12    * events, in order to sleep until the next time event is ready13    * to fire. */14   if (eventLoop->maxfd != -1 ||15     ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {16     //如果有文件事件或者设置了时间事件并且没有设置DONT_WAIT标志17     int j;18     aeTimeEvent *shortest = NULL;19     struct timeval tv, *tvp;20 21     if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))22       //查找时间最早的时间事件23       shortest = aeSearchNearestTimer(eventLoop);24     if (shortest) {25       long now_sec, now_ms;26 27       /* Calculate the time missing for the nearest28        * timer to fire. */29       //获取当前时间30       aeGetTime(&now_sec, &now_ms);31       tvp = &tv;32       tvp->tv_sec = shortest->when_sec - now_sec;33       if (shortest->when_ms < now_ms) {34         tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;35         tvp->tv_sec --;36       } else {37         tvp->tv_usec = (shortest->when_ms - now_ms)*1000;38       }39 40       //这里好像有点问题,当sec小于0时表示不用等待,应该将usec也设置为0的41       //usec比较小,影响不大42       if (tvp->tv_sec < 0) tvp->tv_sec = 0;43       if (tvp->tv_usec < 0) tvp->tv_usec = 0;44 45     } else {46       /* If we have to check for events but need to return47        * ASAP because of AE_DONT_WAIT we need to set the timeout48        * to zero */49       if (flags & AE_DONT_WAIT) {50         //不等待,直接返回51         tv.tv_sec = tv.tv_usec = 0;52         tvp = &tv;53       } else {54         //如果没有时间事件则可以阻塞55         /* Otherwise we can block */56         tvp = NULL; /* wait forever */57       }58     }59 60     numevents = aeApiPoll(eventLoop, tvp);61     for (j = 0; j < numevents; j++) {62       aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];63       int mask = eventLoop->fired[j].mask;64       int fd = eventLoop->fired[j].fd;65       int rfired = 0;66 67     /* note the fe->mask & mask & ... code: maybe an already processed68        * event removed an element that fired and we still didn't69        * processed, so we check if the event is still valid. */70       if (fe->mask & mask & AE_READABLE) {71         rfired = 1;72         fe->rfileProc(eventLoop,fd,fe->clientData,mask);73       }74       if (fe->mask & mask & AE_WRITABLE) {75         if (!rfired || fe->wfileProc != fe->rfileProc)76           //这里的判断是为了防止重复调用77           fe->wfileProc(eventLoop,fd,fe->clientData,mask);78       }79       processed++;80     }81   }82   /* Check time events */83   if (flags & AE_TIME_EVENTS)84     processed += processTimeEvents(eventLoop);85 86   return processed; /* return the number of processed file/time events */87 }

 

3. 总结

  1. Reactor模式,串行处理事件

  2. 具有定时事件功能(但是不能过多,因为是使用链表实现的)

  3. 优先处理读事件