日期:2014-05-16 浏览次数:20547 次
个人理解,Redis将各种任务以事件的方式处理。在Redis中,是以单个进程进行事件轮询:(http://blog.csdn.net/ordeder/article/details/12791359)。事件就类似于linux内核的调度schedule()。而与系统不同之处是:OS调度的基本单位是进程,而Redis的“调度单位”是事件(不断触发已发生的事件的hander)。Redis的任务的思想是一切皆“事件”。
比较典型的时间事件:serverCorn,服务器定时触发该事件进行检查,主要判定server与client的连接、还有hash是否扩容,是否进行rehash等等维护性的工作。这些操作有点类似守护进程。(http://blog.csdn.net/ordeder/article/details/12836017)
同样,在Server端和Client的通行是以文件事件进行管理的。Redis在initServer中就创建用于监听client连接请求的ipfd,并在该文件符上建立文件事件,用于处理客户端的连接请求。而且,事件的处理hander是创建cfd(accept),并在cfd上同样建立文件事件,用于读取客户端的具体命令。详见下图。
图.1. Redis中Server和User建立链接(图中的client是服务器端用于描述与客户端的链接相关的信息)
redisClient描述了服务器的状态信息,redisClient记录了与客户端建立交互的信息
-----------------------服务器数据结构-------------------------------
/* Global server state structure */
struct redisServer {
...
int ipfd; //server listen fd
...
list *clients; //client list
...
aeEventLoop *el; //事件轮询
...
};
typedef struct redisClient {
int fd; //与客户端通信的fd
...
sds querybuf; //客户端请求的串
...
} redisClient;
图2简单描绘了server和client在建立链接过程中所涉及的操作。红色部分表示建立链接的过程。黑色部分表示在事件创建过程中所涉及的操作。而蓝色部分表示创建的相关事件。
图.2. Redis Server&Client链接的建立时相关Event的建立(图中的user代表用户端,而client是服务器端用于描述与客户端的链接相关的信息)
1.服务器初始化中,建立监听事件
void initServer() {
...
//Server监听套接字的创建
if (server.port != 0)
server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr);
...
/*为Server的监听套接字ipfd建立文件事件,该事件的处理函数为(acceptTcpHandler)
创建用于与客户机子建立连接的clfd,以及建立对应记录客户端状态的client数据对象*/
if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
...
}
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
//ipfd建立一个FileEvent
if (fd >= AE_SETSIZE) return AE_ERR;
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
//设置为acceptTcpHandler
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
aeSearchNearestTimer(eventLoop);
...set tvp
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
...
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
//acceptTcpHandler
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
...
}
...
processTimeEvents(eventLoop);
}
/*作为ipfd监听事件的处理函数*/
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
char cip[128];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
//建立cfd,cfd负责与客户端机子进行通信
cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
if (cfd == AE_ERR) {
redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
//建立记录client状态的数据对象
acceptCommonHandler(cfd);
}
//服务器端accept,建立clfd负责与客