twitter公司redis&memcached中间件twemproxy源码分析(一)

2019/10/17 posted in  源码分析

twemproxy是redis和memcached连接池中间件

github项目地址:https://github.com/twitter/twemproxy

项目简介参考:https://github.com/twitter/twemproxy#features

文档参考:https://github.com/twitter/twemproxy/blob/master/notes/recommendation.md

核心流程

主流程就是启动了一个事件循环,所有的逻辑通过事件出发调用回调函数执行

消息流转流程

消息会在三种角色的fd中进行流转:client, server, proxy

角色 作用
client 代表应用程序端接入twemproxy的fd
proxy 代表twemproxy服务器的fd,等待应用程序端接入
server 代表twemproxy链接redis/memcache的fd,proxy fd处理程序会把从client fd读到的数据转到server fd中

核心流程如下:

事件系统:epoll/kqueue/evport封装

一般linux使用epoll, mac使用kqueue,Sun Solaris使用evport

twemproxy为了统一底层api调用,对上面的三种事件处理api进行了接口的统一: event/nc_event.h,核心api参考:

函数 作用
event_base_create() 创建事件循环管理fd
event_add_in() 将一个fd的读事件纳入事件管理器的管理中
event_add_out() 将一个fd的读写事件纳入事件管理器的管理中
event_add_conn() 将一个fd的读写事件纳入事件管理器的管理中
event_wait() 从事件管理器中获取准备好读/写的fd列表,并调用cb进行处理

核心结构体event_base参考: src/event/nc_event.h

struct event_base {
    int                ep;      /* epoll descriptor */
    struct epoll_event *event;  /* event[] - events that were triggered
    int                nevent;  /* # event */
    event_cb_t         cb;      /* event callback */
};
字段 作用
ep 事件管理器fd
struct epoll_event 这个事件管理器可以处理的事件类型(读、写、hup信号)
int nevent 最大取多少个事件
event_cb_t cb 事件处理回调函数

twemproxy中大量使用事件回调来驱动数据流转,上面的event_cb_t就是一个回调函数

typedef int (*event_cb_t)(void *, uint32_t);

核心数据结构和算法

array 变长数组

struct array {
    uint32_t nelem;  /* # element */
    void     *elem;  /* element */
    size_t   size;   /* element size */
    uint32_t nalloc; /* # allocated element */
};

通过调整nelem和*elem指针内容实现动态数组

string 变长字符串

struct string {
    uint32_t len;   /* string length */
    uint8_t  *data; /* string data */
};

红黑树

struct rbnode {
    struct rbnode *left;     /* left link */
    struct rbnode *right;    /* right link */
    struct rbnode *parent;   /* parent link */
    int64_t       key;       /* key for ordering */
    void          *data;     /* opaque data */
    uint8_t       color;     /* red | black */
};

一致性hash

参考:src/hashkit/nc_ketama.c

队列

twemproxy使用的队列封装支持数种不同类型的队列

参考:src/nc_queue.h,这个队列文件来自freebsd linx内核:https://github.com/freebsd/freebsd/blob/master/sys/sys/queue.h

配置文件解析

twemproxy使用yaml格式的配置文件,使用libyaml库解析这个yaml格式的配置文件,参考:https://github.com/yaml/libyaml

twemproxy配置文件解析逻辑参考:src/nc_conf.h src/nc_conf.c

协议抽象接口

twemproxy支持memcache, redis两种协议,抽象成接口可以把这个协议从twemproxy主流程中解耦、这样也利于添加新的协议支持

memcached

void memcache_parse_req(struct msg *r);
void memcache_parse_rsp(struct msg *r);
bool memcache_failure(struct msg *r);
void memcache_pre_coalesce(struct msg *r);
void memcache_post_coalesce(struct msg *r);
rstatus_t memcache_add_auth(struct context *ctx, struct conn *c_conn, struct conn *s_conn);
rstatus_t memcache_fragment(struct msg *r, uint32_t ncontinuum, struct msg_tqh *frag_msgq);
rstatus_t memcache_reply(struct msg *r);
void memcache_post_connect(struct context *ctx, struct conn *conn, struct server *server);
void memcache_swallow_msg(struct conn *conn, struct msg *pmsg, struct msg *msg);

redis

void redis_parse_req(struct msg *r);
void redis_parse_rsp(struct msg *r);
bool redis_failure(struct msg *r);
void redis_pre_coalesce(struct msg *r);
void redis_post_coalesce(struct msg *r);
rstatus_t redis_add_auth(struct context *ctx, struct conn *c_conn, struct conn *s_conn);
rstatus_t redis_fragment(struct msg *r, uint32_t ncontinuum, struct msg_tqh *frag_msgq);
rstatus_t redis_reply(struct msg *r);
void redis_post_connect(struct context *ctx, struct conn *conn, struct server *server);
void redis_swallow_msg(struct conn *conn, struct msg *pmsg, struct msg *msg);

可以看到redis和memcached协议被统一封装成了两组对等的接口,对应实现参考:nc_memcache.c nc_redis.c

监控系统

twemproxy使用单独的线程处理监控数据获取请求,参考src/nc_stats.c::stats_start_aggregator

static rstatus_t
stats_start_aggregator(struct stats *st)
{
    rstatus_t status;
    // error handle...
    status = stats_listen(st);
    // error handle...

    status = pthread_create(&st->tid, NULL, stats_loop, st);
    // error handle...
    return NC_OK;
}

这个监控线程和主线程间使用结构体共享监控数据,主线程往这个结构体中写入实时监控数据,监控线程接到客户端获取监控数据的请求后,从这个结构体中读取监控数据输出给客户端,核心结构体参考:src/nc_stats.h

typedef enum stats_type {
    // ...
} stats_type_t;

struct stats_metric {
    // ...
};

struct stats_server {
    // ...
};

struct stats_pool {
    // ...
};

struct stats_buffer {
    // ...
};

struct stats {
    // ...
};

使用命令curl -s 127.0.0.1:22222 | jq .可以查看实时的监控信息

信号处理

参考:src/nc_signal.h src/nc_signal.c

static struct signal signals[] = {
    { SIGUSR1, "SIGUSR1", 0,                 signal_handler },
    { SIGUSR2, "SIGUSR2", 0,                 signal_handler },
    { SIGTTIN, "SIGTTIN", 0,                 signal_handler },
    { SIGTTOU, "SIGTTOU", 0,                 signal_handler },
    { SIGHUP,  "SIGHUP",  0,                 signal_handler },
    { SIGINT,  "SIGINT",  0,                 signal_handler },
    { SIGSEGV, "SIGSEGV", (int)SA_RESETHAND, signal_handler },
    { SIGPIPE, "SIGPIPE", 0,                 SIG_IGN },
    { 0,        NULL,     0,                 NULL }
};

可以看到twemproxy对多个信号进行回调:signal_handler

void
signal_handler(int signo)
{
    //...

    switch (signo) {
    case SIGUSR1:
        break;

    case SIGUSR2:
        break;

    case SIGTTIN:
        actionstr = ", up logging level";
        action = log_level_up;
        break;

    case SIGTTOU:
        actionstr = ", down logging level";
        action = log_level_down;
        break;

    case SIGHUP:
        actionstr = ", reopening log file";
        action = log_reopen;
        break;

    case SIGINT:
        done = true;
        actionstr = ", exiting";
        break;

    case SIGSEGV:
        log_stacktrace();
        actionstr = ", core dumping";
        raise(SIGSEGV);
        break;

    default:
        NOT_REACHED();
    }

    log_safe("signal %d (%s) received%s", signo, sig->signame, actionstr);

    if (action != NULL) {
        action();
    }

    if (done) {
        exit(1);
    }
}

可以看到使用SIGTTIN/信号增加日志输出级别,SIGTTOU降低日志输入级别, SIGHUP重新打开日志文件,SIGINT退出程序,SIGSEGV输出内存core dump文件

一些注意的点

本文基于twemproxy 0.4.1写成

尽管codis目前已经成为redis中间件的事实选择,但是twemproxy作为第一款memcache&redis中间件,仍具有比较大学习价值,并且支持memcache

参考资料

  1. http://man7.org/linux/man-pages/man2/epoll_wait.2.html
  2. https://blog.just4fun.site/command-tool-jq.html
  3. https://unix.stackexchange.com/questions/196549/hide-curl-output/362760
  4. https://www.jianshu.com/p/83ac498e1b2c