0%

Redis I/O多路复用实现

我看我知道,我听我明白,我做我学到,我教我掌握。

一、基础

      I/O多路复用是指单个线程通过记录、跟踪多个I/O流的状态来同时管理多个I/O流：线程阻塞在一次系统调用（如select、epoll_wait）上，由内核告知哪些流已经就绪，再逐一处理这些就绪的流。它是许多高并发服务的基础，各个平台都提供了对应的机制，Nginx、Redis等应用都基于它实现。

Redis ae.c 文件头注释：A simple event-driven programming library. Originally I wrote this code for the Jim’s event-loop (Jim is a Tcl interpreter) but later translated it in form of a library for easy reuse.

二、Redis源码剖析(基于6.0.7)

  1. redis6.0.7/src/ae.h,底层数据结构定义

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    /* File event structure.
     * One slot per file descriptor: the loop's `events` array is indexed
     * directly by fd (e.g. eventLoop->events[fd].mask in the epoll backend). */
    typedef struct aeFileEvent {
        int mask; /* bitmask of AE_(READABLE|WRITABLE|BARRIER); AE_NONE when the fd is not registered */
        aeFileProc *rfileProc;  /* read handler (presumably run when the fd is readable) */
        aeFileProc *wfileProc;  /* write handler (presumably run when the fd is writable) */
        void *clientData;       /* opaque user pointer, presumably passed back to the handlers */
    } aeFileEvent;

    /* Time event structure.
     * Nodes of a doubly linked list (prev/next); the loop keeps the list
     * head in aeEventLoop.timeEventHead. */
    typedef struct aeTimeEvent {
        long long id; /* time event identifier. */
        long when_sec; /* seconds */
        long when_ms; /* milliseconds */
        aeTimeProc *timeProc;                 /* callback for this timer */
        aeEventFinalizerProc *finalizerProc;  /* presumably run when the timer is removed — confirm in ae.c */
        void *clientData;                     /* opaque user pointer for the callbacks */
        struct aeTimeEvent *prev;
        struct aeTimeEvent *next;
        int refcount; /* refcount to prevent timer events from being
                       * freed in recursive time event calls. */
    } aeTimeEvent;

    /* A fired event.
     * Filled in by the backend's aeApiPoll(): one entry per ready fd,
     * holding the fd and the AE_READABLE/AE_WRITABLE bits it reported. */
    typedef struct aeFiredEvent {
        int fd;
        int mask;
    } aeFiredEvent;

    /* State of an event based program.
     * Owns the per-fd registrations, the ready list produced by the polling
     * backend, the time-event list and the backend's private state. */
    typedef struct aeEventLoop {
        int maxfd;   /* highest file descriptor currently registered */
        int setsize; /* max number of file descriptors tracked (also sizes the backend's event buffer) */
        long long timeEventNextId; /* presumably the id handed to the next created time event */
        time_t lastTime;     /* Used to detect system clock skew */
        aeFileEvent *events; /* Registered events, indexed by fd */
        aeFiredEvent *fired; /* Fired events, written by aeApiPoll() */
        aeTimeEvent *timeEventHead; /* head of the time-event linked list */
        int stop; /* presumably set to make the main loop exit — confirm in ae.c */
        void *apidata; /* This is used for polling API specific data (aeApiState for epoll) */
        aeBeforeSleepProc *beforesleep; /* hook, presumably run before blocking in the poll call */
        aeBeforeSleepProc *aftersleep;  /* hook, presumably run after the poll call returns */
        int flags; /* NOTE(review): meaning of these flags is not visible here */
    } aeEventLoop;
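  2. redis6.0.7/src/ae.c：在编译期根据平台宏（HAVE_EVPORT、HAVE_EPOLL、HAVE_KQUEUE）选择并包含一个具体的多路复用实现，各实现对外提供同名的aeApi*函数（下文的ae_epoll.c即为一例）。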

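    • ae_evport：Solaris 10 及之后系统下的实现，基于内核提供的事件端口（event ports）机制；它不同于 Solaris 更早提供的 /dev/poll 特殊设备文件接口。在 ae.c 的选择顺序中它的优先级最高。
    • ae_epoll：Linux 系统下基于 epoll 的实现。
    • ae_kqueue：macOS（OS X）、FreeBSD 等 BSD 系统下基于 kqueue 的实现。
    • ae_select：基于 select 的通用实现，所有平台都支持，在以上机制都不可用时作为兜底。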
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /* Pick exactly one multiplexing backend at compile time, in order of
     * preference: evport (Solaris), epoll (Linux), kqueue (BSD/macOS), and
     * finally the portable select() fallback. */
    #if defined(HAVE_EVPORT)
    #  include "ae_evport.c"
    #elif defined(HAVE_EPOLL)
    #  include "ae_epoll.c"
    #elif defined(HAVE_KQUEUE)
    #  include "ae_kqueue.c"
    #else
    #  include "ae_select.c"
    #endif
  3. redis6.0.7/src/ae_epoll.c为Linux下具体实现,其余如ae_kqueue.c等为各自平台下的实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    /* Private state of the epoll backend, stored in aeEventLoop.apidata. */
    typedef struct aeApiState {
        int epfd;                   /* epoll instance returned by epoll_create() */
        struct epoll_event *events; /* buffer of setsize entries filled by epoll_wait() */
    } aeApiState;

    /* Allocate the epoll backend state for `eventLoop`: an epoll_event
     * buffer with room for eventLoop->setsize entries plus the epoll
     * instance itself. On success the state is stored in
     * eventLoop->apidata and 0 is returned; on any failure everything
     * allocated so far is released and -1 is returned. */
    static int aeApiCreate(aeEventLoop *eventLoop) {
        aeApiState *st = zmalloc(sizeof(aeApiState));
        if (st == NULL) goto fail;

        st->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
        if (st->events == NULL) goto fail_state;

        /* The size argument only needs to be positive; modern kernels
         * ignore it, so 1024 is just a hint. */
        st->epfd = epoll_create(1024);
        if (st->epfd == -1) goto fail_events;

        eventLoop->apidata = st;
        return 0;

    fail_events:
        zfree(st->events);
    fail_state:
        zfree(st);
    fail:
        return -1;
    }

    /* Resize the epoll_event buffer so epoll_wait() can report up to
     * `setsize` events at once.
     *
     * Returns 0 on success and -1 if the reallocation fails. On failure the
     * previous buffer is kept (still owned by the state) instead of being
     * overwritten with NULL and leaked; callers should treat -1 as "size
     * unchanged".
     * NOTE(review): Redis's zrealloc presumably aborts on OOM, which would
     * make the NULL branch purely defensive — confirm in zmalloc.c. */
    static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
        aeApiState *state = eventLoop->apidata;
        struct epoll_event *resized =
            zrealloc(state->events, sizeof(struct epoll_event)*setsize);

        if (resized == NULL) return -1;
        state->events = resized;
        return 0;
    }

    /* Tear down the epoll backend: close the epoll descriptor, then release
     * the event buffer and the state struct itself. */
    static void aeApiFree(aeEventLoop *eventLoop) {
        aeApiState *st = eventLoop->apidata;
        int epfd = st->epfd;
        struct epoll_event *buf = st->events;

        close(epfd);
        zfree(buf);
        zfree(st);
    }

    /* Add interest in `mask` (AE_READABLE and/or AE_WRITABLE) for `fd`, on
     * top of whatever is already registered for it.
     * Returns 0 on success, -1 if epoll_ctl() fails. */
    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
        aeApiState *state = eventLoop->apidata;
        int current = eventLoop->events[fd].mask;
        int wanted = mask | current; /* merge with the events already watched */
        struct epoll_event ee = {0}; /* zero-initialize to avoid valgrind warnings */
        int op;

        /* epoll needs ADD for an fd it does not know yet, MOD otherwise. */
        if (current == AE_NONE) {
            op = EPOLL_CTL_ADD;
        } else {
            op = EPOLL_CTL_MOD;
        }

        if (wanted & AE_READABLE) ee.events |= EPOLLIN;
        if (wanted & AE_WRITABLE) ee.events |= EPOLLOUT;
        ee.data.fd = fd;

        return epoll_ctl(state->epfd, op, fd, &ee) == -1 ? -1 : 0;
    }

    /* Drop the AE_* bits in `delmask` from fd's registration. If some
     * interest remains, the epoll registration is modified; otherwise fd is
     * removed from the epoll set. epoll_ctl() failures are ignored. */
    static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
        aeApiState *state = eventLoop->apidata;
        int remaining = eventLoop->events[fd].mask & ~delmask;
        struct epoll_event ee = {0}; /* avoid valgrind warning */
        int op = (remaining == AE_NONE) ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;

        if (remaining & AE_READABLE) ee.events |= EPOLLIN;
        if (remaining & AE_WRITABLE) ee.events |= EPOLLOUT;
        ee.data.fd = fd;

        /* Kernels older than 2.6.9 require a non-NULL event pointer even
         * for EPOLL_CTL_DEL, so &ee is passed in both cases. */
        (void)epoll_ctl(state->epfd, op, fd, &ee);
    }

    /* Wait for registered descriptors to become ready and translate the
     * results into eventLoop->fired[].
     *
     * tvp == NULL blocks until an event arrives; otherwise *tvp is the
     * maximum time to wait. Returns the number of entries written to
     * fired[0..n-1]; 0 on timeout or when epoll_wait() fails (the error is
     * not propagated). */
    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
        aeApiState *state = eventLoop->apidata;
        int timeout_ms = -1; /* -1 makes epoll_wait() block with no timeout */
        int retval, numevents = 0;

        if (tvp) {
            /* Round the microseconds UP: truncating a sub-millisecond wait
             * to 0 would make epoll_wait() return immediately, and the loop
             * would spin until the timer is actually due. */
            timeout_ms = (int)(tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000);
        }

        retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,timeout_ms);
        if (retval > 0) {
            numevents = retval;
            for (int j = 0; j < numevents; j++) {
                struct epoll_event *e = state->events+j;
                int mask = 0;

                if (e->events & EPOLLIN) mask |= AE_READABLE;
                if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
                /* Errors and hang-ups are reported as both readable and
                 * writable, presumably so whichever handler is installed
                 * runs and observes the condition. */
                if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
                if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
                eventLoop->fired[j].fd = e->data.fd;
                eventLoop->fired[j].mask = mask;
            }
        }
        return numevents;
    }
  4. 扩展

    • nginx：nginx-1.17.9/src/event/modules/ngx_epoll_module.c
    • Linux 内核：linux/fs/eventpoll.c
    • php：php7.4/sapi/fpm/fpm/events/epoll.c

三、参考

  1. 参考一
  2. 参考二
  3. 参考三
  4. 参考四