欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  数据库

redis源代码分析19–主从复制

程序员文章站 2022-05-10 23:26:02
...

先说下Redis主从复制的特点。 官方文档ReplicationHowto中提到以下特点: 1. 一个master支持多个slave 2. slave可以接受其他slave的连接,作为其他slave的master,从而形成一个master-slave的多级结构 3. 复制在master端是非阻塞的,也就是master在向client

先说下Redis主从复制的特点。

官方文档ReplicationHowto中提到以下特点:
1. 一个master支持多个slave
2. slave可以接受其他slave的连接,作为其他slave的master,从而形成一个master-slave的多级结构
3. 复制在master端是非阻塞的,也就是master在向client复制时可处理其他client的命令,而slave在第一次同步时是阻塞的
4. 复制被利用来提供可扩展性,比如可以将slave端用作数据冗余,也可以将耗时的命令(比如sort)发往某些slave从而避免master的阻塞,另外也可以用slave做持久化,这只需要将master的配置文件中的save指令注释掉。

client可以在一开始时作为slave连接master,也可以在运行后发布sync命令,从而跟master建立主从关系。

接下来我们分别从slave和master的视角概述下redis的主从复制的运行机制。

如果redis作为slave运行,则全局变量server.replstate的状态有REDIS_REPL_NONE(不处于复制状态)、 REDIS_REPL_CONNECT(需要跟master建立连接)、REDIS_REPL_CONNECTED(已跟master建立连接)三种。在读入slaveof配置或者发布slaveof命令后,server.replstate取值为REDIS_REPL_CONNECT,然后在syncWithMaster跟master执行第一次同步后,取值变为REDIS_REPL_CONNECTED。

如果redis作为master运行,则对应某个客户端连接的变量slave.replstate的状态有REDIS_REPL_WAIT_BGSAVE_START(等待bgsave运行)、REDIS_REPL_WAIT_BGSAVE_END(bgsave已dump db,该bulk传输了)、REDIS_REPL_SEND_BULK(正在bulk传输)、REDIS_REPL_ONLINE(已完成开始的bulk传输,以后只需发送更新了)。对于slave客户端(发布sync命令),一开始slave.replstate都处于REDIS_REPL_WAIT_BGSAVE_START状态(后面详解syncCommand函数),然后在后台dump db后(backgroundSaveDoneHandler函数),处于REDIS_REPL_WAIT_BGSAVE_END 状态,然后updateSlavesWaitingBgsave会将状态置为REDIS_REPL_SEND_BULK,并设置write事件的函数 sendBulkToSlave,在sendBulkToSlave运行后,状态就变为REDIS_REPL_ONLINE了,此后master会一直调用replicationFeedSlaves给处于REDIS_REPL_ONLINE状态的slave发送新命令。

我们先看处于master端的redis会执行的代码。

slave端都是通过发布sync命令来跟master同步的,sync命令的处理函数syncCommand如下所示。

该函数中的注释足够明了。如果slave的client设置了REDIS_SLAVE标志,说明master已用syncCommand处理了该 slave。如果master还有对这个client的reply没有发送,则返回出错信息。此后若server.bgsavechildpid != -1且有slave处于REDIS_REPL_WAIT_BGSAVE_END状态,则说明dump db的后台进程刚结束,此时新的slave可直接用保存的rdb进行bulk传输(注意复制reply参数,因为master是非阻塞的,此时可能执行了一些命令,call函数会调用replicationFeedSlaves函数将命令参数保存到slave的reply参数中)。如果没有slave处于REDIS_REPL_WAIT_BGSAVE_END状态,但server.bgsavechildpid != -1,则说明bgsave后台进程没有运行完,需要等待其结束(bgsave后台进程结束后会处理等待的slave)。如果server.bgsavechildpid 等于 -1,则需要启动一个后台进程来dump db了。最后将当前client加到master的slaves链表中。

static void syncCommand(redisClient *c) {
    /* ignore SYNC if aleady slave or in monitor mode */
    if (c->flags & REDIS_SLAVE) return;
    /* SYNC can't be issued when the server has pending data to send to
     * the client about already issued commands. We need a fresh reply
     * buffer registering the differences between the BGSAVE and the current
     * dataset, so that we can copy to other slaves if needed. */
    if (listLength(c->reply) != 0) {
        addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
        return;
    }
    redisLog(REDIS_NOTICE,"Slave ask for synchronization");
    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.bgsavechildpid != -1) {
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save */
        redisClient *slave;
        listNode *ln;
        listIter li;
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }
        if (ln) {
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */
            listRelease(c->reply);
            c->reply = listDup(slave->reply);
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {
        /* Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
            return;
        }
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
    }
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    c->slaveseldb = 0;
    listAddNodeTail(server.slaves,c);
    return;
}

此后slave无论处于REDIS_REPL_WAIT_BGSAVE_START还是REDIS_REPL_WAIT_BGSAVE_END,都只能等 dump db的后台进程运行结束后才会被处理。该进程结束后会执行backgroundSaveDoneHandler函数,而该函数调用 updateSlavesWaitingBgsave来处理slaves。

updateSlavesWaitingBgsave和syncCommand一样,涉及到slave的几个状态变换。对于等待dump db的slave,master都会将其放入server.slaves 链表中。此时,若slave->replstate == REDIS_REPL_WAIT_BGSAVE_START,说明当前dump db不是该slave需要的,redis需要重新启动后台进程来dump db。若slave->replstate == REDIS_REPL_WAIT_BGSAVE_END,则说明当前dump db正是该slave所需要的,此时设置slave的write事件的处理函数sendBulkToSlave。

static void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            startbgsave = 1;
            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
            struct redis_stat buf;
            if (bgsaveerr != REDIS_OK) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                continue;
            }
            if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                continue;
            }
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = REDIS_REPL_SEND_BULK;
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }
    if (startbgsave) {
        if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
            listIter li;
            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;
                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                    freeClient(slave);
            }
        }
    }
}

sendBulkToSlave 的逻辑不复杂。它根据slave->repldbfd指向的db,先从dump后的rdb文件中读入db数据,然后发送。发送完后会删除write 事件,设置slave->replstate状态为REDIS_REPL_ONLINE,此后master就会在收到命令后调用call函数,然后使用replicationFeedSlaves同步更新该slave了。replicationFeedSlaves也是遍历slave链表,对处于REDIS_REPL_ONLINE状态的slave,发送当前命令及其参数。

 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *slave = privdata;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    char buf[REDIS_IOBUF_LEN];
    ssize_t nwritten, buflen;
    if (slave->repldboff == 0) {
        /* Write the bulk write count before to transfer the DB. In theory here
         * we don't know how much room there is in the output buffer of the
         * socket, but in pratice SO_SNDLOWAT (the minimum count for output
         * operations) will never be smaller than the few bytes we need. */
        sds bulkcount;
        bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
            slave->repldbsize);
        if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
        {
            sdsfree(bulkcount);
            freeClient(slave);
            return;
        }
        sdsfree(bulkcount);
    }
    lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
    buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
    if (buflen repldboff += nwritten;
    if (slave->repldboff == slave->repldbsize) {
        close(slave->repldbfd);
        slave->repldbfd = -1;
        aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
        slave->replstate = REDIS_REPL_ONLINE;
        if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
            sendReplyToClient, slave) == AE_ERR) {
            freeClient(slave);
            return;
        }
        addReplySds(slave,sdsempty());
        redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
    }
}

接下来我们看看redis作为slave是如何运行的。

redis 作为slave(当然也可以使用普通的client作为slave端,这样则跟具体client的实现有关了)时,需要在配置文件中指明master的位置,在loadServerConfig读取配置参数时,会将server.replstate设置为REDIS_REPL_CONNECT状态。处于此状态的redis需要运行到serverCron后才能使用syncWithMaster来和master进行初始同步。查看syncWithMaster的代码可知,其实也向master发布sync命令来建立主从关系的,另外,该函数接收、发送数据时使用的是syncRead、syncWrite函数,而这些函数是阻塞的,因此,redis作为slave运行时,建立最初的主从关系时也是阻塞的。

 /* Check if we should connect to a MASTER */
    if (server.replstate == REDIS_REPL_CONNECT && !(loops % 10)) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER...");
        if (syncWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER  SLAVE sync succeeded");
            if (server.appendonly) rewriteAppendOnlyFileBackground();
        }
    }

另外跟主从复制有关的一个命令就是slaveof命令。此命令是redis主从状态的转换函数,通过前面的分析可知,这只需要更改几个状态即可。

static void slaveofCommand(redisClient *c) {
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            sdsfree(server.masterhost);
            server.masterhost = NULL;
            if (server.master) freeClient(server.master);
            server.replstate = REDIS_REPL_NONE;
            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
        }
    } else {
        sdsfree(server.masterhost);
        server.masterhost = sdsdup(c->argv[1]->ptr);
        server.masterport = atoi(c->argv[2]->ptr);
        if (server.master) freeClient(server.master);
        server.replstate = REDIS_REPL_CONNECT;
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
            server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}