
Redis集群架构与高可用方案深度解析

深入探讨Redis Cluster分布式架构、Redis Sentinel哨兵模式和主从复制机制。掌握高可用部署策略、故障转移原理和集群扩容技术,构建生产级Redis高可用方案。

Gerrad Zhang
武汉,中国
3 min read

🤔 问题背景与技术演进

我们要解决什么问题?

单机Redis在生产环境中面临多重挑战:

  • 单点故障风险:Redis服务器宕机导致整个缓存系统不可用
  • 内存容量限制:单机内存有限,无法存储海量数据
  • 性能瓶颈:单机QPS有上限,无法支撑高并发访问
  • 数据安全性:内存数据易丢失,需要可靠的备份机制
  • 扩展性差:业务增长时难以动态扩容
// 单机Redis的问题示例
public class SingleRedisProblems {
    
    private static final Logger log = LoggerFactory.getLogger(SingleRedisProblems.class);
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    public void demonstrateSinglePointFailure() {
        try {
            // 大量业务依赖Redis
            String userSession = (String) redisTemplate.opsForValue().get("session:user123");
            List<Object> shoppingCart = redisTemplate.opsForList().range("cart:user123", 0, -1);
            String userPreferences = (String) redisTemplate.opsForValue().get("prefs:user123");
            
            // 如果Redis服务器突然宕机...
            // 所有依赖Redis的功能都会失效:
            // - 用户需要重新登录
            // - 购物车数据丢失
            // - 个性化推荐失效
            // - 系统响应时间急剧增加
            
        } catch (Exception e) {
            log.error("Redis连接失败,系统功能受影响", e);
            // 业务降级处理,但用户体验严重受损
        }
    }
    
    public void demonstrateCapacityLimitation() {
        /*
         * 内存容量问题:
         * - 单机内存:128GB
         * - 业务数据:500GB
         * - 结果:无法全部缓存,缓存命中率低
         * 
         * 性能瓶颈:
         * - 单机QPS:10万
         * - 业务需求:50万QPS
         * - 结果:响应时间增加,用户体验差
         */
    }
}

没有这个技术时是怎么做的?

在Redis集群技术出现之前,开发者主要通过以下方式解决高可用问题:

1. 主从复制 + 手动切换

# 手动主从切换
redis-cli -h slave-server SLAVEOF NO ONE
# 应用程序手动切换连接地址
  • 问题:需要人工干预,切换时间长,容易出错

2. 客户端分片

// 客户端哈希取模分片(简单示例;一致性哈希方案见下文示意)
public class ClientSharding {
    private List<JedisPool> redisPools;
    
    public Jedis getRedisConnection(String key) {
        int hash = key.hashCode();
        int index = Math.abs(hash) % redisPools.size();
        return redisPools.get(index).getResource();
    }
}
  • 问题:扩容困难,数据迁移复杂,客户端逻辑复杂
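
相比简单取模,一致性哈希在节点增减时只需迁移哈希环上相邻区间的数据,能明显缓解扩容时的迁移压力。下面是一个基于TreeMap哈希环的示意实现(虚拟节点数量等参数为假设值,仅用于说明思路,并非当年某个具体客户端的实现):

// 客户端一致性哈希分片示意(依赖java.util.TreeMap与java.util.zip.CRC32)
public class ConsistentHashSharding {
    
    private static final int VIRTUAL_NODES = 160; // 每个物理节点的虚拟节点数(假设值)
    private final TreeMap<Long, JedisPool> ring = new TreeMap<>();
    
    public ConsistentHashSharding(Map<String, JedisPool> nodes) {
        // 为每个物理节点生成若干虚拟节点,均匀散布在哈希环上
        nodes.forEach((name, pool) -> {
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                ring.put(hash(name + "#" + i), pool);
            }
        });
    }
    
    public JedisPool route(String key) {
        // 顺时针找到第一个哈希值不小于key哈希值的虚拟节点,找不到则回绕到环首
        Map.Entry<Long, JedisPool> entry = ring.ceilingEntry(hash(key));
        return entry != null ? entry.getValue() : ring.firstEntry().getValue();
    }
    
    private long hash(String s) {
        // 示例使用CRC32,生产中常用MurmurHash等分布更均匀的算法
        CRC32 crc32 = new CRC32();
        crc32.update(s.getBytes(StandardCharsets.UTF_8));
        return crc32.getValue();
    }
}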

3. 代理层方案

  • 使用Twemproxy、Codis等代理
  • 问题:增加网络延迟,代理成为新的单点

4. 应用层多写

// 多个Redis实例同时写入
public void writeToMultipleRedis(String key, String value) {
    redis1.set(key, value);
    redis2.set(key, value);
    redis3.set(key, value);
}
  • 问题:数据一致性难以保证,性能开销大

技术演进的历史脉络

2010年前后: Redis早期版本(1.x~2.0)提供主从复制

  • 支持一主多从架构
  • 数据自动同步到从节点
  • 手动故障切换

2012年: Redis 2.6引入Sentinel哨兵

  • 自动故障检测和切换
  • 主从切换自动化
  • 配置管理和通知

2015年: Redis 3.0引入Cluster集群

  • 原生分布式支持
  • 自动数据分片(16384个slot)
  • 无中心化架构

2016年: Redis 3.2集群功能完善

  • 集群重平衡优化
  • 故障转移改进
  • 管理工具完善

2018年: Redis 5.0集群增强

  • Stream数据类型集群支持
  • 集群管理命令改进
  • 性能优化

2020年至今: 云原生集群

  • Kubernetes集群部署
  • 容器化高可用方案
  • 多云架构支持

🎯 核心概念与原理

Redis主从复制机制

主从复制是Redis高可用的基础,实现数据备份和读写分离:

/**
 * Redis主从复制架构分析
 */
public class RedisMasterSlaveAnalysis {
    
    /**
     * 主从复制原理
     */
    public void analyzeMasterSlaveReplication() {
        /*
         * 复制过程:
         * 1. 从节点连接主节点
         * 2. 从节点发送PSYNC命令请求同步
         * 3. 主节点执行BGSAVE生成RDB快照
         * 4. 主节点将RDB文件发送给从节点
         * 5. 从节点加载RDB文件恢复数据
         * 6. 主节点将缓冲区的写命令发送给从节点
         * 7. 后续写命令实时同步
         * 
         * 同步类型:
         * - 全量同步:首次连接或长时间断开后重连
         * - 增量同步:正常运行时的实时同步
         * - 部分同步:短暂断开后的快速恢复
         */
    }
    
    /**
     * 主从复制配置
     */
    public void configureMasterSlave() {
        /*
         * 主节点配置(redis-master.conf):
         * bind 0.0.0.0
         * port 6379
         * requirepass master_password
         * 
         * # 主从复制安全
         * masterauth master_password
         * 
         * # 复制相关配置
         * repl-diskless-sync no          # 使用磁盘复制
         * repl-diskless-sync-delay 5     # 延迟5秒开始复制
         * 
         * 从节点配置(redis-slave.conf):
         * bind 0.0.0.0
         * port 6380
         * 
         * # 指定主节点
         * replicaof 192.168.1.100 6379
         * masterauth master_password
         * 
         * # 从节点只读
         * replica-read-only yes
         * 
         * # 复制超时配置
         * repl-timeout 60
         */
    }
    
    /**
     * 读写分离实现
     */
    public void implementReadWriteSeparation() {
        /*
         * Spring Boot配置读写分离:
         */
    }
}

@Configuration
public class RedisReadWriteConfig {
    
    @Bean
    @Primary
    public LettuceConnectionFactory masterConnectionFactory() {
        RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
        config.setHostName("redis-master.example.com");
        config.setPort(6379);
        config.setPassword("master_password");
        return new LettuceConnectionFactory(config);
    }
    
    @Bean
    public LettuceConnectionFactory slaveConnectionFactory() {
        RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
        config.setHostName("redis-slave.example.com");
        config.setPort(6380);
        config.setPassword("master_password");
        return new LettuceConnectionFactory(config);
    }
    
    @Bean
    public RedisTemplate<String, Object> masterRedisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(masterConnectionFactory());
        // 序列化配置...
        return template;
    }
    
    @Bean
    public RedisTemplate<String, Object> slaveRedisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(slaveConnectionFactory());
        // 序列化配置...
        return template;
    }
}

Redis Sentinel哨兵模式

Sentinel提供自动故障检测和切换功能:

/**
 * Redis Sentinel架构分析
 */
public class RedisSentinelAnalysis {
    
    /**
     * Sentinel工作原理
     */
    public void analyzeSentinelMechanism() {
        /*
         * Sentinel功能:
         * 1. 监控:持续监控主从节点健康状态
         * 2. 通知:向管理员发送故障通知
         * 3. 故障转移:自动将从节点提升为主节点
         * 4. 配置管理:为客户端提供当前主节点信息
         * 
         * 工作流程:
         * 1. Sentinel定期向主从节点发送PING命令
         * 2. 如果主节点在指定时间内未响应,标记为主观下线
         * 3. 询问其他Sentinel,确认主节点客观下线
         * 4. 选举Leader Sentinel执行故障转移
         * 5. 选择合适的从节点提升为新主节点
         * 6. 更新其他从节点的复制目标
         * 7. 通知客户端新的主节点信息
         */
    }
    
    /**
     * Sentinel配置
     */
    public void configureSentinel() {
        /*
         * sentinel.conf配置:
         * 
         * # Sentinel端口
         * port 26379
         * 
         * # 监控主节点
         * sentinel monitor mymaster 192.168.1.100 6379 2
         * 
         * # 认证密码
         * sentinel auth-pass mymaster master_password
         * 
         * # 主观下线时间(毫秒)
         * sentinel down-after-milliseconds mymaster 30000
         * 
         * # 故障转移超时时间
         * sentinel failover-timeout mymaster 180000
         * 
         * # 同时进行复制的从节点数量
         * sentinel parallel-syncs mymaster 1
         * 
         * # 通知脚本
         * sentinel notification-script mymaster /var/redis/notify.sh
         * 
         * # 客户端重新配置脚本
         * sentinel client-reconfig-script mymaster /var/redis/reconfig.sh
         */
    }
}
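
Sentinel的"配置管理"意味着客户端不应写死主节点地址,而是向Sentinel询问当前主节点。下面是一个使用Lettuce(5.2+的MasterReplica API)通过Sentinel建立连接的最小示意,地址与master名称沿用上文示例值:

// 通过Sentinel发现当前主节点并建立读写分离连接(Lettuce示意代码)
RedisClient client = RedisClient.create();
RedisURI sentinelUri = RedisURI.Builder
    .sentinel("192.168.1.101", 26379, "mymaster")   // Sentinel地址 + 被监控的master名称
    .build();                                        // 如有密码,可在URI上通过withPassword配置

StatefulRedisMasterReplicaConnection<String, String> connection =
    MasterReplica.connect(client, StringCodec.UTF8, sentinelUri);
connection.setReadFrom(ReadFrom.REPLICA_PREFERRED);  // 读请求优先路由到从节点

RedisCommands<String, String> commands = connection.sync();
commands.set("ha:test", "ok");  // 写命令自动路由到Sentinel上报的当前主节点

故障转移完成后,该连接会根据Sentinel发布的拓扑变化自动切换到新的主节点,应用代码无需改动。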

Redis Cluster集群架构

Redis Cluster提供原生的分布式解决方案:

/**
 * Redis Cluster架构分析
 */
public class RedisClusterAnalysis {
    
    /**
     * Cluster分片原理
     */
    public void analyzeClusterSharding() {
        /*
         * 数据分片机制:
         * 1. 16384个哈希槽(Hash Slot)
         * 2. 每个Key通过CRC16算法计算槽位
         * 3. 每个节点负责一部分槽位
         * 4. 客户端根据槽位路由请求
         * 
         * 槽位计算:
         * slot = CRC16(key) % 16384
         * 
         * 集群拓扑:
         * - 最少3个主节点
         * - 每个主节点可配置多个从节点
         * - 节点间通过Gossip协议通信
         * - 无中心化架构,任意节点可接收请求
         */
    }
    
    /**
     * 故障转移机制
     */
    public void analyzeFailoverMechanism() {
        /*
         * 故障检测:
         * 1. 节点间定期发送PING消息
         * 2. 超时未响应标记为PFAIL(可能失败)
         * 3. 多数节点确认后标记为FAIL(确认失败)
         * 
         * 自动故障转移:
         * 1. 从节点检测到主节点失败
         * 2. 从节点发起选举成为新主节点
         * 3. 新主节点接管原主节点的槽位
         * 4. 更新集群配置并广播
         * 
         * 选举算法:
         * - 基于Raft算法的简化版本
         * - 复制偏移量大的从节点优先
         * - 节点ID小的从节点优先(tie-breaker)
         */
    }
    
    /**
     * 集群扩容机制
     */
    public void analyzeClusterScaling() {
        /*
         * 扩容流程:
         * 1. 添加新节点到集群
         * 2. 分配槽位给新节点
         * 3. 迁移数据到新节点
         * 4. 更新集群拓扑
         * 
         * 数据迁移:
         * - 逐个槽位迁移
         * - 源节点标记槽位为迁移中
         * - 逐个Key迁移到目标节点
         * - 迁移完成后更新槽位归属
         * 
         * 缩容流程:
         * 1. 将节点的槽位迁移到其他节点
         * 2. 确认节点无槽位后移除
         * 3. 更新集群配置
         */
    }
}
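
槽位计算完全可以在客户端侧复现:Lettuce提供的SlotHash工具类等价于CRC16(key) mod 16384,并且同样支持{...}哈希标签。下面的小例子(假设项目已引入Lettuce依赖)演示带相同哈希标签的key会落在同一槽位:

// 槽位计算示意:io.lettuce.core.cluster.SlotHash
public class SlotDemo {
    
    public static void main(String[] args) {
        // 普通key:对整个key做CRC16后取模16384
        System.out.println(SlotHash.getSlot("user:1001"));       // 输出0~16383之间的某个槽位
        
        // 带哈希标签的key:只对{}内的内容计算槽位,因此两个key落在同一槽位
        int slotCart  = SlotHash.getSlot("{user:1001}:cart");
        int slotOrder = SlotHash.getSlot("{user:1001}:orders");
        System.out.println(slotCart == slotOrder);                // true,可保证MGET等多键操作不跨槽
    }
}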

🔧 实现原理与源码分析

主从复制源码实现

主从复制的核心实现位于replication.c文件中

// Redis主从复制核心代码分析
/**
 * PSYNC命令实现 - 部分重同步
 */
void syncCommand(client *c) {
    /* PSYNC命令格式: PSYNC <repl_id> <offset> */
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        if (c->argc != 3) {
            addReplyError(c, "Wrong number of arguments for PSYNC");
            return;
        }
        
        // 获取复制ID和偏移量
        char *master_replid = c->argv[1]->ptr;
        char *offset = c->argv[2]->ptr;
        long long psync_offset;
        
        if (getLongLongFromObjectOrReply(c, c->argv[2], &psync_offset, NULL) != C_OK)
            return;
        
        // 判断是否可以进行部分重同步
        if (masterTryPartialResynchronization(c, psync_offset) == C_OK) {
            server.stat_sync_partial_ok++;
            return; /* 部分重同步成功 */
        } else {
            server.stat_sync_full++;
            /* 执行全量重同步 */
        }
    }
    
    // 全量重同步流程
    c->flags |= CLIENT_SLAVE;
    c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
    
    // 如果没有正在进行的BGSAVE,启动新的BGSAVE
    if (server.rdb_child_pid == -1) {
        if (startBgsaveForReplication(c->slave_req) != C_OK) {
            addReplyError(c, "Unable to perform background save");
            return;
        }
    }
    
    // 将客户端添加到等待列表
    listAddNodeTail(server.slaves, c);
}

/**
 * 主节点向从节点发送RDB文件
 */
void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
    listNode *ln;
    listIter li;
    
    listRewind(server.slaves, &li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;
        
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
            slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
        } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
            struct stat buf;
            
            if (bgsaveerr != C_OK) {
                freeClient(slave);
                continue;
            }
            
            // 打开RDB文件准备发送
            if ((slave->repldbfd = open(server.rdb_filename, O_RDONLY)) == -1 ||
                fstat(slave->repldbfd, &buf) == -1) {
                freeClient(slave);
                continue;
            }
            
            // 记录RDB文件大小和发送进度
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = SLAVE_STATE_SEND_BULK;
            slave->replpreamble = sdscatprintf(sdsempty(),
                "$%lld\r\n", (unsigned long long) slave->repldbsize);
            
            // 注册写事件,准备发送RDB数据
            aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave);
        }
    }
}

/**
 * 发送RDB数据到从节点
 */
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *slave = privdata;
    char buf[PROTO_IOBUF_LEN];
    ssize_t nwritten, buflen;
    
    // 先发送RDB文件大小信息
    if (slave->replpreamble) {
        nwritten = connWrite(slave->conn, slave->replpreamble, sdslen(slave->replpreamble));
        if (nwritten == -1) {
            freeClient(slave);
            return;
        }
        slave->replpreamble = sdsrange(slave->replpreamble, nwritten, -1);
        if (sdslen(slave->replpreamble) == 0) {
            sdsfree(slave->replpreamble);
            slave->replpreamble = NULL;
        } else {
            return;
        }
    }
    
    // 发送RDB文件内容
    lseek(slave->repldbfd, slave->repldboff, SEEK_SET);
    buflen = read(slave->repldbfd, buf, PROTO_IOBUF_LEN);
    if (buflen <= 0) {
        if (buflen == 0) {
            // RDB文件发送完成
            close(slave->repldbfd);
            slave->repldbfd = -1;
            aeDeleteFileEvent(server.el, slave->fd, AE_WRITABLE);
            slave->replstate = SLAVE_STATE_ONLINE;
            
            // 发送缓冲区中的写命令
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
                sendReplyToClient, slave) == AE_ERR) {
                freeClient(slave);
                return;
            }
            slave->repldboff = 0;
        } else {
            freeClient(slave);
            return;
        }
        return;
    }
    
    // 写入网络缓冲区
    if ((nwritten = connWrite(slave->conn, buf, buflen)) == -1) {
        freeClient(slave);
        return;
    }
    slave->repldboff += nwritten;
    
    // 更新网络统计
    server.stat_net_output_bytes += nwritten;
}

/**
 * 实时同步写命令到从节点
 */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j, len;
    char llstr[LONG_STR_SIZE];
    
    // 如果没有从节点,直接返回
    if (listLength(slaves) == 0) return;
    
    // 将命令按Redis协议格式转发给各个从节点
    
    // 如果需要切换数据库
    if (server.slaveseldb != dictid) {
        robj *selectcmd;
        
        if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
            selectcmd = shared.select[dictid];
        } else {
            int dictid_len = ll2string(llstr, sizeof(llstr), dictid);
            selectcmd = createObject(OBJ_STRING,
                sdscatprintf(sdsempty(), "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                    dictid_len, llstr));
        }
        
        // 发送SELECT命令到所有从节点
        listRewind(slaves, &li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
            addReply(slave, selectcmd);
        }
        
        if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
            decrRefCount(selectcmd);
    }
    server.slaveseldb = dictid;
    
    // 发送实际的写命令
    listRewind(slaves, &li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;
        
        // 只向在线的从节点发送命令
        if (slave->replstate != SLAVE_STATE_ONLINE) continue;
        
        // 添加命令到从节点的输出缓冲区
        addReplyArrayLen(slave, argc);
        for (j = 0; j < argc; j++) {
            addReplyBulk(slave, argv[j]);
        }
    }
}
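
与上面源码中的复制偏移量相对应,应用侧可以通过INFO replication中的master_repl_offset与slave0等字段粗略估计主从延迟。下面是一个基于Spring Data Redis的示意方法(字段解析为简化写法,未处理连接关闭等细节):

// 主从复制延迟估算示意:对比主节点偏移量与从节点上报的offset
public long estimateReplicationLag(RedisTemplate<String, Object> masterTemplate) {
    Properties info = masterTemplate.getConnectionFactory()
        .getConnection().info("replication");
    
    long masterOffset = Long.parseLong(info.getProperty("master_repl_offset", "0"));
    
    // slave0字段格式: ip=...,port=...,state=online,offset=12345,lag=0
    long slaveOffset = 0;
    for (String field : info.getProperty("slave0", "").split(",")) {
        if (field.startsWith("offset=")) {
            slaveOffset = Long.parseLong(field.substring("offset=".length()));
        }
    }
    return masterOffset - slaveOffset; // 字节级的复制延迟,越小说明数据越接近一致
}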

Sentinel哨兵源码实现

Sentinel的核心实现位于sentinel.c文件中

// Redis Sentinel核心代码分析
/**
 * Sentinel主循环 - 监控和故障检测
 */
void sentinelTimer(void) {
    sentinelCheckTiltCondition();
    sentinelHandleDictOfRedisInstances(sentinel.masters);
    sentinelRunPendingScripts();
    sentinelCollectTerminatedScripts();
    sentinelKillTimedoutScripts();
    
    // 对每个主从实例的状态检查与故障转移推进,由上面的sentinelHandleDictOfRedisInstances()递归完成
}

/**
 * 检查Redis实例状态
 */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    /* 发送PING命令检查实例状态 */
    sentinelSendPeriodicCommands(ri);
    
    /* 检查主观下线状态 */
    sentinelCheckSubjectivelyDown(ri);
    
    /* 检查客观下线状态(仅对主节点) */
    if (ri->flags & SRI_MASTER) {
        sentinelCheckObjectivelyDown(ri);
        
        /* 如果主节点客观下线,开始故障转移 */
        if (sentinelStartFailoverIfNeeded(ri))
            sentinelAskMasterStateToOtherSentinels(ri, SENTINEL_ASK_FORCED);
        
        /* 执行故障转移步骤 */
        sentinelFailoverStateMachine(ri);
        
        /* 向其他Sentinel询问主节点状态 */
        sentinelAskMasterStateToOtherSentinels(ri, SENTINEL_NO_FLAGS);
    }
}

/**
 * 检查主观下线状态
 */
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
    mstime_t elapsed = 0;
    
    if (ri->link->last_pong_time)
        elapsed = mstime() - ri->link->last_pong_time;
    else if (ri->link->last_ping_time)
        elapsed = mstime() - ri->link->last_ping_time;
    
    /* 如果超过down-after-milliseconds时间没有响应 */
    if (elapsed > ri->down_after_period) {
        /* 标记为主观下线 */
        if ((ri->flags & SRI_S_DOWN) == 0) {
            sentinelEvent(LL_WARNING, "+sdown", ri, "%@");
            ri->s_down_since_time = mstime();
            ri->flags |= SRI_S_DOWN;
        }
    } else {
        /* 恢复正常状态 */
        if (ri->flags & SRI_S_DOWN) {
            sentinelEvent(LL_WARNING, "-sdown", ri, "%@");
            ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
        }
    }
}

/**
 * 检查客观下线状态
 */
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    unsigned int quorum = 0, odown = 0;
    
    /* 主节点必须先主观下线 */
    if (master->flags & SRI_S_DOWN) {
        quorum = 1; /* 当前Sentinel认为下线 */
        
        /* 统计其他Sentinel的投票 */
        di = dictGetIterator(master->sentinels);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);
            
            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(di);
        
        /* 如果达到quorum数量,标记为客观下线 */
        if (quorum >= master->quorum) odown = 1;
    }
    
    /* 更新客观下线状态 */
    if (odown) {
        if ((master->flags & SRI_O_DOWN) == 0) {
            sentinelEvent(LL_WARNING, "+odown", master, "%@ #quorum %d/%d",
                quorum, master->quorum);
            master->flags |= SRI_O_DOWN;
            master->o_down_since_time = mstime();
        }
    } else {
        if (master->flags & SRI_O_DOWN) {
            sentinelEvent(LL_WARNING, "-odown", master, "%@");
            master->flags &= ~SRI_O_DOWN;
        }
    }
}

/**
 * 故障转移状态机
 */
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
    
    switch(ri->failover_state) {
        case SENTINEL_FAILOVER_STATE_WAIT_START:
            sentinelFailoverWaitStart(ri);
            break;
        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
            sentinelFailoverSelectSlave(ri);
            break;
        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
            sentinelFailoverSendSlaveOfNoOne(ri);
            break;
        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
            sentinelFailoverWaitPromotion(ri);
            break;
        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
            sentinelFailoverReconfNextSlave(ri);
            break;
    }
}

/**
 * 选择最佳从节点进行提升
 */
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance **instance =
        zmalloc(sizeof(instance[0]) * dictSize(master->slaves));
    sentinelRedisInstance *selected = NULL;
    int instances = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t max_master_down_time = 0;
    
    /* 收集可用的从节点 */
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        mstime_t info_validity_time;
        
        /* 跳过下线的从节点 */
        if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
        
        /* 跳过断开连接的从节点 */
        if (slave->link->disconnected) continue;
        
        /* 跳过复制延迟过大的从节点 */
        if (mstime() - slave->slave_repl_offset_time > SENTINEL_INFO_PERIOD*5)
            continue;
        
        /* 跳过优先级为0的从节点(不参与选举) */
        if (slave->slave_priority == 0) continue;
        
        instance[instances++] = slave;
    }
    dictReleaseIterator(di);
    
    /* 根据优先级、复制偏移量、运行ID排序 */
    if (instances) {
        qsort(instance, instances, sizeof(sentinelRedisInstance*),
              compareSlavesForPromotion);
        selected = instance[0];
    }
    zfree(instance);
    return selected;
}

Cluster集群源码实现

Redis Cluster的核心实现位于cluster.c文件中

// Redis Cluster核心代码分析
/**
 * 集群槽位分配和路由
 */
int clusterProcessCommand(client *c) {
    /* 如果不是集群模式,正常处理命令 */
    if (!(server.cluster_enabled)) return 0;
    
    /* 多键命令的槽位检查 */
    if (server.cluster->state != CLUSTER_OK) {
        if (!(c->cmd->flags & CMD_ASKING)) {
            addReplySds(c, sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
            return 1;
        }
    }
    
    /* 计算命令涉及的槽位 */
    int hashslot;
    int error_code;
    clusterNode *n = getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code);
    
    if (n == NULL || n != server.cluster->myself) {
        if (c->cmd->flags & CMD_ASKING) {
            clusterRedirectClient(c, n, hashslot, CLUSTER_REDIR_ASK);
        } else {
            clusterRedirectClient(c, n, hashslot, CLUSTER_REDIR_MOVED);
        }
        return 1;
    }
    
    /* 在当前节点执行命令 */
    return 0;
}

/**
 * 根据Key计算槽位
 */
unsigned int keyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */
    
    /* 查找哈希标签 {...} */
    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;
    
    /* 没有找到 { ,使用整个key */
    if (s == keylen) return crc16(key, keylen) & 0x3FFF;
    
    /* 查找匹配的 } */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;
    
    /* 没有找到 } 或者 {} 为空,使用整个key */
    if (e == keylen || e == s+1) return crc16(key, keylen) & 0x3FFF;
    
    /* 使用 {tag} 中的内容计算槽位 */
    return crc16(key+s+1, e-s-1) & 0x3FFF;
}

/**
 * 集群故障检测
 */
void clusterCron(void) {
    dictIterator *di;
    dictEntry *de;
    int update_state = 0;
    int orphaned_masters; /* Masters with no working slaves. */
    int max_slaves; /* Max number of slaves for a single master. */
    int this_slaves; /* Number of slaves for our master (if we are slave). */
    mstime_t min_pong = 0, now = mstime();
    clusterNode *min_pong_node = NULL;
    
    /* 遍历所有已知节点 */
    di = dictGetIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        
        /* 跳过自己和握手中的节点 */
        if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE)) continue;
        
        /* 检查节点是否超时 */
        if (!(node->flags & CLUSTER_NODE_NOADDR) &&
            node->pong_received &&
            (now - node->pong_received) > server.cluster_node_timeout)
        {
            /* 标记节点为可能失败 */
            node->flags |= CLUSTER_NODE_PFAIL;
            update_state = 1;
        }
    }
    dictReleaseIterator(di);
    
    /* 检查是否有节点被标记为FAIL */
    di = dictGetIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        int needed_quorum = (server.cluster->size / 2) + 1;
        
        if (node->flags & CLUSTER_NODE_PFAIL) {
            int failure_reports = clusterNodeFailureReportsCount(node);
            
            /* 如果故障报告数量达到quorum,标记为FAIL */
            if (failure_reports >= needed_quorum) {
                node->flags &= ~CLUSTER_NODE_PFAIL;
                node->flags |= CLUSTER_NODE_FAIL;
                node->fail_time = mstime();
                
                /* 广播FAIL消息 */
                clusterSendFail(node->name);
                clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
                                   CLUSTER_TODO_SAVE_CONFIG);
                update_state = 1;
            }
        }
    }
    dictReleaseIterator(di);
    
    /* 如果我们是从节点,检查是否需要进行故障转移 */
    if (nodeIsSlave(server.cluster->myself) &&
        server.cluster->myself->slaveof &&
        nodeTimedOut(server.cluster->myself->slaveof))
    {
        clusterHandleSlaveFailover();
    }
    
    /* 更新集群状态 */
    if (update_state || server.cluster->state == CLUSTER_FAIL)
        clusterUpdateState();
}

/**
 * 从节点故障转移处理
 */
void clusterHandleSlaveFailover(void) {
    mstime_t data_age;
    mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
    int needed_quorum = (server.cluster->size / 2) + 1;
    int manual_failover = server.cluster->mf_end != 0 &&
                          server.cluster->mf_can_start;
    mstime_t auth_timeout, auth_retry_time;
    
    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
    
    /* 计算数据年龄 */
    data_age = (server.unixtime - server.cluster->myself->slaveof->ping_sent) * 1000;
    
    /* 如果数据太旧,不进行故障转移 */
    if (data_age > server.cluster_node_timeout * CLUSTER_SLAVE_VALIDITY_MULT)
        return;
    
    /* 如果还没有请求投票权限,发起投票 */
    if (server.cluster->failover_auth_sent == 0) {
        server.cluster->currentEpoch++;
        server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
        server.cluster->failover_auth_sent = 1;
        server.cluster->failover_auth_time = mstime() +
            500 + random() % 500; /* 添加随机延迟避免冲突 */
        
        /* 向所有主节点请求投票 */
        clusterRequestFailoverAuth();
        return;
    }
    
    /* 检查是否获得足够的投票 */
    if (server.cluster->failover_auth_count >= needed_quorum) {
        /* 获得足够投票,开始故障转移 */
        clusterFailoverReplaceYourMaster();
    }
}

💡 实战案例与代码示例

主从复制实战部署

完整的主从复制环境搭建

/**
 * Redis主从复制完整实现
 */
@Configuration
@EnableConfigurationProperties(RedisProperties.class)
public class RedisReplicationConfig {
    
    @Value("${redis.master.host:localhost}")
    private String masterHost;
    
    @Value("${redis.master.port:6379}")
    private int masterPort;
    
    @Value("${redis.slave.host:localhost}")
    private String slaveHost;
    
    @Value("${redis.slave.port:6380}")
    private int slavePort;
    
    @Value("${redis.password}")
    private String password;
    
    /**
     * 主节点连接工厂
     */
    @Bean
    @Primary
    public LettuceConnectionFactory masterConnectionFactory() {
        RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
        config.setHostName(masterHost);
        config.setPort(masterPort);
        config.setPassword(password);
        
        // 连接池配置
        GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig = 
            new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(20);
        poolConfig.setMaxIdle(10);
        poolConfig.setMinIdle(5);
        poolConfig.setTestOnBorrow(true);
        poolConfig.setTestOnReturn(true);
        poolConfig.setTestWhileIdle(true);
        
        LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
            .poolConfig(poolConfig)
            .commandTimeout(Duration.ofSeconds(5))
            .shutdownTimeout(Duration.ofSeconds(10))
            .build();
        
        return new LettuceConnectionFactory(config, clientConfig);
    }
    
    /**
     * 从节点连接工厂(只读)
     */
    @Bean
    public LettuceConnectionFactory slaveConnectionFactory() {
        RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
        config.setHostName(slaveHost);
        config.setPort(slavePort);
        config.setPassword(password);
        
        GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig = 
            new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(50); // 读操作较多,增加连接数
        poolConfig.setMaxIdle(20);
        poolConfig.setMinIdle(10);
        
        LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
            .poolConfig(poolConfig)
            .readFrom(ReadFrom.REPLICA_PREFERRED) // 优先从从节点读取
            .commandTimeout(Duration.ofSeconds(3))
            .build();
        
        return new LettuceConnectionFactory(config, clientConfig);
    }
    
    /**
     * 主节点Redis模板(写操作)
     */
    @Bean
    public RedisTemplate<String, Object> masterRedisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(masterConnectionFactory());
        
        // 序列化配置
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = 
            new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, 
            ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.setDefaultSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        
        return template;
    }
    
    /**
     * 从节点Redis模板(读操作)
     */
    @Bean
    public RedisTemplate<String, Object> slaveRedisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(slaveConnectionFactory());
        
        // 使用相同的序列化配置
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = 
            new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, 
            ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.setDefaultSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        
        return template;
    }
}

/**
 * 读写分离服务实现
 */
@Service
public class RedisReadWriteService {
    
    @Autowired
    @Qualifier("masterRedisTemplate")
    private RedisTemplate<String, Object> masterRedisTemplate;
    
    @Autowired
    @Qualifier("slaveRedisTemplate")
    private RedisTemplate<String, Object> slaveRedisTemplate;
    
    private static final Logger logger = LoggerFactory.getLogger(RedisReadWriteService.class);
    
    /**
     * 写操作 - 使用主节点
     */
    public void writeData(String key, Object value) {
        try {
            masterRedisTemplate.opsForValue().set(key, value, Duration.ofHours(1));
            logger.info("数据写入主节点成功: key={}", key);
        } catch (Exception e) {
            logger.error("数据写入主节点失败: key={}", key, e);
            throw new RuntimeException("Redis写入失败", e);
        }
    }
    
    /**
     * 写操作 - 批量写入
     */
    public void writeBatchData(Map<String, Object> dataMap) {
        try {
            masterRedisTemplate.opsForValue().multiSet(dataMap);
            
            // 设置过期时间
            dataMap.keySet().forEach(key -> 
                masterRedisTemplate.expire(key, Duration.ofHours(1))
            );
            
            logger.info("批量数据写入主节点成功: count={}", dataMap.size());
        } catch (Exception e) {
            logger.error("批量数据写入主节点失败", e);
            throw new RuntimeException("Redis批量写入失败", e);
        }
    }
    
    /**
     * 读操作 - 使用从节点
     */
    public Object readData(String key) {
        try {
            Object value = slaveRedisTemplate.opsForValue().get(key);
            logger.debug("从从节点读取数据: key={}, found={}", key, value != null);
            return value;
        } catch (Exception e) {
            logger.warn("从从节点读取失败,尝试从主节点读取: key={}", key);
            
            // 从节点读取失败,降级到主节点
            try {
                Object value = masterRedisTemplate.opsForValue().get(key);
                logger.info("从主节点读取数据成功: key={}", key);
                return value;
            } catch (Exception masterException) {
                logger.error("从主节点读取也失败: key={}", key, masterException);
                throw new RuntimeException("Redis读取失败", masterException);
            }
        }
    }
    
    /**
     * 读操作 - 批量读取
     */
    public List<Object> readBatchData(List<String> keys) {
        try {
            List<Object> values = slaveRedisTemplate.opsForValue().multiGet(keys);
            logger.debug("从从节点批量读取数据: count={}", keys.size());
            return values;
        } catch (Exception e) {
            logger.warn("从从节点批量读取失败,尝试从主节点读取");
            
            try {
                List<Object> values = masterRedisTemplate.opsForValue().multiGet(keys);
                logger.info("从主节点批量读取数据成功: count={}", keys.size());
                return values;
            } catch (Exception masterException) {
                logger.error("从主节点批量读取也失败", masterException);
                throw new RuntimeException("Redis批量读取失败", masterException);
            }
        }
    }
    
    /**
     * 删除操作 - 使用主节点
     */
    public void deleteData(String key) {
        try {
            Boolean deleted = masterRedisTemplate.delete(key);
            logger.info("从主节点删除数据: key={}, deleted={}", key, deleted);
        } catch (Exception e) {
            logger.error("从主节点删除数据失败: key={}", key, e);
            throw new RuntimeException("Redis删除失败", e);
        }
    }
    
    /**
     * 健康检查
     */
    public Map<String, String> healthCheck() {
        Map<String, String> status = new HashMap<>();
        
        // 检查主节点
        try {
            String masterPing = masterRedisTemplate.getConnectionFactory()
                .getConnection().ping();
            status.put("master", "PONG".equals(masterPing) ? "UP" : "DOWN");
        } catch (Exception e) {
            status.put("master", "DOWN");
            logger.error("主节点健康检查失败", e);
        }
        
        // 检查从节点
        try {
            String slavePing = slaveRedisTemplate.getConnectionFactory()
                .getConnection().ping();
            status.put("slave", "PONG".equals(slavePing) ? "UP" : "DOWN");
        } catch (Exception e) {
            status.put("slave", "DOWN");
            logger.error("从节点健康检查失败", e);
        }
        
        return status;
    }
}

Sentinel哨兵模式实战

生产级Sentinel集群配置

# application.yml - Sentinel配置
spring:
  redis:
    sentinel:
      master: mymaster
      nodes:
        - 192.168.1.101:26379
        - 192.168.1.102:26379
        - 192.168.1.103:26379
      password: ${REDIS_PASSWORD}
    password: ${REDIS_PASSWORD}
    timeout: 5000ms
    lettuce:
      pool:
        max-active: 20
        max-idle: 10
        min-idle: 5
        max-wait: 5000ms
      shutdown-timeout: 10000ms
/**
 * Redis Sentinel配置
 */
@Configuration
public class RedisSentinelConfig {
    
    @Value("${spring.redis.sentinel.master}")
    private String masterName;
    
    @Value("${spring.redis.password}")
    private String password;
    
    @Value("#{'${spring.redis.sentinel.nodes}'.split(',')}")
    private List<String> sentinelNodes;
    
    /**
     * Sentinel连接工厂
     */
    @Bean
    public LettuceConnectionFactory sentinelConnectionFactory() {
        // RedisSentinelConfiguration的构造函数接受"host:port"字符串集合
        RedisSentinelConfiguration sentinelConfig =
            new RedisSentinelConfiguration(masterName, new HashSet<>(sentinelNodes));
        sentinelConfig.setPassword(password);
        sentinelConfig.setSentinelPassword(password);
        
        // 连接池配置
        GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig = 
            new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(20);
        poolConfig.setMaxIdle(10);
        poolConfig.setMinIdle(5);
        poolConfig.setTestOnBorrow(true);
        poolConfig.setTestOnReturn(true);
        poolConfig.setTestWhileIdle(true);
        
        LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
            .poolConfig(poolConfig)
            .commandTimeout(Duration.ofSeconds(5))
            .shutdownTimeout(Duration.ofSeconds(10))
            .readFrom(ReadFrom.REPLICA_PREFERRED) // 读写分离
            .build();
        
        return new LettuceConnectionFactory(sentinelConfig, clientConfig);
    }
    
    /**
     * Redis模板
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(sentinelConnectionFactory());
        
        // 序列化配置
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = 
            new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, 
            ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.setDefaultSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        
        return template;
    }
    
    /**
     * Sentinel监听器 - 监控主节点切换
     */
    @Bean
    public RedisSentinelListener sentinelListener() {
        return new RedisSentinelListener();
    }
}

/**
 * Sentinel事件监听器
 * 注:RedisSentinelFailoverEvent为本文自定义的应用事件(需在检测到主节点切换时自行发布),并非Spring Data Redis内置类型
 */
@Component
public class RedisSentinelListener implements ApplicationListener<RedisSentinelFailoverEvent> {
    
    private static final Logger logger = LoggerFactory.getLogger(RedisSentinelListener.class);
    
    @Override
    public void onApplicationEvent(RedisSentinelFailoverEvent event) {
        logger.warn("Redis Sentinel故障转移事件: oldMaster={}, newMaster={}", 
            event.getOldMaster(), event.getNewMaster());
        
        // 发送告警通知
        sendFailoverAlert(event);
        
        // 记录故障转移日志
        recordFailoverLog(event);
        
        // 触发应用层的故障恢复逻辑
        handleFailoverRecovery(event);
    }
    
    private void sendFailoverAlert(RedisSentinelFailoverEvent event) {
        // 实现告警通知逻辑
        // 可以发送邮件、短信、钉钉通知等
        logger.info("发送Redis故障转移告警通知");
    }
    
    private void recordFailoverLog(RedisSentinelFailoverEvent event) {
        // 记录详细的故障转移日志
        logger.info("记录Redis故障转移日志到数据库");
    }
    
    private void handleFailoverRecovery(RedisSentinelFailoverEvent event) {
        // 执行故障恢复后的业务逻辑
        // 比如清理缓存、重新加载数据等
        logger.info("执行Redis故障转移后的恢复逻辑");
    }
}

/**
 * 高可用Redis服务
 */
@Service
public class HighAvailabilityRedisService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final Logger logger = LoggerFactory.getLogger(HighAvailabilityRedisService.class);
    
    /**
     * 容错的写操作
     */
    @Retryable(value = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000))
    public void setWithRetry(String key, Object value, Duration timeout) {
        try {
            redisTemplate.opsForValue().set(key, value, timeout);
            logger.debug("Redis写入成功: key={}", key);
        } catch (Exception e) {
            logger.error("Redis写入失败,将重试: key={}", key, e);
            throw e;
        }
    }
    
    /**
     * 容错的读操作
     */
    @Retryable(value = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 500))
    public Object getWithRetry(String key) {
        try {
            Object value = redisTemplate.opsForValue().get(key);
            logger.debug("Redis读取成功: key={}, found={}", key, value != null);
            return value;
        } catch (Exception e) {
            logger.error("Redis读取失败,将重试: key={}", key, e);
            throw e;
        }
    }
    
    /**
     * 批量操作的容错处理
     */
    public Map<String, Object> batchGetWithFallback(List<String> keys) {
        Map<String, Object> result = new HashMap<>();
        
        try {
            List<Object> values = redisTemplate.opsForValue().multiGet(keys);
            for (int i = 0; i < keys.size(); i++) {
                if (i < values.size() && values.get(i) != null) {
                    result.put(keys.get(i), values.get(i));
                }
            }
        } catch (Exception e) {
            logger.error("Redis批量读取失败,尝试单个读取", e);
            
            // 降级策略:逐个读取
            for (String key : keys) {
                try {
                    Object value = redisTemplate.opsForValue().get(key);
                    if (value != null) {
                        result.put(key, value);
                    }
                } catch (Exception singleException) {
                    logger.warn("单个key读取也失败: key={}", key);
                }
            }
        }
        
        return result;
    }
}
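
需要注意,上面的@Retryable注解依赖Spring Retry:需引入spring-retry依赖并在配置类上开启重试支持,否则注解不会生效。一个最小的开启方式示意如下(类名为假设):

// 开启Spring Retry,使@Retryable注解生效
@Configuration
@EnableRetry
public class RetryConfig {
}

若所有重试均失败,还可以在同一个Bean中配合@Recover方法做最终的降级处理。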

Redis Cluster集群实战

生产级Cluster集群配置

# application.yml - Cluster配置
spring:
  redis:
    cluster:
      nodes:
        - 192.168.1.101:7000
        - 192.168.1.101:7001
        - 192.168.1.102:7000
        - 192.168.1.102:7001
        - 192.168.1.103:7000
        - 192.168.1.103:7001
      max-redirects: 3
    password: ${REDIS_PASSWORD}
    timeout: 5000ms
    lettuce:
      pool:
        max-active: 50
        max-idle: 20
        min-idle: 10
        max-wait: 5000ms
      cluster:
        refresh:
          adaptive: true
          period: 30s
/**
 * Redis Cluster配置
 */
@Configuration
public class RedisClusterConfig {
    
    @Value("#{'${spring.redis.cluster.nodes}'.split(',')}")
    private List<String> clusterNodes;
    
    @Value("${spring.redis.cluster.max-redirects}")
    private int maxRedirects;
    
    @Value("${spring.redis.password}")
    private String password;
    
    /**
     * Cluster连接工厂
     */
    @Bean
    public LettuceConnectionFactory clusterConnectionFactory() {
        // RedisClusterConfiguration的构造函数接受"host:port"字符串集合
        RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration(clusterNodes);
        clusterConfig.setMaxRedirects(maxRedirects);
        clusterConfig.setPassword(password);
        
        // 连接池配置
        GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig = 
            new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(50);
        poolConfig.setMaxIdle(20);
        poolConfig.setMinIdle(10);
        poolConfig.setTestOnBorrow(true);
        poolConfig.setTestOnReturn(true);
        poolConfig.setTestWhileIdle(true);
        
        // 客户端配置
        LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
            .poolConfig(poolConfig)
            .commandTimeout(Duration.ofSeconds(5))
            .shutdownTimeout(Duration.ofSeconds(10))
            .readFrom(ReadFrom.REPLICA_PREFERRED) // 读写分离
            .build();
        
        return new LettuceConnectionFactory(clusterConfig, clientConfig);
    }
    
    /**
     * Cluster Redis模板
     */
    @Bean
    public RedisTemplate<String, Object> clusterRedisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(clusterConnectionFactory());
        
        // 序列化配置
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = 
            new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, 
            ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.setDefaultSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        
        return template;
    }
    
    /**
     * 集群管理工具
     */
    @Bean
    public RedisClusterManager clusterManager() {
        return new RedisClusterManager(clusterConnectionFactory());
    }
}

/**
 * Redis Cluster管理器
 */
@Component
public class RedisClusterManager {
    
    private final LettuceConnectionFactory connectionFactory;
    private static final Logger logger = LoggerFactory.getLogger(RedisClusterManager.class);
    
    public RedisClusterManager(LettuceConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }
    
    /**
     * 获取集群信息
     */
    public ClusterInfo getClusterInfo() {
        // 示意写法:Spring Data的getConnection()返回的是RedisClusterConnection包装,
        // 实际需先获取Lettuce原生连接(如通过getNativeConnection())再做此转换
        try (StatefulRedisClusterConnection<String, String> connection = 
             (StatefulRedisClusterConnection<String, String>) connectionFactory.getConnection()) {
            
            RedisAdvancedClusterCommands<String, String> commands = connection.sync();
            
            // 获取集群节点信息;CLUSTER INFO返回"key:value"多行文本,需解析为Map
            String clusterNodes = commands.clusterNodes();
            Map<String, String> clusterInfo = parseInfoText(commands.clusterInfo());
            
            return ClusterInfo.builder()
                .nodes(parseClusterNodes(clusterNodes))
                .info(clusterInfo)
                .state(clusterInfo.get("cluster_state"))
                .slotsAssigned(Integer.parseInt(clusterInfo.get("cluster_slots_assigned")))
                .slotsOk(Integer.parseInt(clusterInfo.get("cluster_slots_ok")))
                .slotsFail(Integer.parseInt(clusterInfo.get("cluster_slots_fail")))
                .knownNodes(Integer.parseInt(clusterInfo.get("cluster_known_nodes")))
                .build();
                
        } catch (Exception e) {
            logger.error("获取集群信息失败", e);
            throw new RuntimeException("获取集群信息失败", e);
        }
    }
    
    /**
     * 获取Key的槽位信息
     */
    public SlotInfo getKeySlot(String key) {
        int slot = SlotHash.getSlot(key);
        
        try (StatefulRedisClusterConnection<String, String> connection = 
             (StatefulRedisClusterConnection<String, String>) connectionFactory.getConnection()) {
            
            RedisAdvancedClusterCommands<String, String> commands = connection.sync();
            List<Object> slotInfo = commands.clusterSlots();
            
            // 查找槽位对应的节点
            for (Object slotRange : slotInfo) {
                List<Object> range = (List<Object>) slotRange;
                int startSlot = ((Number) range.get(0)).intValue();
                int endSlot = ((Number) range.get(1)).intValue();
                
                if (slot >= startSlot && slot <= endSlot) {
                    List<Object> masterInfo = (List<Object>) range.get(2);
                    String masterHost = new String((byte[]) masterInfo.get(0));
                    int masterPort = ((Number) masterInfo.get(1)).intValue();
                    
                    return SlotInfo.builder()
                        .key(key)
                        .slot(slot)
                        .masterHost(masterHost)
                        .masterPort(masterPort)
                        .slotRange(startSlot + "-" + endSlot)
                        .build();
                }
            }
            
        } catch (Exception e) {
            logger.error("获取Key槽位信息失败: key={}", key, e);
        }
        
        return SlotInfo.builder()
            .key(key)
            .slot(slot)
            .build();
    }
    
    /**
     * 健康检查
     */
    public Map<String, String> healthCheck() {
        Map<String, String> healthStatus = new HashMap<>();
        
        try {
            ClusterInfo clusterInfo = getClusterInfo();
            
            healthStatus.put("cluster_state", clusterInfo.getState());
            healthStatus.put("cluster_slots_assigned", String.valueOf(clusterInfo.getSlotsAssigned()));
            healthStatus.put("cluster_slots_ok", String.valueOf(clusterInfo.getSlotsOk()));
            healthStatus.put("cluster_slots_fail", String.valueOf(clusterInfo.getSlotsFail()));
            healthStatus.put("cluster_known_nodes", String.valueOf(clusterInfo.getKnownNodes()));
            
            // 检查是否所有槽位都正常
            if (clusterInfo.getSlotsAssigned() == 16384 && clusterInfo.getSlotsFail() == 0) {
                healthStatus.put("overall_status", "HEALTHY");
            } else {
                healthStatus.put("overall_status", "DEGRADED");
            }
            
        } catch (Exception e) {
            logger.error("集群健康检查失败", e);
            healthStatus.put("overall_status", "UNHEALTHY");
            healthStatus.put("error", e.getMessage());
        }
        
        return healthStatus;
    }
    
    /**
     * 将CLUSTER INFO返回的"key:value"多行文本解析为Map(配合上文getClusterInfo使用)
     */
    private Map<String, String> parseInfoText(String infoText) {
        Map<String, String> map = new HashMap<>();
        for (String line : infoText.split("\r?\n")) {
            int idx = line.indexOf(':');
            if (idx > 0) {
                map.put(line.substring(0, idx).trim(), line.substring(idx + 1).trim());
            }
        }
        return map;
    }
    
    private List<ClusterNode> parseClusterNodes(String clusterNodes) {
        // 解析集群节点信息的实现
        return Arrays.stream(clusterNodes.split("\n"))
            .filter(line -> !line.trim().isEmpty())
            .map(this::parseNodeLine)
            .collect(Collectors.toList());
    }
    
    private ClusterNode parseNodeLine(String nodeLine) {
        String[] parts = nodeLine.split(" ");
        return ClusterNode.builder()
            .nodeId(parts[0])
            .endpoint(parts[1])
            .flags(parts[2])
            .master(parts[3])
            .pingTime(Long.parseLong(parts[4]))
            .pongTime(Long.parseLong(parts[5]))
            .epoch(Long.parseLong(parts[6]))
            .state(parts[7])
            .slots(parts.length > 8 ? String.join(" ", Arrays.copyOfRange(parts, 8, parts.length)) : "")
            .build();
    }
}

// 数据模型类
@Data
@Builder
public class ClusterInfo {
    private List<ClusterNode> nodes;
    private Map<String, String> info;
    private String state;
    private int slotsAssigned;
    private int slotsOk;
    private int slotsFail;
    private int knownNodes;
}

@Data
@Builder
public class ClusterNode {
    private String nodeId;
    private String endpoint;
    private String flags;
    private String master;
    private long pingTime;
    private long pongTime;
    private long epoch;
    private String state;
    private String slots;
}

@Data
@Builder
public class SlotInfo {
    private String key;
    private int slot;
    private String masterHost;
    private int masterPort;
    private String slotRange;
}
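
上述管理器可以直接暴露为运维接口,用于排查key的槽位归属与集群健康状态,调用方式示意如下(Controller为假设的示例类):

// RedisClusterManager使用示意
@RestController
@RequestMapping("/ops/cluster")
public class ClusterOpsController {
    
    @Autowired
    private RedisClusterManager clusterManager;
    
    @GetMapping("/slot")
    public SlotInfo slot(@RequestParam String key) {
        // 带哈希标签的key(如"{user:1001}:cart")只按标签内容计算槽位
        return clusterManager.getKeySlot(key);
    }
    
    @GetMapping("/health")
    public Map<String, String> health() {
        return clusterManager.healthCheck();
    }
}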

🎯 面试高频问题精讲

问题1:Redis主从复制的工作原理是什么?数据一致性如何保证?

标准答案

Redis主从复制采用异步复制机制,包含三种同步方式:

1. 全量同步(Full Resynchronization)

  • 从节点首次连接主节点时触发
  • 主节点执行BGSAVE生成RDB快照
  • 将RDB文件发送给从节点
  • 从节点清空数据库并加载RDB
  • 主节点将期间的写命令发送给从节点

2. 增量同步(Incremental Sync)

  • 正常运行时的实时同步
  • 主节点将写命令实时发送给从节点
  • 基于复制积压缓冲区(replication backlog)

3. 部分重同步(Partial Resynchronization)

  • 从节点短暂断开后重连时触发
  • 基于复制偏移量(replication offset)判断
  • 只同步缺失的部分数据

数据一致性保证机制

// 复制偏移量机制
public class ReplicationOffset {
    private static final long ACCEPTABLE_LAG = 1024; // 可容忍的字节级延迟(示例阈值)
    private long masterOffset;  // 主节点写入的数据量
    private long slaveOffset;   // 从节点复制的数据量
    
    // 判断数据是否一致
    public boolean isConsistent() {
        return Math.abs(masterOffset - slaveOffset) <= ACCEPTABLE_LAG;
    }
}

// 复制积压缓冲区
public class ReplicationBacklog {
    private byte[] buffer;      // 环形缓冲区
    private long bufferSize;    // 缓冲区大小(默认1MB)
    private long masterOffset; // 主节点偏移量
    
    // 判断是否可以部分重同步
    public boolean canPartialSync(long slaveOffset) {
        return masterOffset - slaveOffset <= bufferSize;
    }
}

面试加分点

  • 提到repl-diskless-sync无盘复制优化
  • 了解min-slaves-to-write参数控制写入安全性
  • 知道复制超时和心跳检测机制

问题2:Redis Sentinel的脑裂问题如何解决?

标准答案

脑裂现象:网络分区导致出现多个主节点,数据不一致。

解决方案

1. 配置最小从节点数

# redis.conf配置
min-replicas-to-write 1          # 至少1个从节点在线才允许写入
min-replicas-max-lag 10          # 从节点最大延迟10秒

2. Sentinel Quorum机制

# sentinel.conf配置
sentinel monitor mymaster 192.168.1.100 6379 2  # 需要2个Sentinel同意才能故障转移

3. 客户端连接验证

@Service
public class AntiBrainSplitService {
    
    private static final Logger logger = LoggerFactory.getLogger(AntiBrainSplitService.class);
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    @Qualifier("slaveRedisTemplate")
    private RedisTemplate<String, Object> slaveRedisTemplate; // 用于下文的一致性检查
    
    /**
     * 防脑裂的写操作
     */
    public void safeWrite(String key, Object value) {
        try {
            // 1. 检查当前节点是否为主节点
            String role = redisTemplate.getConnectionFactory()
                .getConnection().info("replication").getProperty("role");
            
            if (!"master".equals(role)) {
                throw new RuntimeException("当前节点不是主节点,拒绝写入");
            }
            
            // 2. 检查从节点数量
            String connectedSlaves = redisTemplate.getConnectionFactory()
                .getConnection().info("replication").getProperty("connected_slaves");
            
            int slaveCount = Integer.parseInt(connectedSlaves);
            if (slaveCount < 1) {
                throw new RuntimeException("从节点数量不足,拒绝写入");
            }
            
            // 3. 执行写操作
            redisTemplate.opsForValue().set(key, value);
            
        } catch (Exception e) {
            logger.error("防脑裂写操作失败", e);
            throw e;
        }
    }
    
    /**
     * 检查数据一致性
     */
    public boolean checkConsistency(String key) {
        try {
            // 从主节点读取
            Object masterValue = redisTemplate.opsForValue().get(key);
            
            // 从从节点读取(需要配置从节点连接)
            Object slaveValue = slaveRedisTemplate.opsForValue().get(key);
            
            return Objects.equals(masterValue, slaveValue);
            
        } catch (Exception e) {
            logger.error("一致性检查失败", e);
            return false;
        }
    }
}

面试加分点

  • 了解网络分区检测机制
  • 知道客户端重连策略
  • 提到监控和告警的重要性

问题3:Redis Cluster如何处理槽位迁移?数据迁移过程中如何保证服务可用?

标准答案

槽位迁移流程

1. 迁移准备阶段

# 1. 在目标节点上将槽位设置为导入状态(参数为源节点ID)
CLUSTER SETSLOT 1000 IMPORTING source-node-id

# 2. 在源节点上将槽位设置为迁移状态(参数为目标节点ID)
CLUSTER SETSLOT 1000 MIGRATING target-node-id

2. 数据迁移阶段

# 3. 逐个迁移槽位中的Key
MIGRATE target-host target-port key destination-db timeout

# 4. 检查槽位是否为空
CLUSTER COUNTKEYSINSLOT 1000

3. 迁移完成阶段

# 5. 更新槽位归属
CLUSTER SETSLOT 1000 NODE target-node-id

# 6. 槽位变更通过Gossip协议在集群内传播,可用CLUSTER NODES确认新的槽位归属
CLUSTER NODES

服务可用性保证机制

/**
 * 槽位迁移期间的客户端处理
 */
@Service
public class ClusterMigrationHandler {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * 迁移期间的安全读写
     */
    public Object safeGet(String key) {
        int maxRetries = 3;
        int retryCount = 0;
        
        while (retryCount < maxRetries) {
            try {
                return redisTemplate.opsForValue().get(key);
                
            // RedisRedirectionException为示意异常类型,实际可捕获Spring Data的ClusterRedirectException等重定向异常
            } catch (RedisRedirectionException e) {
                // 处理MOVED重定向
                if (e.getMessage().contains("MOVED")) {
                    logger.info("槽位已迁移,重定向到新节点: {}", e.getMessage());
                    // 客户端会自动重定向,重试即可
                    retryCount++;
                    continue;
                }
                
                // 处理ASK重定向
                if (e.getMessage().contains("ASK")) {
                    logger.info("槽位正在迁移,临时重定向: {}", e.getMessage());
                    // 发送ASKING命令后重试
                    handleAskRedirection(e);
                    retryCount++;
                    continue;
                }
                
                throw e;
                
            } catch (Exception e) {
                logger.error("读取失败,重试: key={}, attempt={}", key, retryCount + 1);
                retryCount++;
                
                if (retryCount >= maxRetries) {
                    throw new RuntimeException("读取失败,已达最大重试次数", e);
                }
                
                // 等待后重试
                try {
                    Thread.sleep(100 * retryCount);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("重试被中断", ie);
                }
            }
        }
        
        throw new RuntimeException("读取失败,已达最大重试次数");
    }
    
    /**
     * 处理ASK重定向
     */
    private void handleAskRedirection(RedisRedirectionException e) {
        // 解析ASK响应,获取目标节点信息
        String askMessage = e.getMessage();
        // 格式: ASK 1000 192.168.1.101:7001
        String[] parts = askMessage.split(" ");
        String targetNode = parts[2];
        
        // 向目标节点发送ASKING命令
        try {
            // 这里需要建立到目标节点的连接并发送ASKING
            logger.info("发送ASKING命令到目标节点: {}", targetNode);
        } catch (Exception ex) {
            logger.error("发送ASKING命令失败", ex);
        }
    }
    
    /**
     * 迁移期间的写操作
     */
    public void safeSet(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            
        } catch (RedisRedirectionException e) {
            if (e.getMessage().contains("MOVED")) {
                // 槽位已完全迁移,重试写入
                logger.info("槽位已迁移,重新写入: {}", e.getMessage());
                redisTemplate.opsForValue().set(key, value);
            } else {
                throw new RuntimeException("写操作失败,槽位迁移中", e);
            }
        }
    }
}

/**
 * 迁移状态监控
 */
@Component
public class MigrationMonitor {
    
    /**
     * 监控迁移进度
     */
    @Scheduled(fixedDelay = 5000)
    public void monitorMigration() {
        try {
            // 检查集群状态
            Map<String, String> clusterInfo = getClusterInfo();
            
            String state = clusterInfo.get("cluster_state");
            if ("fail".equals(state)) {
                logger.error("集群状态异常,可能存在迁移问题");
                // 发送告警
                sendAlert("集群状态异常");
            }
            
            // 检查迁移中的槽位
            // (注意:CLUSTER INFO本身不包含迁移字段,此处假设getClusterInfo()
            //  额外解析了CLUSTER NODES输出中的[slot->-nodeid]/[slot-<-nodeid]标记并做了汇总)
            int migratingSlots = Integer.parseInt(
                clusterInfo.getOrDefault("cluster_slots_migrating", "0"));
            int importingSlots = Integer.parseInt(
                clusterInfo.getOrDefault("cluster_slots_importing", "0"));
            
            if (migratingSlots > 0 || importingSlots > 0) {
                logger.info("检测到槽位迁移: migrating={}, importing={}", 
                    migratingSlots, importingSlots);
            }
            
        } catch (Exception e) {
            logger.error("迁移监控失败", e);
        }
    }
    
    private Map<String, String> getClusterInfo() {
        // 获取集群信息的实现
        return new HashMap<>();
    }
    
    private void sendAlert(String message) {
        // 发送告警的实现
        logger.warn("发送告警: {}", message);
    }
}

面试加分点

  • 了解MOVED和ASK重定向的区别
  • 知道槽位迁移的原子性保证
  • 提到客户端缓存刷新机制

问题4:Redis集群扩容和缩容的最佳实践是什么?

标准答案

扩容最佳实践

1. 添加节点

# 1. 启动新节点
redis-server /path/to/redis-7003.conf

# 2. 将新节点加入集群
redis-cli --cluster add-node 192.168.1.101:7003 192.168.1.101:7000

# 3. 分配槽位给新节点
redis-cli --cluster reshard 192.168.1.101:7000

2. 自动化扩容脚本

/**
 * 集群自动扩容管理
 */
@Service
public class ClusterScalingService {
    
    /**
     * 自动扩缩容入口:根据集群负载决定扩容还是缩容
     */
    public void autoScale() {
        // 1. 检查集群负载
        ClusterMetrics metrics = getClusterMetrics();
        
        if (shouldScaleOut(metrics)) {
            logger.info("集群负载过高,开始自动扩容");
            scaleOut();
        } else if (shouldScaleIn(metrics)) {
            logger.info("集群负载较低,开始自动缩容");
            scaleIn();
        }
    }
    
    /**
     * 扩容判断条件
     */
    private boolean shouldScaleOut(ClusterMetrics metrics) {
        return metrics.getCpuUsage() > 80 ||           // CPU使用率超过80%
               metrics.getMemoryUsage() > 85 ||        // 内存使用率超过85%
               metrics.getConnectionCount() > 8000 ||  // 连接数超过8000
               metrics.getQps() > 50000;               // QPS超过5万
    }
    
    /**
     * 缩容判断条件
     */
    private boolean shouldScaleIn(ClusterMetrics metrics) {
        return metrics.getCpuUsage() < 30 &&           // CPU使用率低于30%
               metrics.getMemoryUsage() < 40 &&        // 内存使用率低于40%
               metrics.getConnectionCount() < 2000 &&  // 连接数低于2000
               metrics.getQps() < 10000 &&             // QPS低于1万
               getCurrentNodeCount() > 3;              // 节点数大于最小值3
    }
    
    /**
     * 执行扩容
     */
    private void scaleOut() {
        try {
            // 1. 创建新节点实例
            String newNodeId = createNewNode();
            
            // 2. 将新节点加入集群
            addNodeToCluster(newNodeId);
            
            // 3. 重新分配槽位
            reshardSlots(newNodeId);
            
            // 4. 验证扩容结果
            validateScaleOut(newNodeId);
            
            logger.info("集群扩容完成: newNode={}", newNodeId);
            
        } catch (Exception e) {
            logger.error("集群扩容失败", e);
            // 回滚操作
            rollbackScaleOut();
        }
    }
    
    /**
     * 执行缩容
     */
    private void scaleIn() {
        try {
            // 1. 选择要移除的节点
            String nodeToRemove = selectNodeToRemove();
            
            // 2. 迁移槽位到其他节点
            migrateSlots(nodeToRemove);
            
            // 3. 从集群中移除节点
            removeNodeFromCluster(nodeToRemove);
            
            // 4. 销毁节点实例
            destroyNode(nodeToRemove);
            
            logger.info("集群缩容完成: removedNode={}", nodeToRemove);
            
        } catch (Exception e) {
            logger.error("集群缩容失败", e);
            // 回滚操作
            rollbackScaleIn();
        }
    }
    
    /**
     * 平滑重分片
     */
    private void reshardSlots(String targetNodeId) {
        try {
            // 计算每个节点应该分配的槽位数
            int totalNodes = getCurrentNodeCount();
            int slotsPerNode = 16384 / totalNodes;
            
            // 从现有节点迁移部分槽位到新节点
            List<String> sourceNodes = getExistingNodes();
            int slotsToMigrate = slotsPerNode;
            
            for (String sourceNode : sourceNodes) {
                if (slotsToMigrate <= 0) break;
                
                List<Integer> sourceSlots = getNodeSlots(sourceNode);
                int slotsFromThisNode = Math.min(slotsToMigrate, sourceSlots.size() / totalNodes);
                
                for (int i = 0; i < slotsFromThisNode; i++) {
                    migrateSlot(sourceSlots.get(i), sourceNode, targetNodeId);
                    slotsToMigrate--;
                }
            }
            
        } catch (Exception e) {
            logger.error("重分片失败", e);
            throw e;
        }
    }
    
    /**
     * 渐进式槽位迁移
     */
    private void migrateSlot(int slot, String sourceNode, String targetNode) {
        try {
            // 1. 设置迁移状态
            setSlotMigrating(slot, sourceNode, targetNode);
            
            // 2. 逐个迁移Key
            List<String> keys = getSlotKeys(slot, sourceNode);
            int batchSize = 100; // 每批迁移100个Key
            
            for (int i = 0; i < keys.size(); i += batchSize) {
                List<String> batch = keys.subList(i, Math.min(i + batchSize, keys.size()));
                migrateBatch(batch, sourceNode, targetNode);
                
                // 避免阻塞,每批次之间暂停
                Thread.sleep(10);
            }
            
            // 3. 完成槽位迁移
            completeSlotMigration(slot, targetNode);
            
        } catch (Exception e) {
            logger.error("槽位迁移失败: slot={}", slot, e);
            throw e;
        }
    }
}

/**
 * 集群监控指标
 */
@Data
public class ClusterMetrics {
    private double cpuUsage;           // CPU使用率
    private double memoryUsage;        // 内存使用率
    private int connectionCount;       // 连接数
    private long qps;                  // 每秒查询数
    private long networkIn;            // 网络入流量
    private long networkOut;           // 网络出流量
    private Map<String, NodeMetrics> nodeMetrics; // 各节点指标
}

@Data
public class NodeMetrics {
    private String nodeId;
    private String endpoint;
    private double cpuUsage;
    private double memoryUsage;
    private int slotCount;
    private long keyCount;
    private boolean isMaster;
}

面试加分点

  • 了解蓝绿部署和滚动升级策略
  • 知道数据预热和缓存穿透预防
  • 提到监控和自动化运维的重要性

问题5:如何设计Redis高可用架构来应对不同的故障场景?

标准答案

多层次高可用架构设计

/**
 * 高可用架构设计
 */
@Configuration
public class HighAvailabilityArchitecture {
    
    /**
     * 多级缓存架构
     */
    @Bean
    public MultiLevelCacheManager multiLevelCache() {
        return MultiLevelCacheManager.builder()
            .l1Cache(caffeineCache())           // 本地缓存
            .l2Cache(redisClusterCache())       // Redis集群缓存
            .l3Cache(redisSentinelCache())      // Redis哨兵缓存
            .l4Cache(databaseCache())           // 数据库缓存
            .build();
    }
    
    /**
     * 故障转移策略
     */
    @Bean
    public FailoverStrategy failoverStrategy() {
        return FailoverStrategy.builder()
            .primaryDataSource(redisCluster())
            .secondaryDataSource(redisSentinel())
            .fallbackDataSource(database())
            .circuitBreakerConfig(circuitBreakerConfig())
            .retryConfig(retryConfig())
            .build();
    }
}
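
上面FailoverStrategy中的熔断配置可以用Resilience4j这类组件落地:当Redis访问错误率超标时快速熔断,请求直接走数据库兜底,避免线程被缓存超时拖垮。下面是一个简化示意(组件选型与各项阈值均为假设):

import java.time.Duration;
import java.util.function.Supplier;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;

public class RedisCircuitBreakerExample {

    private final CircuitBreaker circuitBreaker;

    public RedisCircuitBreakerExample() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            .failureRateThreshold(50)                          // 错误率超过50%时打开熔断器
            .waitDurationInOpenState(Duration.ofSeconds(30))   // 打开30秒后进入半开状态试探
            .slidingWindowSize(50)                             // 基于最近50次调用统计错误率
            .build();
        this.circuitBreaker = CircuitBreaker.of("redisCache", config);
    }

    /**
     * Redis读取被熔断器保护,熔断打开或访问异常时回退到数据库
     */
    public Object getWithFallback(Supplier<Object> redisRead, Supplier<Object> dbRead) {
        Supplier<Object> protectedRead = CircuitBreaker.decorateSupplier(circuitBreaker, redisRead);
        try {
            return protectedRead.get();
        } catch (Exception e) {
            // 熔断打开时抛出CallNotPermittedException,与Redis访问异常一样走数据库兜底
            return dbRead.get();
        }
    }
}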

/**
 * 高可用缓存服务
 */
@Service
public class HighAvailabilityCacheService {
    
    @Autowired
    private MultiLevelCacheManager cacheManager;
    
    @Autowired
    private FailoverStrategy failoverStrategy;
    
    private static final Logger logger = LoggerFactory.getLogger(HighAvailabilityCacheService.class);
    
    /**
     * 高可用读取
     */
    public <T> T get(String key, Class<T> type) {
        // L1: 本地缓存
        T value = cacheManager.getL1Cache().get(key, type);
        if (value != null) {
            logger.debug("L1缓存命中: key={}", key);
            return value;
        }
        
        // L2: Redis集群
        try {
            value = cacheManager.getL2Cache().get(key, type);
            if (value != null) {
                logger.debug("L2缓存命中: key={}", key);
                // 回填L1缓存
                cacheManager.getL1Cache().put(key, value);
                return value;
            }
        } catch (Exception e) {
            logger.warn("L2缓存访问失败,降级到L3: key={}", key, e);
        }
        
        // L3: Redis哨兵
        try {
            value = cacheManager.getL3Cache().get(key, type);
            if (value != null) {
                logger.debug("L3缓存命中: key={}", key);
                // 回填上级缓存
                backfillUpperCache(key, value);
                return value;
            }
        } catch (Exception e) {
            logger.warn("L3缓存访问失败,降级到数据库: key={}", key, e);
        }
        
        // L4: 数据库
        try {
            value = cacheManager.getL4Cache().get(key, type);
            if (value != null) {
                logger.debug("数据库查询成功: key={}", key);
                // 回填所有级别缓存
                backfillAllCache(key, value);
                return value;
            }
        } catch (Exception e) {
            logger.error("所有数据源访问失败: key={}", key, e);
            throw new RuntimeException("数据访问失败", e);
        }
        
        return null;
    }
    
    /**
     * 高可用写入
     */
    public void put(String key, Object value, Duration ttl) {
        List<Exception> exceptions = new ArrayList<>();
        
        // 尝试写入所有可用的缓存层
        try {
            cacheManager.getL2Cache().put(key, value, ttl);
            logger.debug("L2缓存写入成功: key={}", key);
        } catch (Exception e) {
            logger.warn("L2缓存写入失败: key={}", key, e);
            exceptions.add(e);
        }
        
        try {
            cacheManager.getL3Cache().put(key, value, ttl);
            logger.debug("L3缓存写入成功: key={}", key);
        } catch (Exception e) {
            logger.warn("L3缓存写入失败: key={}", key, e);
            exceptions.add(e);
        }
        
        // 更新本地缓存
        cacheManager.getL1Cache().put(key, value, ttl);
        
        // 如果所有远程缓存都失败,抛出异常
        if (exceptions.size() >= 2) {
            throw new RuntimeException("缓存写入失败", exceptions.get(0));
        }
    }
    
    /**
     * 故障恢复处理
     */
    @EventListener
    public void handleFailoverEvent(FailoverEvent event) {
        logger.info("处理故障转移事件: {}", event);
        
        switch (event.getType()) {
            case REDIS_CLUSTER_DOWN:
                handleRedisClusterFailure();
                break;
            case REDIS_SENTINEL_DOWN:
                handleRedisSentinelFailure();
                break;
            case DATABASE_DOWN:
                handleDatabaseFailure();
                break;
            default:
                logger.warn("未知的故障类型: {}", event.getType());
        }
    }
    
    private void handleRedisClusterFailure() {
        // 1. 切换到哨兵模式
        failoverStrategy.switchToSecondary();
        
        // 2. 增加本地缓存容量
        cacheManager.expandL1Cache();
        
        // 3. 启用写操作缓冲
        enableWriteBuffer();
        
        // 4. 发送告警通知
        sendAlert("Redis集群故障,已切换到哨兵模式");
    }
    
    private void handleRedisSentinelFailure() {
        // 1. 完全依赖本地缓存和数据库
        failoverStrategy.switchToFallback();
        
        // 2. 启用读写分离
        enableReadWriteSeparation();
        
        // 3. 增加数据库连接池
        expandDatabasePool();
        
        // 4. 发送告警通知
        sendAlert("Redis完全故障,已切换到数据库模式");
    }
    
    /**
     * 健康检查和自动恢复
     */
    @Scheduled(fixedDelay = 30000)
    public void healthCheck() {
        // 检查各级缓存健康状态
        Map<String, Boolean> healthStatus = new HashMap<>();
        
        healthStatus.put("L1_CACHE", checkL1CacheHealth());
        healthStatus.put("L2_CACHE", checkL2CacheHealth());
        healthStatus.put("L3_CACHE", checkL3CacheHealth());
        healthStatus.put("DATABASE", checkDatabaseHealth());
        
        // 自动恢复
        if (healthStatus.get("L2_CACHE") && !failoverStrategy.isL2Active()) {
            logger.info("检测到L2缓存恢复,开始自动切回");
            autoRecovery();
        }
        
        // 记录健康状态
        logger.debug("健康检查结果: {}", healthStatus);
    }
    
    private void autoRecovery() {
        try {
            // 1. 预热缓存
            warmupCache();
            
            // 2. 切换回主要数据源
            failoverStrategy.switchToPrimary();
            
            // 3. 恢复正常配置
            restoreNormalConfig();
            
            logger.info("自动恢复完成");
            
        } catch (Exception e) {
            logger.error("自动恢复失败", e);
        }
    }
}

面试加分点

  • 了解CAP理论在Redis中的应用
  • 知道数据一致性和可用性的权衡
  • 提到容灾备份和异地多活方案

⚡ 性能优化与注意事项

主从复制性能优化

1. 复制缓冲区优化

# redis.conf优化配置
repl-backlog-size 64mb              # 增大复制积压缓冲区
repl-backlog-ttl 3600              # 缓冲区保留时间1小时
repl-timeout 60                    # 复制超时时间
repl-ping-slave-period 10          # 心跳间隔
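
积压缓冲区大小可以按"主节点写入速率 × 允许从节点断开的时长"来估算(以下数值为假设):若主节点写入流量约1MB/s、希望从节点断开60秒内仍能走部分重同步,则缓冲区至少需要 1MB/s × 60s = 60MB,向上取整配置为64mb;缓冲区不足时,断开重连的从节点会退化为全量同步。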

2. 网络优化

# 启用TCP_NODELAY,减少延迟
repl-disable-tcp-nodelay no

# 无盘复制优化(适用于网络比磁盘快的场景)
repl-diskless-sync yes
repl-diskless-sync-delay 5

3. 从节点优化

@Configuration
public class SlaveOptimizationConfig {
    
    // 从节点地址与端口(redis.slave.*为假设的配置项)
    @Value("${redis.slave.host}")
    private String slaveHost;
    
    @Value("${redis.slave.port}")
    private int slavePort;
    
    /**
     * 从节点连接池优化
     */
    @Bean
    public LettuceConnectionFactory optimizedSlaveFactory() {
        RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
        config.setHostName(slaveHost);
        config.setPort(slavePort);
        
        // 从节点连接池配置
        GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig = 
            new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(100);        // 增加最大连接数
        poolConfig.setMaxIdle(50);          // 增加最大空闲连接
        poolConfig.setMinIdle(20);          // 保持最小连接数
        poolConfig.setTestOnBorrow(false);  // 关闭借用时测试,提高性能
        poolConfig.setTestWhileIdle(true);  // 空闲时测试连接
        poolConfig.setTimeBetweenEvictionRunsMillis(30000); // 30秒检查一次
        
        LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
            .poolConfig(poolConfig)
            .readFrom(ReadFrom.REPLICA)     // 强制从从节点读取
            .commandTimeout(Duration.ofSeconds(2))  // 减少命令超时时间
            .build();
        
        return new LettuceConnectionFactory(config, clientConfig);
    }
}
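
需要注意,ReadFrom读写分离策略一般需要客户端掌握完整的主从拓扑(静态主从、Sentinel或Cluster连接)才能按预期生效;只把Standalone连接指向某个从节点时,读写分离行为依赖具体客户端实现。下面是一个基于静态主从配置的替代写法示意(节点地址均为示例值):

import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStaticMasterReplicaConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;

import io.lettuce.core.ReadFrom;

@Configuration
public class MasterReplicaReadConfig {

    @Bean
    public LettuceConnectionFactory masterReplicaConnectionFactory() {
        // 静态声明主从拓扑:写命令固定走主节点,读命令按ReadFrom策略路由
        RedisStaticMasterReplicaConfiguration config =
            new RedisStaticMasterReplicaConfiguration("192.168.1.100", 6379); // 主节点
        config.addNode("192.168.1.101", 6379);                                // 从节点1
        config.addNode("192.168.1.102", 6379);                                // 从节点2

        LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
            .readFrom(ReadFrom.REPLICA_PREFERRED)      // 从节点可用时优先读从节点
            .commandTimeout(Duration.ofSeconds(2))
            .build();

        return new LettuceConnectionFactory(config, clientConfig);
    }
}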

Sentinel性能优化

1. Sentinel配置优化

# sentinel.conf性能优化
sentinel down-after-milliseconds mymaster 5000    # 减少故障检测时间
sentinel failover-timeout mymaster 60000          # 减少故障转移超时
sentinel parallel-syncs mymaster 2                # 增加并行同步数量
sentinel deny-scripts-reconfig yes                # 禁用脚本重配置提高安全性

2. 客户端连接优化
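
与上面的服务端参数相对应,客户端应通过Sentinel来发现当前主节点,而不是硬编码主节点地址,这样故障转移后无需修改配置。下面是一个基于Spring Data Redis的连接配置示意(主节点名称、Sentinel地址均为示例值):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;

@Configuration
public class SentinelClientConfig {

    @Bean
    public LettuceConnectionFactory sentinelConnectionFactory() {
        // 通过三个Sentinel实例发现主节点,主从切换后客户端自动感知
        RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
            .master("mymaster")                        // 与sentinel.conf中监控的主节点名称一致
            .sentinel("192.168.1.101", 26379)
            .sentinel("192.168.1.102", 26379)
            .sentinel("192.168.1.103", 26379);
        return new LettuceConnectionFactory(sentinelConfig);
    }
}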

@Service
public class OptimizedSentinelService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private final LoadingCache<String, Object> localCache = Caffeine.newBuilder()
        .maximumSize(10000)
        .expireAfterWrite(Duration.ofMinutes(5))
        .recordStats()
        .build(key -> {
            // 缓存未命中时从Redis加载
            return redisTemplate.opsForValue().get(key);
        });
    
    /**
     * 优化的读取操作
     */
    public Object optimizedGet(String key) {
        try {
            // 1. 先检查本地缓存
            Object value = localCache.getIfPresent(key);
            if (value != null) {
                return value;
            }
            
            // 2. 从Redis读取
            value = redisTemplate.opsForValue().get(key);
            if (value != null) {
                // 3. 更新本地缓存
                localCache.put(key, value);
            }
            
            return value;
            
        } catch (Exception e) {
            // 4. 故障时返回本地缓存
            Object cachedValue = localCache.getIfPresent(key);
            if (cachedValue != null) {
                logger.warn("Redis访问失败,返回本地缓存: key={}", key);
                return cachedValue;
            }
            throw e;
        }
    }
    
    /**
     * 批量预加载
     */
    @Async
    public void preloadCache(List<String> keys) {
        try {
            List<Object> values = redisTemplate.opsForValue().multiGet(keys);
            for (int i = 0; i < keys.size(); i++) {
                if (values.get(i) != null) {
                    localCache.put(keys.get(i), values.get(i));
                }
            }
            logger.info("预加载缓存完成: count={}", keys.size());
        } catch (Exception e) {
            logger.error("预加载缓存失败", e);
        }
    }
}

Cluster性能优化

1. 槽位分布优化

/**
 * 集群槽位优化管理
 */
@Service
public class ClusterSlotOptimizer {
    
    /**
     * 智能槽位重分配
     */
    public void optimizeSlotDistribution() {
        try {
            // 1. 收集各节点负载信息
            Map<String, NodeLoad> nodeLoads = collectNodeLoads();
            
            // 2. 计算最优分配方案
            Map<String, List<Integer>> optimalDistribution = calculateOptimalDistribution(nodeLoads);
            
            // 3. 执行渐进式重分配
            executeGradualRebalancing(optimalDistribution);
            
        } catch (Exception e) {
            logger.error("槽位优化失败", e);
        }
    }
    
    /**
     * 热点Key检测和优化
     */
    @Scheduled(fixedDelay = 300000) // 5分钟检查一次
    public void detectAndOptimizeHotKeys() {
        try {
            // 1. 检测热点Key
            Map<String, Long> hotKeys = detectHotKeys();
            
            // 2. 分析热点分布
            Map<Integer, List<String>> hotKeysBySlot = groupHotKeysBySlot(hotKeys);
            
            // 3. 优化热点槽位
            for (Map.Entry<Integer, List<String>> entry : hotKeysBySlot.entrySet()) {
                int slot = entry.getKey();
                List<String> keys = entry.getValue();
                
                if (keys.size() > 100) { // 热点阈值
                    optimizeHotSlot(slot, keys);
                }
            }
            
        } catch (Exception e) {
            logger.error("热点Key优化失败", e);
        }
    }
    
    private void optimizeHotSlot(int slot, List<String> hotKeys) {
        // 1. 为热点Key创建本地缓存
        for (String key : hotKeys) {
            String value = getFromRedis(key);
            if (value != null) {
                localCache.put(key, value, Duration.ofMinutes(10));
            }
        }
        
        // 2. 考虑将热点槽位迁移到性能更好的节点
        String currentNode = getSlotNode(slot);
        String betterNode = findBetterNodeForHotSlot(slot);
        
        if (betterNode != null && !betterNode.equals(currentNode)) {
            logger.info("计划迁移热点槽位: slot={}, from={}, to={}", slot, currentNode, betterNode);
            // 这里可以标记为待迁移,在低峰期执行
            scheduleSlotMigration(slot, currentNode, betterNode);
        }
    }
}

2. 连接池优化

@Configuration
public class ClusterConnectionOptimization {
    
    // 集群节点列表(地址为示例值)
    private final List<String> clusterNodes = Arrays.asList(
        "192.168.1.101:7000", "192.168.1.101:7001", "192.168.1.101:7002");
    
    @Bean
    public LettuceConnectionFactory optimizedClusterFactory() {
        RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration(clusterNodes);
        clusterConfig.setMaxRedirects(2); // 减少重定向次数
        
        // 优化的连接池配置
        GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig = 
            new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(200);        // 增加总连接数
        poolConfig.setMaxIdle(100);         // 增加最大空闲连接
        poolConfig.setMinIdle(50);          // 保持足够的最小连接
        poolConfig.setTestOnBorrow(false);  // 关闭借用测试
        poolConfig.setTestOnReturn(false);  // 关闭归还测试
        poolConfig.setTestWhileIdle(true);  // 只在空闲时测试
        poolConfig.setBlockWhenExhausted(false); // 连接耗尽时不阻塞
        
        LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
            .poolConfig(poolConfig)
            .commandTimeout(Duration.ofSeconds(3))
            .shutdownTimeout(Duration.ofSeconds(5))
            .readFrom(ReadFrom.REPLICA_PREFERRED)
            .build();
        
        return new LettuceConnectionFactory(clusterConfig, clientConfig);
    }
}

重要注意事项

1. 数据一致性权衡

/**
 * 一致性级别配置
 */
@Component
public class ConsistencyManager {
    
    public enum ConsistencyLevel {
        EVENTUAL,    // 最终一致性
        STRONG,      // 强一致性
        WEAK         // 弱一致性
    }
    
    /**
     * 根据业务场景选择一致性级别
     */
    public void writeWithConsistency(String key, Object value, ConsistencyLevel level) {
        switch (level) {
            case STRONG:
                // 同步写入主节点,等待从节点确认
                writeToMasterAndWaitSlaves(key, value);
                break;
            case EVENTUAL:
                // 只写入主节点,异步复制到从节点
                writeToMasterOnly(key, value);
                break;
            case WEAK:
                // 写入任意可用节点
                writeToAnyAvailableNode(key, value);
                break;
        }
    }
}
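
其中"强一致性"分支通常借助Redis的WAIT命令实现:写入主节点后阻塞等待指定数量的从节点确认已收到该写入。需要注意WAIT只保证复制已到达从节点,并不能提供严格意义上的强一致读。下面是一个基于RedisTemplate的示意(等待1个从节点、超时1000毫秒均为假设值):

import java.nio.charset.StandardCharsets;

import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;

public class ReplicationAckWriter {

    private final RedisTemplate<String, Object> redisTemplate;

    public ReplicationAckWriter(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 写入后等待至少1个从节点确认复制,最多阻塞1000毫秒
     */
    public boolean setWithReplicaAck(String key, String value) {
        redisTemplate.opsForValue().set(key, value);

        // WAIT numreplicas timeout,返回已确认该写入的从节点数量
        Long acked = redisTemplate.execute((RedisCallback<Long>) connection ->
            (Long) connection.execute("WAIT",
                "1".getBytes(StandardCharsets.UTF_8),
                "1000".getBytes(StandardCharsets.UTF_8)));

        return acked != null && acked >= 1;
    }
}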

2. 容量规划建议

/**
 * 容量规划工具
 */
@Component
public class CapacityPlanner {
    
    // 单节点容量上限(假设值):内存32GB、QPS 8万
    private static final long MAX_NODE_MEMORY = 32L * 1024 * 1024 * 1024;
    private static final long MAX_NODE_QPS = 80_000;
    
    /**
     * 计算集群容量需求
     */
    public ClusterCapacityPlan calculateCapacity(BusinessRequirement requirement) {
        // 1. 数据量估算
        long totalDataSize = requirement.getKeyCount() * requirement.getAvgValueSize();
        
        // 2. 内存需求(主从复制×2,再预留20%内部开销)
        long memoryRequired = (long) (totalDataSize * 2 * 1.2);
        
        // 3. QPS需求分析
        long readQps = requirement.getReadQps();
        long writeQps = requirement.getWriteQps();
        
        // 4. 节点数量计算(内存与QPS两个维度取较大值,用浮点除法避免整除截断)
        int nodeCount = Math.max(
            (int) Math.ceil((double) memoryRequired / MAX_NODE_MEMORY),
            (int) Math.ceil((double) (readQps + writeQps) / MAX_NODE_QPS)
        );
        
        // 5. 确保最小节点数
        nodeCount = Math.max(nodeCount, 3);
        
        return ClusterCapacityPlan.builder()
            .nodeCount(nodeCount)
            .memoryPerNode(memoryRequired / nodeCount)
            .expectedQpsPerNode((readQps + writeQps) / nodeCount)
            .replicationFactor(2)
            .build();
    }
}
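
以一组假设的业务数据演算一遍:1亿个Key、平均Value约1KB,数据量约100GB;考虑主从复制(×2)和约20%的内部开销(×1.2),内存需求约240GB;若按单节点32GB的内存上限规划,则至少需要8个主节点(每个主节点再配从节点)。同时用读写QPS做同样的除法,两个维度取较大值,并保证主节点数不少于3个。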

3. 监控告警配置

# 监控指标阈值配置
monitoring:
  redis:
    cluster:
      # 节点健康监控
      node-down-threshold: 1          # 节点下线告警阈值
      memory-usage-threshold: 85      # 内存使用率告警阈值
      cpu-usage-threshold: 80         # CPU使用率告警阈值
      connection-threshold: 8000      # 连接数告警阈值
      
      # 性能监控
      response-time-threshold: 100    # 响应时间告警阈值(毫秒)
      qps-threshold: 50000           # QPS告警阈值
      error-rate-threshold: 1        # 错误率告警阈值(%)
      
      # 复制监控
      replication-lag-threshold: 10   # 复制延迟告警阈值(秒)
      backlog-usage-threshold: 80     # 积压缓冲区使用率阈值
      
      # 集群监控
      slot-migration-timeout: 300     # 槽位迁移超时告警(秒)
      cluster-state-check-interval: 30 # 集群状态检查间隔(秒)
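
与上面"复制监控"中的延迟阈值对应,可以通过INFO replication的复制偏移量来粗略衡量主从延迟(按落后的字节数统计,而不是秒数)。下面是一个定时检查的示意(告警阈值为假设值):

import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class ReplicationLagChecker {

    private static final Logger logger = LoggerFactory.getLogger(ReplicationLagChecker.class);

    private final RedisTemplate<String, Object> redisTemplate;

    public ReplicationLagChecker(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 定期比较主节点与各从节点的复制偏移量差距
     */
    @Scheduled(fixedDelay = 30000)
    public void checkReplicationLag() {
        // 在主节点上执行INFO replication
        Properties info = redisTemplate.execute((RedisCallback<Properties>) connection ->
            connection.serverCommands().info("replication"));
        if (info == null) {
            return;
        }

        long masterOffset = Long.parseLong(info.getProperty("master_repl_offset", "0"));

        // slave0、slave1...的值形如:ip=...,port=...,state=online,offset=...,lag=0
        for (String name : info.stringPropertyNames()) {
            if (!name.startsWith("slave")) {
                continue;
            }
            for (String field : info.getProperty(name).split(",")) {
                if (field.startsWith("offset=")) {
                    long slaveOffset = Long.parseLong(field.substring("offset=".length()));
                    long lagBytes = masterOffset - slaveOffset;
                    if (lagBytes > 10 * 1024 * 1024) {   // 落后超过10MB时告警(阈值为假设值)
                        logger.warn("复制延迟过大: {} 落后 {} 字节", name, lagBytes);
                    }
                }
            }
        }
    }
}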

4. 常见问题避免

  • 避免大Key:单个Key不超过10MB,使用Hash分片
  • 避免热点Key:使用本地缓存和多级缓存
  • 避免阻塞操作:使用SCAN替代KEYS,避免长时间的SORT操作(SCAN用法见下方示意代码)
  • 合理设置过期时间:避免大量Key同时过期
  • 监控慢查询:设置slowlog监控,优化慢查询
  • 网络优化:使用pipeline批量操作,减少网络往返(pipeline用法见下方示意代码)
  • 内存优化:合理配置maxmemory和淘汰策略
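
下面给出SCAN和pipeline的一个使用示意,基于Spring Data Redis(Key匹配模式、批次大小均为示例值):

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;

public class NonBlockingAccessExample {

    private final RedisTemplate<String, Object> redisTemplate;

    public NonBlockingAccessExample(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 用SCAN渐进式遍历匹配的Key,避免KEYS命令一次性阻塞Redis
     */
    public List<String> scanKeys(String pattern) {
        return redisTemplate.execute((RedisCallback<List<String>>) connection -> {
            List<String> keys = new ArrayList<>();
            ScanOptions options = ScanOptions.scanOptions().match(pattern).count(500).build();
            try (Cursor<byte[]> cursor = connection.scan(options)) {
                while (cursor.hasNext()) {
                    keys.add(new String(cursor.next(), StandardCharsets.UTF_8));
                }
            }
            return keys;
        });
    }

    /**
     * 用pipeline一次网络往返批量读取;Cluster模式下建议配合hash tag让这批Key落在同一节点
     */
    public List<Object> pipelinedGet(List<String> keys) {
        return redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
            for (String key : keys) {
                connection.stringCommands().get(key.getBytes(StandardCharsets.UTF_8));
            }
            return null; // executePipelined要求回调返回null,结果统一放在返回的List中
        });
    }
}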

📚 总结与技术对比

Redis高可用方案对比

| 特性 | 主从复制 | Sentinel哨兵 | Cluster集群 |
| --- | --- | --- | --- |
| 部署复杂度 | 简单 | 中等 | 复杂 |
| 故障转移 | 手动 | 自动 | 自动 |
| 数据分片 | 不支持 | 不支持 | 支持 |
| 扩展性 | 垂直扩展 | 垂直扩展 | 水平扩展 |
| 一致性 | 最终一致性 | 最终一致性 | 最终一致性 |
| 客户端复杂度 | 低 | 中等 | 高 |
| 运维成本 | 低 | 中等 | 高 |
| 适用场景 | 小规模、读多写少 | 中等规模、高可用要求 | 大规模、高并发 |

方案选择建议

1. 主从复制适用场景

// 适用条件评估
public class MasterSlaveScenario {
    public boolean isApplicable(SystemRequirement req) {
        return req.getDataSize() < 50_000_000_000L &&      // 数据量 < 50GB
               req.getWriteQps() < 10_000 &&               // 写QPS < 1万
               req.getReadQps() < 50_000 &&                // 读QPS < 5万
               req.getDowntimeTolerance() > 300 &&         // 可容忍5分钟故障
               req.getTeamSize() < 5;                      // 运维团队 < 5人
    }
}

2. Sentinel哨兵适用场景

// 适用条件评估
public class SentinelScenario {
    public boolean isApplicable(SystemRequirement req) {
        return req.getDataSize() < 200_000_000_000L &&     // 数据量 < 200GB
               req.getWriteQps() < 50_000 &&               // 写QPS < 5万
               req.getReadQps() < 200_000 &&               // 读QPS < 20万
               req.getDowntimeTolerance() < 60 &&          // 故障恢复 < 1分钟
               req.isHighAvailabilityRequired();           // 需要高可用
    }
}

3. Cluster集群适用场景

// 适用条件评估
public class ClusterScenario {
    public boolean isApplicable(SystemRequirement req) {
        return req.getDataSize() > 100_000_000_000L ||     // 数据量 > 100GB
               req.getWriteQps() > 30_000 ||               // 写QPS > 3万
               req.getReadQps() > 100_000 ||               // 读QPS > 10万
               req.isHorizontalScalingRequired() ||        // 需要水平扩展
               req.getGrowthRate() > 0.5;                  // 年增长率 > 50%
    }
}

架构演进路径

1. 渐进式演进策略

/**
 * Redis架构演进管理器
 */
@Service
public class ArchitectureEvolutionManager {
    
    /**
     * 评估当前架构是否需要升级
     */
    public EvolutionPlan evaluateEvolution(CurrentArchitecture current, SystemMetrics metrics) {
        EvolutionPlan plan = new EvolutionPlan();
        
        // 1. 评估性能瓶颈
        if (isPerformanceBottleneck(metrics)) {
            if (current.getType() == ArchitectureType.MASTER_SLAVE) {
                plan.recommendUpgrade(ArchitectureType.SENTINEL, "性能瓶颈,建议升级到Sentinel");
            } else if (current.getType() == ArchitectureType.SENTINEL) {
                plan.recommendUpgrade(ArchitectureType.CLUSTER, "性能瓶颈,建议升级到Cluster");
            }
        }
        
        // 2. 评估可用性需求
        if (isAvailabilityInsufficient(current, metrics)) {
            if (current.getType() == ArchitectureType.MASTER_SLAVE) {
                plan.recommendUpgrade(ArchitectureType.SENTINEL, "可用性不足,需要自动故障转移");
            }
        }
        
        // 3. 评估扩展性需求
        if (isScalabilityInsufficient(current, metrics)) {
            plan.recommendUpgrade(ArchitectureType.CLUSTER, "扩展性不足,需要水平扩展");
        }
        
        return plan;
    }
    
    /**
     * 执行架构升级
     */
    public void executeEvolution(EvolutionPlan plan) {
        try {
            switch (plan.getTargetArchitecture()) {
                case SENTINEL:
                    upgradeToSentinel(plan);
                    break;
                case CLUSTER:
                    upgradeToCluster(plan);
                    break;
                default:
                    throw new UnsupportedOperationException("不支持的架构类型");
            }
        } catch (Exception e) {
            logger.error("架构升级失败", e);
            rollbackEvolution(plan);
        }
    }
    
    private void upgradeToSentinel(EvolutionPlan plan) {
        // 1. 部署Sentinel节点
        deploySentinelNodes();
        
        // 2. 配置主从监控
        configureMasterSlaveMonitoring();
        
        // 3. 切换客户端连接
        switchClientToSentinel();
        
        // 4. 验证升级结果
        validateSentinelDeployment();
    }
    
    private void upgradeToCluster(EvolutionPlan plan) {
        // 1. 创建集群节点
        createClusterNodes();
        
        // 2. 初始化集群
        initializeCluster();
        
        // 3. 数据迁移
        migrateDataToCluster();
        
        // 4. 切换客户端
        switchClientToCluster();
        
        // 5. 验证集群状态
        validateClusterDeployment();
    }
}

2. 零停机升级策略

/**
 * 零停机升级实现
 */
@Service
public class ZeroDowntimeUpgradeService {
    
    /**
     * 主从到Sentinel零停机升级
     */
    public void upgradeToSentinelZeroDowntime() {
        try {
            // 1. 在现有主从基础上部署Sentinel
            List<String> sentinelNodes = deploySentinelNodesParallel();
            
            // 2. 配置Sentinel监控现有主从
            configureSentinelMonitoring(sentinelNodes);
            
            // 3. 验证Sentinel正常工作
            validateSentinelOperations();
            
            // 4. 逐步切换客户端连接
            gradualClientMigration();
            
            // 5. 清理旧配置
            cleanupOldConfiguration();
            
            logger.info("零停机升级到Sentinel完成");
            
        } catch (Exception e) {
            logger.error("零停机升级失败,开始回滚", e);
            rollbackToMasterSlave();
        }
    }
    
    /**
     * Sentinel到Cluster零停机升级
     */
    public void upgradeToClusterZeroDowntime() {
        try {
            // 1. 创建新的集群环境
            ClusterEnvironment newCluster = createNewClusterEnvironment();
            
            // 2. 实时数据同步
            startRealTimeDataSync(newCluster);
            
            // 3. 验证数据一致性
            validateDataConsistency();
            
            // 4. 切换读流量到新集群
            switchReadTrafficToCluster(newCluster);
            
            // 5. 切换写流量到新集群
            switchWriteTrafficToCluster(newCluster);
            
            // 6. 停止旧环境
            shutdownOldEnvironment();
            
            logger.info("零停机升级到Cluster完成");
            
        } catch (Exception e) {
            logger.error("零停机升级失败,开始回滚", e);
            rollbackToSentinel();
        }
    }
    
    /**
     * 渐进式客户端迁移
     */
    private void gradualClientMigration() {
        List<String> clientGroups = getClientGroups();
        
        for (String group : clientGroups) {
            try {
                // 1. 切换客户端组到新架构
                switchClientGroup(group);
                
                // 2. 监控切换后的性能
                monitorPerformanceAfterSwitch(group);
                
                // 3. 等待一段时间确保稳定
                Thread.sleep(60000); // 等待1分钟
                
                logger.info("客户端组切换完成: {}", group);
                
            } catch (Exception e) {
                logger.error("客户端组切换失败: {}", group, e);
                // 回滚该客户端组
                rollbackClientGroup(group);
                throw e;
            }
        }
    }
}

最佳实践总结

1. 架构设计原则

  • 渐进式演进:从简单到复杂,根据业务需求逐步升级
  • 监控先行:完善的监控体系是高可用的基础
  • 自动化运维:减少人工干预,提高运维效率
  • 容量规划:提前规划容量,避免临时扩容
  • 故障演练:定期进行故障演练,验证高可用方案

2. 运维建议

  • 文档完善:维护详细的架构文档和运维手册
  • 版本管理:统一Redis版本,便于维护升级
  • 配置管理:使用配置管理工具统一管理配置
  • 备份策略:制定完善的数据备份和恢复策略
  • 安全加固:配置访问控制、网络隔离等安全措施

3. 开发建议

  • 客户端优化:合理配置连接池和超时参数
  • 数据模型设计:避免大Key和热点Key
  • 缓存策略:设计合理的缓存更新和失效策略
  • 降级方案:准备缓存降级和熔断机制
  • 性能测试:定期进行性能压测,发现潜在问题

Redis集群架构与高可用方案是构建大规模分布式系统的重要基础设施。通过合理选择架构方案、优化配置参数、完善监控体系,可以构建出稳定可靠的Redis高可用服务,为业务系统提供强有力的缓存支撑。在实际应用中,需要根据具体的业务场景和技术要求,选择最适合的高可用方案,并制定完善的运维策略。
