Redis集群架构与高可用方案深度解析
深入探讨Redis Cluster分布式架构、Redis Sentinel哨兵模式和主从复制机制。掌握高可用部署策略、故障转移原理和集群扩容技术,构建生产级Redis高可用方案。
🤔 问题背景与技术演进
我们要解决什么问题?
单机Redis在生产环境中面临多重挑战:
- 单点故障风险:Redis服务器宕机导致整个缓存系统不可用
- 内存容量限制:单机内存有限,无法存储海量数据
- 性能瓶颈:单机QPS有上限,无法支撑高并发访问
- 数据安全性:内存数据易丢失,需要可靠的备份机制
- 扩展性差:业务增长时难以动态扩容
// 单机Redis的问题示例
public class SingleRedisProblems {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void demonstrateSinglePointFailure() {
try {
// 大量业务依赖Redis
String userSession = (String) redisTemplate.opsForValue().get("session:user123");
List<String> shoppingCart = redisTemplate.opsForList().range("cart:user123", 0, -1);
String userPreferences = (String) redisTemplate.opsForValue().get("prefs:user123");
// 如果Redis服务器突然宕机...
// 所有依赖Redis的功能都会失效:
// - 用户需要重新登录
// - 购物车数据丢失
// - 个性化推荐失效
// - 系统响应时间急剧增加
} catch (Exception e) {
log.error("Redis连接失败,系统功能受影响", e);
// 业务降级处理,但用户体验严重受损
}
}
public void demonstrateCapacityLimitation() {
/*
* 内存容量问题:
* - 单机内存:128GB
* - 业务数据:500GB
* - 结果:无法全部缓存,缓存命中率低
*
* 性能瓶颈:
* - 单机QPS:10万
* - 业务需求:50万QPS
* - 结果:响应时间增加,用户体验差
*/
}
}
没有这个技术时是怎么做的?
在Redis集群技术出现之前,开发者主要通过以下方式解决高可用问题:
1. 主从复制 + 手动切换
# 手动主从切换
redis-cli -h slave-server SLAVEOF NO ONE
# 应用程序手动切换连接地址
- 问题:需要人工干预,切换时间长,容易出错
2. 客户端分片
// 客户端一致性哈希分片
public class ClientSharding {
private List<JedisPool> redisPools;
public Jedis getRedisConnection(String key) {
int hash = key.hashCode();
int index = Math.abs(hash) % redisPools.size();
return redisPools.get(index).getResource();
}
}
- 问题:扩容困难,数据迁移复杂,客户端逻辑复杂
3. 代理层方案
- 使用Twemproxy、Codis等代理
- 问题:增加网络延迟,代理成为新的单点
4. 应用层多写
// 多个Redis实例同时写入
public void writeToMultipleRedis(String key, String value) {
redis1.set(key, value);
redis2.set(key, value);
redis3.set(key, value);
}
- 问题:数据一致性难以保证,性能开销大
技术演进的历史脉络
2009-2010年: Redis早期版本(1.x/2.0)已提供主从复制
- 支持一主多从架构
- 数据自动同步到从节点
- 手动故障切换
2012年: Redis 2.6引入Sentinel哨兵
- 自动故障检测和切换
- 主从切换自动化
- 配置管理和通知
2015年: Redis 3.0引入Cluster集群
- 原生分布式支持
- 自动数据分片(16384个slot)
- 无中心化架构
2016年: Redis 3.2集群功能完善
- 集群重平衡优化
- 故障转移改进
- 管理工具完善
2018年: Redis 5.0集群增强
- Stream数据类型集群支持
- 集群管理命令改进
- 性能优化
2020年至今: 云原生集群
- Kubernetes集群部署
- 容器化高可用方案
- 多云架构支持
🎯 核心概念与原理
Redis主从复制机制
主从复制是Redis高可用的基础,实现数据备份和读写分离:
/**
* Redis主从复制架构分析
*/
public class RedisMasterSlaveAnalysis {
/**
* 主从复制原理
*/
public void analyzeMasterSlaveReplication() {
/*
* 复制过程:
* 1. 从节点连接主节点
* 2. 从节点发送PSYNC命令请求同步
* 3. 主节点执行BGSAVE生成RDB快照
* 4. 主节点将RDB文件发送给从节点
* 5. 从节点加载RDB文件恢复数据
* 6. 主节点将缓冲区的写命令发送给从节点
* 7. 后续写命令实时同步
*
* 同步类型:
* - 全量同步:首次连接或长时间断开后重连
* - 增量同步:正常运行时的实时同步
* - 部分同步:短暂断开后的快速恢复
*/
}
/**
* 主从复制配置
*/
public void configureMasterSlave() {
/*
* 主节点配置(redis-master.conf):
* bind 0.0.0.0
* port 6379
* requirepass master_password
*
* # 主从复制安全
* masterauth master_password
*
* # 复制相关配置
* repl-diskless-sync no # 使用磁盘复制
* repl-diskless-sync-delay 5 # 延迟5秒开始复制
*
* 从节点配置(redis-slave.conf):
* bind 0.0.0.0
* port 6380
*
* # 指定主节点
* replicaof 192.168.1.100 6379
* masterauth master_password
*
* # 从节点只读
* replica-read-only yes
*
* # 复制超时配置
* repl-timeout 60
*/
}
/**
* 读写分离实现
*/
public void implementReadWriteSeparation() {
/*
* Spring Boot配置读写分离:
*/
}
}
@Configuration
public class RedisReadWriteConfig {
@Bean
@Primary
public LettuceConnectionFactory masterConnectionFactory() {
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
config.setHostName("redis-master.example.com");
config.setPort(6379);
config.setPassword("master_password");
return new LettuceConnectionFactory(config);
}
@Bean
public LettuceConnectionFactory slaveConnectionFactory() {
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
config.setHostName("redis-slave.example.com");
config.setPort(6380);
config.setPassword("master_password");
return new LettuceConnectionFactory(config);
}
@Bean
public RedisTemplate<String, Object> masterRedisTemplate() {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(masterConnectionFactory());
// 序列化配置...
return template;
}
@Bean
public RedisTemplate<String, Object> slaveRedisTemplate() {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(slaveConnectionFactory());
// 序列化配置...
return template;
}
}
Redis Sentinel哨兵模式
Sentinel提供自动故障检测和切换功能:
/**
* Redis Sentinel架构分析
*/
public class RedisSentinelAnalysis {
/**
* Sentinel工作原理
*/
public void analyzeSentinelMechanism() {
/*
* Sentinel功能:
* 1. 监控:持续监控主从节点健康状态
* 2. 通知:向管理员发送故障通知
* 3. 故障转移:自动将从节点提升为主节点
* 4. 配置管理:为客户端提供当前主节点信息
*
* 工作流程:
* 1. Sentinel定期向主从节点发送PING命令
* 2. 如果主节点在指定时间内未响应,标记为主观下线
* 3. 询问其他Sentinel,确认主节点客观下线
* 4. 选举Leader Sentinel执行故障转移
* 5. 选择合适的从节点提升为新主节点
* 6. 更新其他从节点的复制目标
* 7. 通知客户端新的主节点信息
*/
}
/**
* Sentinel配置
*/
public void configureSentinel() {
/*
* sentinel.conf配置:
*
* # Sentinel端口
* port 26379
*
* # 监控主节点
* sentinel monitor mymaster 192.168.1.100 6379 2
*
* # 认证密码
* sentinel auth-pass mymaster master_password
*
* # 主观下线时间(毫秒)
* sentinel down-after-milliseconds mymaster 30000
*
* # 故障转移超时时间
* sentinel failover-timeout mymaster 180000
*
* # 同时进行复制的从节点数量
* sentinel parallel-syncs mymaster 1
*
* # 通知脚本
* sentinel notification-script mymaster /var/redis/notify.sh
*
* # 客户端重新配置脚本
* sentinel client-reconfig-script mymaster /var/redis/reconfig.sh
*/
}
}
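在了解Sentinel的职责之后,可以用一个最小化的客户端示例感受"配置管理"这一能力:客户端不写死主节点地址,而是通过Sentinel发现当前主节点。下面是一个基于Lettuce的示意代码(节点地址、密码均为假设值,仅作演示,不代表生产级写法):
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.masterreplica.MasterReplica;
import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection;

public class SentinelClientDemo {
    public static void main(String[] args) {
        // 通过Sentinel发现主节点,地址与密码均为示例值
        RedisURI sentinelUri = RedisURI.Builder
                .sentinel("192.168.1.101", 26379, "mymaster")
                .withSentinel("192.168.1.102", 26379)
                .withSentinel("192.168.1.103", 26379)
                .withPassword("master_password".toCharArray())
                .build();
        RedisClient client = RedisClient.create();
        // MasterReplica连接会自动跟随Sentinel发布的主节点变更
        StatefulRedisMasterReplicaConnection<String, String> connection =
                MasterReplica.connect(client, StringCodec.UTF8, sentinelUri);
        connection.setReadFrom(ReadFrom.REPLICA_PREFERRED); // 读走从节点,写走主节点
        connection.sync().set("ha:test", "ok");
        System.out.println(connection.sync().get("ha:test"));
        connection.close();
        client.shutdown();
    }
}
主节点发生故障转移后,该连接会根据Sentinel的通知自动切换到新的主节点,应用代码无需改动。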
Redis Cluster集群架构
Redis Cluster提供原生的分布式解决方案:
/**
* Redis Cluster架构分析
*/
public class RedisClusterAnalysis {
/**
* Cluster分片原理
*/
public void analyzeClusterSharding() {
/*
* 数据分片机制:
* 1. 16384个哈希槽(Hash Slot)
* 2. 每个Key通过CRC16算法计算槽位
* 3. 每个节点负责一部分槽位
* 4. 客户端根据槽位路由请求
*
* 槽位计算:
* slot = CRC16(key) % 16384
*
* 集群拓扑:
* - 最少3个主节点
* - 每个主节点可配置多个从节点
* - 节点间通过Gossip协议通信
* - 无中心化架构,任意节点可接收请求
*/
}
/**
* 故障转移机制
*/
public void analyzeFailoverMechanism() {
/*
* 故障检测:
* 1. 节点间定期发送PING消息
* 2. 超时未响应标记为PFAIL(可能失败)
* 3. 多数节点确认后标记为FAIL(确认失败)
*
* 自动故障转移:
* 1. 从节点检测到主节点失败
* 2. 从节点发起选举成为新主节点
* 3. 新主节点接管原主节点的槽位
* 4. 更新集群配置并广播
*
* 选举算法:
* - 借鉴Raft的任期(epoch)与多数派投票思想
* - 复制偏移量越大的从节点选举延迟越小,越容易率先发起并赢得选举
* - 需要获得多数主节点投票才能完成提升
*/
}
/**
* 集群扩容机制
*/
public void analyzeClusterScaling() {
/*
* 扩容流程:
* 1. 添加新节点到集群
* 2. 分配槽位给新节点
* 3. 迁移数据到新节点
* 4. 更新集群拓扑
*
* 数据迁移:
* - 逐个槽位迁移
* - 源节点标记槽位为迁移中
* - 逐个Key迁移到目标节点
* - 迁移完成后更新槽位归属
*
* 缩容流程:
* 1. 将节点的槽位迁移到其他节点
* 2. 确认节点无槽位后移除
* 3. 更新集群配置
*/
}
}
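为了更直观地理解上面的槽位计算规则,下面给出一个客户端侧的Java示例:CRC16采用Redis使用的XMODEM多项式(0x1021、初始值0),并演示{...}哈希标签如何让相关Key落入同一槽位(仅为示意实现):
public class SlotCalculator {
    /** Redis使用的CRC16-XMODEM(多项式0x1021,初始值0) */
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    /** 计算Key所属槽位:若存在{tag},只对tag部分做哈希 */
    static int keySlot(String key) {
        int start = key.indexOf('{');
        if (start >= 0) {
            int end = key.indexOf('}', start + 1);
            if (end > start + 1) {
                key = key.substring(start + 1, end);
            }
        }
        return crc16(key.getBytes(java.nio.charset.StandardCharsets.UTF_8)) & 0x3FFF; // 等价于 % 16384
    }

    public static void main(String[] args) {
        System.out.println(keySlot("user:1001"));          // 普通Key
        System.out.println(keySlot("{user:1001}:cart"));   // 与下一行落在同一槽位
        System.out.println(keySlot("{user:1001}:orders")); // 同一哈希标签,可参与MGET/事务
    }
}
借助哈希标签,需要一起操作的多个Key(如同一用户的购物车与订单)可以被强制分配到同一槽位,从而在集群模式下使用MGET、MULTI等多键操作。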
🔧 实现原理与源码分析
主从复制源码实现
主从复制的核心实现位于 replication.c 文件中:
// Redis主从复制核心代码分析
/**
* PSYNC命令实现 - 部分重同步
*/
void syncCommand(client *c) {
/* PSYNC命令格式: PSYNC <repl_id> <offset> */
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
if (c->argc != 3) {
addReplyError(c, "Wrong number of arguments for PSYNC");
return;
}
// 获取复制ID和偏移量
char *master_replid = c->argv[1]->ptr;
char *offset = c->argv[2]->ptr;
long long psync_offset;
if (getLongLongFromObjectOrReply(c, c->argv[2], &psync_offset, NULL) != C_OK)
return;
// 判断是否可以进行部分重同步
if (masterTryPartialResynchronization(c, psync_offset) == C_OK) {
server.stat_sync_partial_ok++;
return; /* 部分重同步成功 */
} else {
server.stat_sync_full++;
/* 执行全量重同步 */
}
}
// 全量重同步流程
c->flags |= CLIENT_SLAVE;
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
// 如果没有正在进行的BGSAVE,启动新的BGSAVE
if (server.rdb_child_pid == -1) {
if (startBgsaveForReplication(c->slave_req) != C_OK) {
addReplyError(c, "Unable to perform background save");
return;
}
}
// 将客户端添加到等待列表
listAddNodeTail(server.slaves, c);
}
/**
* 主节点向从节点发送RDB文件
*/
void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
listNode *ln;
listIter li;
listRewind(server.slaves, &li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
} else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
struct stat buf;
if (bgsaveerr != C_OK) {
freeClient(slave);
continue;
}
// 打开RDB文件准备发送
if ((slave->repldbfd = open(server.rdb_filename, O_RDONLY)) == -1 ||
fstat(slave->repldbfd, &buf) == -1) {
freeClient(slave);
continue;
}
// 记录RDB文件大小和发送进度
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = SLAVE_STATE_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),
"$%lld\r\n", (unsigned long long) slave->repldbsize);
// 注册写事件,准备发送RDB数据
aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave);
}
}
}
/**
* 发送RDB数据到从节点
*/
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
client *slave = privdata;
char buf[PROTO_IOBUF_LEN];
ssize_t nwritten, buflen;
// 先发送RDB文件大小信息
if (slave->replpreamble) {
nwritten = connWrite(slave->conn, slave->replpreamble, sdslen(slave->replpreamble));
if (nwritten == -1) {
freeClient(slave);
return;
}
sdsrange(slave->replpreamble, nwritten, -1); /* sdsrange原地截断,无返回值 */
if (sdslen(slave->replpreamble) == 0) {
sdsfree(slave->replpreamble);
slave->replpreamble = NULL;
} else {
return;
}
}
// 发送RDB文件内容
lseek(slave->repldbfd, slave->repldboff, SEEK_SET);
buflen = read(slave->repldbfd, buf, PROTO_IOBUF_LEN);
if (buflen <= 0) {
if (buflen == 0) {
// RDB文件发送完成
close(slave->repldbfd);
slave->repldbfd = -1;
aeDeleteFileEvent(server.el, slave->fd, AE_WRITABLE);
slave->replstate = SLAVE_STATE_ONLINE;
// 发送缓冲区中的写命令
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
sendReplyToClient, slave) == AE_ERR) {
freeClient(slave);
return;
}
slave->repldboff = 0;
} else {
freeClient(slave);
return;
}
return;
}
// 写入网络缓冲区
if ((nwritten = connWrite(slave->conn, buf, buflen)) == -1) {
freeClient(slave);
return;
}
slave->repldboff += nwritten;
// 更新网络统计
server.stat_net_output_bytes += nwritten;
}
/**
* 实时同步写命令到从节点
*/
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
char llstr[LONG_STR_SIZE];
// 如果没有从节点,直接返回
if (listLength(slaves) == 0) return;
// 构造Redis协议格式的命令
// 如果需要切换数据库
if (server.slaveseldb != dictid) {
robj *selectcmd;
if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
selectcmd = shared.select[dictid];
} else {
int dictid_len = ll2string(llstr, sizeof(llstr), dictid);
selectcmd = createObject(OBJ_STRING,
sdscatprintf(sdsempty(), "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, llstr));
}
// 发送SELECT命令到所有从节点
listRewind(slaves, &li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
addReply(slave, selectcmd);
}
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}
server.slaveseldb = dictid;
// 发送实际的写命令
listRewind(slaves, &li);
while((ln = listNext(&li))) {
client *slave = ln->value;
// 只向在线的从节点发送命令
if (slave->replstate != SLAVE_STATE_ONLINE) continue;
// 添加命令到从节点的输出缓冲区
addReplyArrayLen(slave, argc);
for (j = 0; j < argc; j++) {
addReplyBulk(slave, argv[j]);
}
}
}
Sentinel哨兵源码实现
Sentinel的核心实现位于 sentinel.c 文件中:
// Redis Sentinel核心代码分析
/**
* Sentinel主循环 - 监控和故障检测
*/
void sentinelTimer(void) {
sentinelCheckTiltCondition();
sentinelHandleDictOfRedisInstances(sentinel.masters);
sentinelRunPendingScripts();
sentinelCollectTerminatedScripts();
sentinelKillTimedoutScripts();
/* 各实例的状态检查与故障转移在sentinelHandleDictOfRedisInstances中递归完成 */
}
/**
* 检查Redis实例状态
*/
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
/* 发送PING命令检查实例状态 */
sentinelSendPeriodicCommands(ri);
/* 检查主观下线状态 */
sentinelCheckSubjectivelyDown(ri);
/* 检查客观下线状态(仅对主节点) */
if (ri->flags & SRI_MASTER) {
sentinelCheckObjectivelyDown(ri);
/* 如果主节点客观下线,开始故障转移 */
if (sentinelStartFailoverIfNeeded(ri))
sentinelAskMasterStateToOtherSentinels(ri, SENTINEL_ASK_FORCED);
/* 执行故障转移步骤 */
sentinelFailoverStateMachine(ri);
/* 向其他Sentinel询问主节点状态 */
sentinelAskMasterStateToOtherSentinels(ri, SENTINEL_NO_FLAGS);
}
}
/**
* 检查主观下线状态
*/
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
mstime_t elapsed = 0;
if (ri->link->last_pong_time)
elapsed = mstime() - ri->link->last_pong_time;
else if (ri->link->last_ping_time)
elapsed = mstime() - ri->link->last_ping_time;
/* 如果超过down-after-milliseconds时间没有响应 */
if (elapsed > ri->down_after_period) {
/* 标记为主观下线 */
if ((ri->flags & SRI_S_DOWN) == 0) {
sentinelEvent(LL_WARNING, "+sdown", ri, "%@");
ri->s_down_since_time = mstime();
ri->flags |= SRI_S_DOWN;
}
} else {
/* 恢复正常状态 */
if (ri->flags & SRI_S_DOWN) {
sentinelEvent(LL_WARNING, "-sdown", ri, "%@");
ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
}
}
}
/**
* 检查客观下线状态
*/
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
dictIterator *di;
dictEntry *de;
unsigned int quorum = 0, odown = 0;
/* 主节点必须先主观下线 */
if (master->flags & SRI_S_DOWN) {
quorum = 1; /* 当前Sentinel认为下线 */
/* 统计其他Sentinel的投票 */
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
if (ri->flags & SRI_MASTER_DOWN) quorum++;
}
dictReleaseIterator(di);
/* 如果达到quorum数量,标记为客观下线 */
if (quorum >= master->quorum) odown = 1;
}
/* 更新客观下线状态 */
if (odown) {
if ((master->flags & SRI_O_DOWN) == 0) {
sentinelEvent(LL_WARNING, "+odown", master, "%@ #quorum %d/%d",
quorum, master->quorum);
master->flags |= SRI_O_DOWN;
master->o_down_since_time = mstime();
}
} else {
if (master->flags & SRI_O_DOWN) {
sentinelEvent(LL_WARNING, "-odown", master, "%@");
master->flags &= ~SRI_O_DOWN;
}
}
}
/**
* 故障转移状态机
*/
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
switch(ri->failover_state) {
case SENTINEL_FAILOVER_STATE_WAIT_START:
sentinelFailoverWaitStart(ri);
break;
case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
sentinelFailoverSelectSlave(ri);
break;
case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
sentinelFailoverSendSlaveOfNoOne(ri);
break;
case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
sentinelFailoverWaitPromotion(ri);
break;
case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
sentinelFailoverReconfNextSlave(ri);
break;
}
}
/**
* 选择最佳从节点进行提升
*/
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
sentinelRedisInstance **instance =
zmalloc(sizeof(instance[0]) * dictSize(master->slaves));
sentinelRedisInstance *selected = NULL;
int instances = 0;
dictIterator *di;
dictEntry *de;
mstime_t max_master_down_time = 0;
/* 收集可用的从节点 */
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
mstime_t info_validity_time;
/* 跳过下线的从节点 */
if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
/* 跳过断开连接的从节点 */
if (slave->link->disconnected) continue;
/* 跳过复制延迟过大的从节点 */
if (mstime() - slave->slave_repl_offset_time > SENTINEL_INFO_PERIOD*5)
continue;
/* 跳过优先级为0的从节点(不参与选举) */
if (slave->slave_priority == 0) continue;
instance[instances++] = slave;
}
dictReleaseIterator(di);
/* 根据优先级、复制偏移量、运行ID排序 */
if (instances) {
qsort(instance, instances, sizeof(sentinelRedisInstance*),
compareSlavesForPromotion);
selected = instance[0];
}
zfree(instance);
return selected;
}
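上面引用的compareSlavesForPromotion()决定了候选从节点的排序:slave_priority小者优先、复制偏移量大者优先、最后按runid字典序。下面用一个Java比较器勾勒这一排序逻辑(示意模型,类名与字段名均为假设,并非Redis源码):
import java.util.Comparator;

public class SlavePromotionOrder {
    /** 示意用的从节点模型,对应Sentinel挑选从节点时关心的三个指标 */
    static class SlaveCandidate {
        int slavePriority;  // replica-priority,越小越优先,0表示永不参选
        long replOffset;    // 复制偏移量,越大说明数据越新
        String runId;       // 运行ID,作为最终的tie-breaker
        SlaveCandidate(int p, long o, String id) { slavePriority = p; replOffset = o; runId = id; }
    }

    /** 与compareSlavesForPromotion相同的排序思路:priority升序 -> offset降序 -> runid升序 */
    static final Comparator<SlaveCandidate> PROMOTION_ORDER =
            Comparator.comparingInt((SlaveCandidate s) -> s.slavePriority)
                      .thenComparing(Comparator.comparingLong((SlaveCandidate s) -> s.replOffset).reversed())
                      .thenComparing((SlaveCandidate s) -> s.runId);

    public static void main(String[] args) {
        SlaveCandidate a = new SlaveCandidate(100, 5000, "aaa");
        SlaveCandidate b = new SlaveCandidate(100, 8000, "bbb");
        // b的复制偏移量更大,排序后应排在前面,被优先提升为主节点
        System.out.println(PROMOTION_ORDER.compare(b, a) < 0);
    }
}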
Cluster集群源码实现
Redis Cluster的核心实现位于 cluster.c 文件中:
// Redis Cluster核心代码分析
/**
* 集群槽位分配和路由
*/
int clusterProcessCommand(client *c) {
/* 如果不是集群模式,正常处理命令 */
if (!(server.cluster_enabled)) return 0;
/* 集群处于下线状态时,直接拒绝带Key的命令 */
if (server.cluster->state != CLUSTER_OK) {
addReplySds(c, sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
return 1;
}
/* 计算命令涉及的槽位 */
int hashslot;
int error_code;
clusterNode *n = getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code);
if (n == NULL || n != server.cluster->myself) {
/* getNodeByQuery已在error_code中区分MOVED(槽位不归本节点)与ASK(槽位迁移中) */
clusterRedirectClient(c, n, hashslot, error_code);
return 1;
}
/* 在当前节点执行命令 */
return 0;
}
/**
* 根据Key计算槽位
*/
unsigned int keyHashSlot(char *key, int keylen) {
int s, e; /* start-end indexes of { and } */
/* 查找哈希标签 {...} */
for (s = 0; s < keylen; s++)
if (key[s] == '{') break;
/* 没有找到 { ,使用整个key */
if (s == keylen) return crc16(key, keylen) & 0x3FFF;
/* 查找匹配的 } */
for (e = s+1; e < keylen; e++)
if (key[e] == '}') break;
/* 没有找到 } 或者 {} 为空,使用整个key */
if (e == keylen || e == s+1) return crc16(key, keylen) & 0x3FFF;
/* 使用 {tag} 中的内容计算槽位 */
return crc16(key+s+1, e-s-1) & 0x3FFF;
}
/**
* 集群故障检测
*/
void clusterCron(void) {
dictIterator *di;
dictEntry *de;
int update_state = 0;
int orphaned_masters; /* Masters with no working slaves. */
int max_slaves; /* Max number of slaves for a single master. */
int this_slaves; /* Number of slaves for our master (if we are slave). */
mstime_t min_pong = 0, now = mstime();
clusterNode *min_pong_node = NULL;
/* 遍历所有已知节点 */
di = dictGetIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
/* 跳过自己和握手中的节点 */
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE)) continue;
/* 检查节点是否超时 */
if (!(node->flags & CLUSTER_NODE_NOADDR) &&
node->pong_received &&
(now - node->pong_received) > server.cluster_node_timeout)
{
/* 标记节点为可能失败 */
node->flags |= CLUSTER_NODE_PFAIL;
update_state = 1;
}
}
dictReleaseIterator(di);
/* 检查是否有节点被标记为FAIL */
di = dictGetIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
int needed_quorum = (server.cluster->size / 2) + 1;
if (node->flags & CLUSTER_NODE_PFAIL) {
int failure_reports = clusterNodeFailureReportsCount(node);
/* 如果故障报告数量达到quorum,标记为FAIL */
if (failure_reports >= needed_quorum) {
node->flags &= ~CLUSTER_NODE_PFAIL;
node->flags |= CLUSTER_NODE_FAIL;
node->fail_time = mstime();
/* 广播FAIL消息 */
clusterSendFail(node->name);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_SAVE_CONFIG);
update_state = 1;
}
}
}
dictReleaseIterator(di);
/* 如果我们是从节点,检查是否需要进行故障转移 */
if (nodeIsSlave(server.cluster->myself) &&
server.cluster->myself->slaveof &&
nodeTimedOut(server.cluster->myself->slaveof))
{
clusterHandleSlaveFailover();
}
/* 更新集群状态 */
if (update_state || server.cluster->state == CLUSTER_FAIL)
clusterUpdateState();
}
/**
* 从节点故障转移处理
*/
void clusterHandleSlaveFailover(void) {
mstime_t data_age;
mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
int needed_quorum = (server.cluster->size / 2) + 1;
int manual_failover = server.cluster->mf_end != 0 &&
server.cluster->mf_can_start;
mstime_t auth_timeout, auth_retry_time;
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
/* 计算数据年龄 */
data_age = (server.unixtime - server.cluster->myself->slaveof->ping_sent) * 1000;
/* 如果数据太旧,不进行故障转移 */
if (data_age > server.cluster_node_timeout * CLUSTER_SLAVE_VALIDITY_MULT)
return;
/* 如果还没有请求投票权限,发起投票 */
if (server.cluster->failover_auth_sent == 0) {
server.cluster->currentEpoch++;
server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
server.cluster->failover_auth_sent = 1;
server.cluster->failover_auth_time = mstime() +
500 + random() % 500; /* 添加随机延迟避免冲突 */
/* 向所有主节点请求投票 */
clusterRequestFailoverAuth();
return;
}
/* 检查是否获得足够的投票 */
if (server.cluster->failover_auth_count >= needed_quorum) {
/* 获得足够投票,开始故障转移 */
clusterFailoverReplaceYourMaster();
}
}
💡 实战案例与代码示例
主从复制实战部署
完整的主从复制环境搭建:
/**
* Redis主从复制完整实现
*/
@Configuration
@EnableConfigurationProperties(RedisProperties.class)
public class RedisReplicationConfig {
@Value("${redis.master.host:localhost}")
private String masterHost;
@Value("${redis.master.port:6379}")
private int masterPort;
@Value("${redis.slave.host:localhost}")
private String slaveHost;
@Value("${redis.slave.port:6380}")
private int slavePort;
@Value("${redis.password}")
private String password;
/**
* 主节点连接工厂
*/
@Bean
@Primary
public LettuceConnectionFactory masterConnectionFactory() {
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
config.setHostName(masterHost);
config.setPort(masterPort);
config.setPassword(password);
// 连接池配置
GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig =
new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(20);
poolConfig.setMaxIdle(10);
poolConfig.setMinIdle(5);
poolConfig.setTestOnBorrow(true);
poolConfig.setTestOnReturn(true);
poolConfig.setTestWhileIdle(true);
LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
.poolConfig(poolConfig)
.commandTimeout(Duration.ofSeconds(5))
.shutdownTimeout(Duration.ofSeconds(10))
.build();
return new LettuceConnectionFactory(config, clientConfig);
}
/**
* 从节点连接工厂(只读)
*/
@Bean
public LettuceConnectionFactory slaveConnectionFactory() {
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
config.setHostName(slaveHost);
config.setPort(slavePort);
config.setPassword(password);
GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig =
new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(50); // 读操作较多,增加连接数
poolConfig.setMaxIdle(20);
poolConfig.setMinIdle(10);
LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
.poolConfig(poolConfig)
.readFrom(ReadFrom.REPLICA_PREFERRED) // 注:ReadFrom仅对哨兵/集群等具备拓扑信息的连接生效,直连单个从节点时无实际作用
.commandTimeout(Duration.ofSeconds(3))
.build();
return new LettuceConnectionFactory(config, clientConfig);
}
/**
* 主节点Redis模板(写操作)
*/
@Bean
public RedisTemplate<String, Object> masterRedisTemplate() {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(masterConnectionFactory());
// 序列化配置
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.setDefaultSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
/**
* 从节点Redis模板(读操作)
*/
@Bean
public RedisTemplate<String, Object> slaveRedisTemplate() {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(slaveConnectionFactory());
// 使用相同的序列化配置
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.setDefaultSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
/**
* 读写分离服务实现
*/
@Service
public class RedisReadWriteService {
@Autowired
@Qualifier("masterRedisTemplate")
private RedisTemplate<String, Object> masterRedisTemplate;
@Autowired
@Qualifier("slaveRedisTemplate")
private RedisTemplate<String, Object> slaveRedisTemplate;
private static final Logger logger = LoggerFactory.getLogger(RedisReadWriteService.class);
/**
* 写操作 - 使用主节点
*/
public void writeData(String key, Object value) {
try {
masterRedisTemplate.opsForValue().set(key, value, Duration.ofHours(1));
logger.info("数据写入主节点成功: key={}", key);
} catch (Exception e) {
logger.error("数据写入主节点失败: key={}", key, e);
throw new RuntimeException("Redis写入失败", e);
}
}
/**
* 写操作 - 批量写入
*/
public void writeBatchData(Map<String, Object> dataMap) {
try {
masterRedisTemplate.opsForValue().multiSet(dataMap);
// 设置过期时间
dataMap.keySet().forEach(key ->
masterRedisTemplate.expire(key, Duration.ofHours(1))
);
logger.info("批量数据写入主节点成功: count={}", dataMap.size());
} catch (Exception e) {
logger.error("批量数据写入主节点失败", e);
throw new RuntimeException("Redis批量写入失败", e);
}
}
/**
* 读操作 - 使用从节点
*/
public Object readData(String key) {
try {
Object value = slaveRedisTemplate.opsForValue().get(key);
logger.debug("从从节点读取数据: key={}, found={}", key, value != null);
return value;
} catch (Exception e) {
logger.warn("从从节点读取失败,尝试从主节点读取: key={}", key);
// 从节点读取失败,降级到主节点
try {
Object value = masterRedisTemplate.opsForValue().get(key);
logger.info("从主节点读取数据成功: key={}", key);
return value;
} catch (Exception masterException) {
logger.error("从主节点读取也失败: key={}", key, masterException);
throw new RuntimeException("Redis读取失败", masterException);
}
}
}
/**
* 读操作 - 批量读取
*/
public List<Object> readBatchData(List<String> keys) {
try {
List<Object> values = slaveRedisTemplate.opsForValue().multiGet(keys);
logger.debug("从从节点批量读取数据: count={}", keys.size());
return values;
} catch (Exception e) {
logger.warn("从从节点批量读取失败,尝试从主节点读取");
try {
List<Object> values = masterRedisTemplate.opsForValue().multiGet(keys);
logger.info("从主节点批量读取数据成功: count={}", keys.size());
return values;
} catch (Exception masterException) {
logger.error("从主节点批量读取也失败", masterException);
throw new RuntimeException("Redis批量读取失败", masterException);
}
}
}
/**
* 删除操作 - 使用主节点
*/
public void deleteData(String key) {
try {
Boolean deleted = masterRedisTemplate.delete(key);
logger.info("从主节点删除数据: key={}, deleted={}", key, deleted);
} catch (Exception e) {
logger.error("从主节点删除数据失败: key={}", key, e);
throw new RuntimeException("Redis删除失败", e);
}
}
/**
* 健康检查
*/
public Map<String, String> healthCheck() {
Map<String, String> status = new HashMap<>();
// 检查主节点(使用try-with-resources确保连接归还连接池)
try (RedisConnection conn = masterRedisTemplate.getConnectionFactory().getConnection()) {
status.put("master", "PONG".equals(conn.ping()) ? "UP" : "DOWN");
} catch (Exception e) {
status.put("master", "DOWN");
logger.error("主节点健康检查失败", e);
}
// 检查从节点
try (RedisConnection conn = slaveRedisTemplate.getConnectionFactory().getConnection()) {
status.put("slave", "PONG".equals(conn.ping()) ? "UP" : "DOWN");
} catch (Exception e) {
status.put("slave", "DOWN");
logger.error("从节点健康检查失败", e);
}
return status;
}
}
Sentinel哨兵模式实战
生产级Sentinel集群配置:
# application.yml - Sentinel配置
spring:
  redis:
    sentinel:
      master: mymaster
      nodes:
        - 192.168.1.101:26379
        - 192.168.1.102:26379
        - 192.168.1.103:26379
      password: ${REDIS_PASSWORD}   # Sentinel节点自身的认证密码
    password: ${REDIS_PASSWORD}     # Redis数据节点的认证密码
    timeout: 5000ms
    lettuce:
      pool:
        max-active: 20
        max-idle: 10
        min-idle: 5
        max-wait: 5000ms
      shutdown-timeout: 10000ms
/**
* Redis Sentinel配置
*/
@Configuration
public class RedisSentinelConfig {
@Value("${spring.redis.sentinel.master}")
private String masterName;
@Value("${spring.redis.password}")
private String password;
@Value("#{'${spring.redis.sentinel.nodes}'.split(',')}")
private List<String> sentinelNodes;
/**
* Sentinel连接工厂
*/
@Bean
public LettuceConnectionFactory sentinelConnectionFactory() {
// RedisSentinelConfiguration的构造函数直接接收"host:port"字符串集合
RedisSentinelConfiguration sentinelConfig =
new RedisSentinelConfiguration(masterName, new HashSet<>(sentinelNodes));
sentinelConfig.setPassword(password);
sentinelConfig.setSentinelPassword(password);
// 连接池配置
GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig =
new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(20);
poolConfig.setMaxIdle(10);
poolConfig.setMinIdle(5);
poolConfig.setTestOnBorrow(true);
poolConfig.setTestOnReturn(true);
poolConfig.setTestWhileIdle(true);
LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
.poolConfig(poolConfig)
.commandTimeout(Duration.ofSeconds(5))
.shutdownTimeout(Duration.ofSeconds(10))
.readFrom(ReadFrom.REPLICA_PREFERRED) // 读写分离
.build();
return new LettuceConnectionFactory(sentinelConfig, clientConfig);
}
/**
* Redis模板
*/
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(sentinelConnectionFactory());
// 序列化配置
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.setDefaultSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
/**
* Sentinel监听器 - 监控主节点切换
*/
@Bean
public RedisSentinelListener sentinelListener() {
return new RedisSentinelListener();
}
}
/**
* Sentinel事件监听器
*/
@Component
public class RedisSentinelListener implements ApplicationListener<RedisSentinelFailoverEvent> {
private static final Logger logger = LoggerFactory.getLogger(RedisSentinelListener.class);
@Override
public void onApplicationEvent(RedisSentinelFailoverEvent event) {
logger.warn("Redis Sentinel故障转移事件: oldMaster={}, newMaster={}",
event.getOldMaster(), event.getNewMaster());
// 发送告警通知
sendFailoverAlert(event);
// 记录故障转移日志
recordFailoverLog(event);
// 触发应用层的故障恢复逻辑
handleFailoverRecovery(event);
}
private void sendFailoverAlert(RedisSentinelFailoverEvent event) {
// 实现告警通知逻辑
// 可以发送邮件、短信、钉钉通知等
logger.info("发送Redis故障转移告警通知");
}
private void recordFailoverLog(RedisSentinelFailoverEvent event) {
// 记录详细的故障转移日志
logger.info("记录Redis故障转移日志到数据库");
}
private void handleFailoverRecovery(RedisSentinelFailoverEvent event) {
// 执行故障恢复后的业务逻辑
// 比如清理缓存、重新加载数据等
logger.info("执行Redis故障转移后的恢复逻辑");
}
}
/**
* 高可用Redis服务
*/
@Service
public class HighAvailabilityRedisService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final Logger logger = LoggerFactory.getLogger(HighAvailabilityRedisService.class);
/**
* 容错的写操作
*/
@Retryable(value = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000))
public void setWithRetry(String key, Object value, Duration timeout) {
try {
redisTemplate.opsForValue().set(key, value, timeout);
logger.debug("Redis写入成功: key={}", key);
} catch (Exception e) {
logger.error("Redis写入失败,将重试: key={}", key, e);
throw e;
}
}
/**
* 容错的读操作
*/
@Retryable(value = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 500))
public Object getWithRetry(String key) {
try {
Object value = redisTemplate.opsForValue().get(key);
logger.debug("Redis读取成功: key={}, found={}", key, value != null);
return value;
} catch (Exception e) {
logger.error("Redis读取失败,将重试: key={}", key, e);
throw e;
}
}
/**
* 批量操作的容错处理
*/
public Map<String, Object> batchGetWithFallback(List<String> keys) {
Map<String, Object> result = new HashMap<>();
try {
List<Object> values = redisTemplate.opsForValue().multiGet(keys);
for (int i = 0; i < keys.size(); i++) {
if (i < values.size() && values.get(i) != null) {
result.put(keys.get(i), values.get(i));
}
}
} catch (Exception e) {
logger.error("Redis批量读取失败,尝试单个读取", e);
// 降级策略:逐个读取
for (String key : keys) {
try {
Object value = redisTemplate.opsForValue().get(key);
if (value != null) {
result.put(key, value);
}
} catch (Exception singleException) {
logger.warn("单个key读取也失败: key={}", key);
}
}
}
return result;
}
}
Redis Cluster集群实战
生产级Cluster集群配置:
# application.yml - Cluster配置
spring:
  redis:
    cluster:
      nodes:
        - 192.168.1.101:7000
        - 192.168.1.101:7001
        - 192.168.1.102:7000
        - 192.168.1.102:7001
        - 192.168.1.103:7000
        - 192.168.1.103:7001
      max-redirects: 3
    password: ${REDIS_PASSWORD}
    timeout: 5000ms
    lettuce:
      pool:
        max-active: 50
        max-idle: 20
        min-idle: 10
        max-wait: 5000ms
      cluster:
        refresh:
          adaptive: true
          period: 30s
/**
* Redis Cluster配置
*/
@Configuration
public class RedisClusterConfig {
@Value("#{'${spring.redis.cluster.nodes}'.split(',')}")
private List<String> clusterNodes;
@Value("${spring.redis.cluster.max-redirects}")
private int maxRedirects;
@Value("${spring.redis.password}")
private String password;
/**
* Cluster连接工厂
*/
@Bean
public LettuceConnectionFactory clusterConnectionFactory() {
// RedisClusterConfiguration可直接接收"host:port"字符串集合
RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration(clusterNodes);
clusterConfig.setMaxRedirects(maxRedirects);
clusterConfig.setPassword(password);
// 连接池配置
GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig =
new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(50);
poolConfig.setMaxIdle(20);
poolConfig.setMinIdle(10);
poolConfig.setTestOnBorrow(true);
poolConfig.setTestOnReturn(true);
poolConfig.setTestWhileIdle(true);
// 客户端配置
LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
.poolConfig(poolConfig)
.commandTimeout(Duration.ofSeconds(5))
.shutdownTimeout(Duration.ofSeconds(10))
.readFrom(ReadFrom.REPLICA_PREFERRED) // 读写分离
.build();
return new LettuceConnectionFactory(clusterConfig, clientConfig);
}
/**
* Cluster Redis模板
*/
@Bean
public RedisTemplate<String, Object> clusterRedisTemplate() {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(clusterConnectionFactory());
// 序列化配置
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.setDefaultSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
/**
* 集群管理工具
*/
@Bean
public RedisClusterManager clusterManager() {
return new RedisClusterManager(clusterConnectionFactory());
}
}
/**
* Redis Cluster管理器
*/
@Component
public class RedisClusterManager {
private final LettuceConnectionFactory connectionFactory;
private static final Logger logger = LoggerFactory.getLogger(RedisClusterManager.class);
public RedisClusterManager(LettuceConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
/**
* 获取集群信息
*/
public ClusterInfo getClusterInfo() {
// getConnection()返回的是Spring的RedisConnection,不能直接强转为Lettuce连接;这里假设底层驱动为Lettuce
try (RedisClusterConnection springConn = connectionFactory.getClusterConnection()) {
StatefulRedisClusterConnection<byte[], byte[]> connection =
((RedisAdvancedClusterAsyncCommands<byte[], byte[]>) springConn.getNativeConnection()).getStatefulConnection();
RedisAdvancedClusterCommands<byte[], byte[]> commands = connection.sync();
// CLUSTER NODES / CLUSTER INFO 均返回文本,CLUSTER INFO需要按行解析成Map
String clusterNodes = commands.clusterNodes();
Map<String, String> clusterInfo = new HashMap<>();
for (String line : commands.clusterInfo().split("\r?\n")) {
int idx = line.indexOf(':');
if (idx > 0) clusterInfo.put(line.substring(0, idx), line.substring(idx + 1).trim());
}
return ClusterInfo.builder()
.nodes(parseClusterNodes(clusterNodes))
.info(clusterInfo)
.state(clusterInfo.get("cluster_state"))
.slotsAssigned(Integer.parseInt(clusterInfo.get("cluster_slots_assigned")))
.slotsOk(Integer.parseInt(clusterInfo.get("cluster_slots_ok")))
.slotsFail(Integer.parseInt(clusterInfo.get("cluster_slots_fail")))
.knownNodes(Integer.parseInt(clusterInfo.get("cluster_known_nodes")))
.build();
} catch (Exception e) {
logger.error("获取集群信息失败", e);
throw new RuntimeException("获取集群信息失败", e);
}
}
/**
* 获取Key的槽位信息
*/
public SlotInfo getKeySlot(String key) {
int slot = SlotHash.getSlot(key);
try (RedisClusterConnection springConn = connectionFactory.getClusterConnection()) {
// 同样通过原生Lettuce连接执行CLUSTER SLOTS(假设底层驱动为Lettuce)
StatefulRedisClusterConnection<byte[], byte[]> connection =
((RedisAdvancedClusterAsyncCommands<byte[], byte[]>) springConn.getNativeConnection()).getStatefulConnection();
RedisAdvancedClusterCommands<byte[], byte[]> commands = connection.sync();
List<Object> slotInfo = commands.clusterSlots();
// 查找槽位对应的节点
for (Object slotRange : slotInfo) {
List<Object> range = (List<Object>) slotRange;
int startSlot = ((Number) range.get(0)).intValue();
int endSlot = ((Number) range.get(1)).intValue();
if (slot >= startSlot && slot <= endSlot) {
List<Object> masterInfo = (List<Object>) range.get(2);
String masterHost = new String((byte[]) masterInfo.get(0));
int masterPort = ((Number) masterInfo.get(1)).intValue();
return SlotInfo.builder()
.key(key)
.slot(slot)
.masterHost(masterHost)
.masterPort(masterPort)
.slotRange(startSlot + "-" + endSlot)
.build();
}
}
} catch (Exception e) {
logger.error("获取Key槽位信息失败: key={}", key, e);
}
return SlotInfo.builder()
.key(key)
.slot(slot)
.build();
}
/**
* 健康检查
*/
public Map<String, String> healthCheck() {
Map<String, String> healthStatus = new HashMap<>();
try {
ClusterInfo clusterInfo = getClusterInfo();
healthStatus.put("cluster_state", clusterInfo.getState());
healthStatus.put("cluster_slots_assigned", String.valueOf(clusterInfo.getSlotsAssigned()));
healthStatus.put("cluster_slots_ok", String.valueOf(clusterInfo.getSlotsOk()));
healthStatus.put("cluster_slots_fail", String.valueOf(clusterInfo.getSlotsFail()));
healthStatus.put("cluster_known_nodes", String.valueOf(clusterInfo.getKnownNodes()));
// 检查是否所有槽位都正常
if (clusterInfo.getSlotsAssigned() == 16384 && clusterInfo.getSlotsFail() == 0) {
healthStatus.put("overall_status", "HEALTHY");
} else {
healthStatus.put("overall_status", "DEGRADED");
}
} catch (Exception e) {
logger.error("集群健康检查失败", e);
healthStatus.put("overall_status", "UNHEALTHY");
healthStatus.put("error", e.getMessage());
}
return healthStatus;
}
private List<ClusterNode> parseClusterNodes(String clusterNodes) {
// 解析集群节点信息的实现
return Arrays.stream(clusterNodes.split("\n"))
.filter(line -> !line.trim().isEmpty())
.map(this::parseNodeLine)
.collect(Collectors.toList());
}
private ClusterNode parseNodeLine(String nodeLine) {
String[] parts = nodeLine.split(" ");
return ClusterNode.builder()
.nodeId(parts[0])
.endpoint(parts[1])
.flags(parts[2])
.master(parts[3])
.pingTime(Long.parseLong(parts[4]))
.pongTime(Long.parseLong(parts[5]))
.epoch(Long.parseLong(parts[6]))
.state(parts[7])
.slots(parts.length > 8 ? String.join(" ", Arrays.copyOfRange(parts, 8, parts.length)) : "")
.build();
}
}
// 数据模型类
@Data
@Builder
public class ClusterInfo {
private List<ClusterNode> nodes;
private Map<String, String> info;
private String state;
private int slotsAssigned;
private int slotsOk;
private int slotsFail;
private int knownNodes;
}
@Data
@Builder
public class ClusterNode {
private String nodeId;
private String endpoint;
private String flags;
private String master;
private long pingTime;
private long pongTime;
private long epoch;
private String state;
private String slots;
}
@Data
@Builder
public class SlotInfo {
private String key;
private int slot;
private String masterHost;
private int masterPort;
private String slotRange;
}
🎯 面试高频问题精讲
问题1:Redis主从复制的工作原理是什么?数据一致性如何保证?
标准答案:
Redis主从复制采用异步复制机制,包含三种同步方式:
1. 全量同步(Full Resynchronization)
- 从节点首次连接主节点时触发
- 主节点执行BGSAVE生成RDB快照
- 将RDB文件发送给从节点
- 从节点清空数据库并加载RDB
- 主节点将期间的写命令发送给从节点
2. 增量同步(Incremental Sync)
- 正常运行时的实时同步
- 主节点将写命令实时发送给从节点
- 基于复制积压缓冲区(replication backlog)
3. 部分重同步(Partial Resynchronization)
- 从节点短暂断开后重连时触发
- 基于复制偏移量(replication offset)判断
- 只同步缺失的部分数据
数据一致性保证机制:
// 复制偏移量机制
public class ReplicationOffset {
private long masterOffset; // 主节点写入的数据量
private long slaveOffset; // 从节点复制的数据量
// 判断数据是否一致
public boolean isConsistent() {
return Math.abs(masterOffset - slaveOffset) <= ACCEPTABLE_LAG;
}
}
// 复制积压缓冲区
public class ReplicationBacklog {
private byte[] buffer; // 环形缓冲区
private long bufferSize; // 缓冲区大小(默认1MB)
private long masterOffset; // 主节点偏移量
// 判断是否可以部分重同步
public boolean canPartialSync(long slaveOffset) {
return masterOffset - slaveOffset <= bufferSize;
}
}
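在实际排查主从延迟时,可以直接读取INFO replication中的master_repl_offset与各从节点的offset来估算滞后字节数。下面是一个基于RedisTemplate的检查示例(示意写法,字段名取自INFO输出):
@Component
public class ReplicationLagChecker {
    @Autowired
    private RedisTemplate<String, Object> masterRedisTemplate;

    /** 返回主节点offset与最慢从节点offset的差值(字节数),差值越大说明延迟越高 */
    public long maxReplicationLag() {
        try (RedisConnection conn = masterRedisTemplate.getConnectionFactory().getConnection()) {
            Properties info = conn.info("replication");
            long masterOffset = Long.parseLong(info.getProperty("master_repl_offset", "0"));
            long maxLag = 0;
            // 从节点信息形如 slave0 -> ip=...,port=...,state=online,offset=12345,lag=0
            for (String name : info.stringPropertyNames()) {
                if (!name.startsWith("slave")) continue;
                for (String field : info.getProperty(name).split(",")) {
                    if (field.startsWith("offset=")) {
                        long slaveOffset = Long.parseLong(field.substring("offset=".length()));
                        maxLag = Math.max(maxLag, masterOffset - slaveOffset);
                    }
                }
            }
            return maxLag;
        }
    }
}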
面试加分点:
- 提到
repl-diskless-sync
无盘复制优化 - 了解
min-slaves-to-write
参数控制写入安全性 - 知道复制超时和心跳检测机制
问题2:Redis Sentinel的脑裂问题如何解决?
标准答案:
脑裂现象:网络分区导致出现多个主节点,数据不一致。
解决方案:
1. 配置最小从节点数
# redis.conf配置
min-replicas-to-write 1 # 至少1个从节点在线才允许写入
min-replicas-max-lag 10 # 从节点最大延迟10秒
2. Sentinel Quorum机制
# sentinel.conf配置
sentinel monitor mymaster 192.168.1.100 6379 2 # 需要2个Sentinel同意才能故障转移
3. 客户端连接验证
@Service
public class AntiBrainSplitService {
private static final Logger logger = LoggerFactory.getLogger(AntiBrainSplitService.class);
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
@Qualifier("slaveRedisTemplate")
private RedisTemplate<String, Object> slaveRedisTemplate; // checkConsistency()中读取从节点使用
/**
* 防脑裂的写操作
*/
public void safeWrite(String key, Object value) {
try {
// 1. 检查当前节点是否为主节点
String role = redisTemplate.getConnectionFactory()
.getConnection().info("replication").getProperty("role");
if (!"master".equals(role)) {
throw new RuntimeException("当前节点不是主节点,拒绝写入");
}
// 2. 检查从节点数量
String connectedSlaves = redisTemplate.getConnectionFactory()
.getConnection().info("replication").getProperty("connected_slaves");
int slaveCount = Integer.parseInt(connectedSlaves);
if (slaveCount < 1) {
throw new RuntimeException("从节点数量不足,拒绝写入");
}
// 3. 执行写操作
redisTemplate.opsForValue().set(key, value);
} catch (Exception e) {
logger.error("防脑裂写操作失败", e);
throw e;
}
}
/**
* 检查数据一致性
*/
public boolean checkConsistency(String key) {
try {
// 从主节点读取
Object masterValue = redisTemplate.opsForValue().get(key);
// 从从节点读取(需要配置从节点连接)
Object slaveValue = slaveRedisTemplate.opsForValue().get(key);
return Objects.equals(masterValue, slaveValue);
} catch (Exception e) {
logger.error("一致性检查失败", e);
return false;
}
}
}
面试加分点:
- 了解网络分区检测机制
- 知道客户端重连策略
- 提到监控和告警的重要性
问题3:Redis Cluster如何处理槽位迁移?数据迁移过程中如何保证服务可用?
标准答案:
槽位迁移流程:
1. 迁移准备阶段
# 1. 在目标节点上执行,将槽位标记为导入中(参数是源节点ID)
CLUSTER SETSLOT 1000 IMPORTING source-node-id
# 2. 在源节点上执行,将槽位标记为迁移中(参数是目标节点ID)
CLUSTER SETSLOT 1000 MIGRATING target-node-id
2. 数据迁移阶段
# 3. 逐个迁移槽位中的Key
MIGRATE target-host target-port key destination-db timeout
# 4. 检查槽位是否为空
CLUSTER COUNTKEYSINSLOT 1000
3. 迁移完成阶段
# 5. 更新槽位归属
CLUSTER SETSLOT 1000 NODE target-node-id
# 6. 槽位归属随配置纪元通过Gossip传播,可用CLUSTER NODES确认各节点视图已更新
CLUSTER NODES
服务可用性保证机制:
/**
* 槽位迁移期间的客户端处理
*/
@Service
public class ClusterMigrationHandler {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 迁移期间的安全读写
*/
public Object safeGet(String key) {
int maxRetries = 3;
int retryCount = 0;
while (retryCount < maxRetries) {
try {
return redisTemplate.opsForValue().get(key);
} catch (RedisRedirectionException e) {
// 处理MOVED重定向
if (e.getMessage().contains("MOVED")) {
logger.info("槽位已迁移,重定向到新节点: {}", e.getMessage());
// 客户端会自动重定向,重试即可
retryCount++;
continue;
}
// 处理ASK重定向
if (e.getMessage().contains("ASK")) {
logger.info("槽位正在迁移,临时重定向: {}", e.getMessage());
// 发送ASKING命令后重试
handleAskRedirection(e);
retryCount++;
continue;
}
throw e;
} catch (Exception e) {
logger.error("读取失败,重试: key={}, attempt={}", key, retryCount + 1);
retryCount++;
if (retryCount >= maxRetries) {
throw new RuntimeException("读取失败,已达最大重试次数", e);
}
// 等待后重试
try {
Thread.sleep(100 * retryCount);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("重试被中断", ie);
}
}
}
throw new RuntimeException("读取失败,已达最大重试次数");
}
/**
* 处理ASK重定向
*/
private void handleAskRedirection(RedisRedirectionException e) {
// 解析ASK响应,获取目标节点信息
String askMessage = e.getMessage();
// 格式: ASK 1000 192.168.1.101:7001
String[] parts = askMessage.split(" ");
String targetNode = parts[2];
// 向目标节点发送ASKING命令
try {
// 这里需要建立到目标节点的连接并发送ASKING
logger.info("发送ASKING命令到目标节点: {}", targetNode);
} catch (Exception ex) {
logger.error("发送ASKING命令失败", ex);
}
}
/**
* 迁移期间的写操作
*/
public void safeSet(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
} catch (RedisRedirectionException e) {
if (e.getMessage().contains("MOVED")) {
// 槽位已完全迁移,重试写入
logger.info("槽位已迁移,重新写入: {}", e.getMessage());
redisTemplate.opsForValue().set(key, value);
} else {
throw new RuntimeException("写操作失败,槽位迁移中", e);
}
}
}
}
/**
* 迁移状态监控
*/
@Component
public class MigrationMonitor {
/**
* 监控迁移进度
*/
@Scheduled(fixedDelay = 5000)
public void monitorMigration() {
try {
// 检查集群状态
Map<String, String> clusterInfo = getClusterInfo();
String state = clusterInfo.get("cluster_state");
if ("fail".equals(state)) {
logger.error("集群状态异常,可能存在迁移问题");
// 发送告警
sendAlert("集群状态异常");
}
// 检查迁移中的槽位(CLUSTER INFO本身不包含该指标,
// 这里假设getClusterInfo()已根据CLUSTER NODES输出中的[slot->-node]迁移标记统计出这两个值)
int migratingSlots = Integer.parseInt(
clusterInfo.getOrDefault("cluster_slots_migrating", "0"));
int importingSlots = Integer.parseInt(
clusterInfo.getOrDefault("cluster_slots_importing", "0"));
if (migratingSlots > 0 || importingSlots > 0) {
logger.info("检测到槽位迁移: migrating={}, importing={}",
migratingSlots, importingSlots);
}
} catch (Exception e) {
logger.error("迁移监控失败", e);
}
}
private Map<String, String> getClusterInfo() {
// 获取集群信息的实现
return new HashMap<>();
}
private void sendAlert(String message) {
// 发送告警的实现
logger.warn("发送告警: {}", message);
}
}
面试加分点:
- 了解MOVED和ASK重定向的区别
- 知道槽位迁移的原子性保证
- 提到客户端缓存刷新机制
问题4:Redis集群扩容和缩容的最佳实践是什么?
标准答案:
扩容最佳实践:
1. 添加节点
# 1. 启动新节点
redis-server /path/to/redis-7003.conf
# 2. 将新节点加入集群
redis-cli --cluster add-node 192.168.1.101:7003 192.168.1.101:7000
# 3. 分配槽位给新节点
redis-cli --cluster reshard 192.168.1.101:7000
2. 自动化扩容脚本
/**
* 集群自动扩容管理
*/
@Service
public class ClusterScalingService {
/**
* 自动扩容
*/
public void autoScale() {
// 1. 检查集群负载
ClusterMetrics metrics = getClusterMetrics();
if (shouldScaleOut(metrics)) {
logger.info("集群负载过高,开始自动扩容");
scaleOut();
} else if (shouldScaleIn(metrics)) {
logger.info("集群负载较低,开始自动缩容");
scaleIn();
}
}
/**
* 扩容判断条件
*/
private boolean shouldScaleOut(ClusterMetrics metrics) {
return metrics.getCpuUsage() > 80 || // CPU使用率超过80%
metrics.getMemoryUsage() > 85 || // 内存使用率超过85%
metrics.getConnectionCount() > 8000 || // 连接数超过8000
metrics.getQps() > 50000; // QPS超过5万
}
/**
* 缩容判断条件
*/
private boolean shouldScaleIn(ClusterMetrics metrics) {
return metrics.getCpuUsage() < 30 && // CPU使用率低于30%
metrics.getMemoryUsage() < 40 && // 内存使用率低于40%
metrics.getConnectionCount() < 2000 && // 连接数低于2000
metrics.getQps() < 10000 && // QPS低于1万
getCurrentNodeCount() > 3; // 节点数大于最小值3
}
/**
* 执行扩容
*/
private void scaleOut() {
try {
// 1. 创建新节点实例
String newNodeId = createNewNode();
// 2. 将新节点加入集群
addNodeToCluster(newNodeId);
// 3. 重新分配槽位
reshardSlots(newNodeId);
// 4. 验证扩容结果
validateScaleOut(newNodeId);
logger.info("集群扩容完成: newNode={}", newNodeId);
} catch (Exception e) {
logger.error("集群扩容失败", e);
// 回滚操作
rollbackScaleOut();
}
}
/**
* 执行缩容
*/
private void scaleIn() {
try {
// 1. 选择要移除的节点
String nodeToRemove = selectNodeToRemove();
// 2. 迁移槽位到其他节点
migrateSlots(nodeToRemove);
// 3. 从集群中移除节点
removeNodeFromCluster(nodeToRemove);
// 4. 销毁节点实例
destroyNode(nodeToRemove);
logger.info("集群缩容完成: removedNode={}", nodeToRemove);
} catch (Exception e) {
logger.error("集群缩容失败", e);
// 回滚操作
rollbackScaleIn();
}
}
/**
* 平滑重分片
*/
private void reshardSlots(String targetNodeId) {
try {
// 计算每个节点应该分配的槽位数
int totalNodes = getCurrentNodeCount();
int slotsPerNode = 16384 / totalNodes;
// 从现有节点迁移部分槽位到新节点
List<String> sourceNodes = getExistingNodes();
int slotsToMigrate = slotsPerNode;
for (String sourceNode : sourceNodes) {
if (slotsToMigrate <= 0) break;
List<Integer> sourceSlots = getNodeSlots(sourceNode);
int slotsFromThisNode = Math.min(slotsToMigrate, sourceSlots.size() / totalNodes);
for (int i = 0; i < slotsFromThisNode; i++) {
migrateSlot(sourceSlots.get(i), sourceNode, targetNodeId);
slotsToMigrate--;
}
}
} catch (Exception e) {
logger.error("重分片失败", e);
throw e;
}
}
/**
* 渐进式槽位迁移
*/
private void migrateSlot(int slot, String sourceNode, String targetNode) {
try {
// 1. 设置迁移状态
setSlotMigrating(slot, sourceNode, targetNode);
// 2. 逐个迁移Key
List<String> keys = getSlotKeys(slot, sourceNode);
int batchSize = 100; // 每批迁移100个Key
for (int i = 0; i < keys.size(); i += batchSize) {
List<String> batch = keys.subList(i, Math.min(i + batchSize, keys.size()));
migrateBatch(batch, sourceNode, targetNode);
// 避免阻塞,每批次之间暂停
Thread.sleep(10);
}
// 3. 完成槽位迁移
completeSlotMigration(slot, targetNode);
} catch (Exception e) {
logger.error("槽位迁移失败: slot={}", slot, e);
throw e;
}
}
}
/**
* 集群监控指标
*/
@Data
public class ClusterMetrics {
private double cpuUsage; // CPU使用率
private double memoryUsage; // 内存使用率
private int connectionCount; // 连接数
private long qps; // 每秒查询数
private long networkIn; // 网络入流量
private long networkOut; // 网络出流量
private Map<String, NodeMetrics> nodeMetrics; // 各节点指标
}
@Data
public class NodeMetrics {
private String nodeId;
private String endpoint;
private double cpuUsage;
private double memoryUsage;
private int slotCount;
private long keyCount;
private boolean isMaster;
}
面试加分点:
- 了解蓝绿部署和滚动升级策略
- 知道数据预热和缓存穿透预防
- 提到监控和自动化运维的重要性
问题5:如何设计Redis高可用架构来应对不同的故障场景?
标准答案:
多层次高可用架构设计:
/**
* 高可用架构设计
*/
@Configuration
public class HighAvailabilityArchitecture {
/**
* 多级缓存架构
*/
@Bean
public MultiLevelCacheManager multiLevelCache() {
return MultiLevelCacheManager.builder()
.l1Cache(caffeineCache()) // 本地缓存
.l2Cache(redisClusterCache()) // Redis集群缓存
.l3Cache(redisSentinelCache()) // Redis哨兵缓存
.l4Cache(databaseCache()) // 数据库缓存
.build();
}
/**
* 故障转移策略
*/
@Bean
public FailoverStrategy failoverStrategy() {
return FailoverStrategy.builder()
.primaryDataSource(redisCluster())
.secondaryDataSource(redisSentinel())
.fallbackDataSource(database())
.circuitBreakerConfig(circuitBreakerConfig())
.retryConfig(retryConfig())
.build();
}
}
/**
* 高可用缓存服务
*/
@Service
public class HighAvailabilityCacheService {
@Autowired
private MultiLevelCacheManager cacheManager;
@Autowired
private FailoverStrategy failoverStrategy;
private static final Logger logger = LoggerFactory.getLogger(HighAvailabilityCacheService.class);
/**
* 高可用读取
*/
public <T> T get(String key, Class<T> type) {
// L1: 本地缓存
T value = cacheManager.getL1Cache().get(key, type);
if (value != null) {
logger.debug("L1缓存命中: key={}", key);
return value;
}
// L2: Redis集群
try {
value = cacheManager.getL2Cache().get(key, type);
if (value != null) {
logger.debug("L2缓存命中: key={}", key);
// 回填L1缓存
cacheManager.getL1Cache().put(key, value);
return value;
}
} catch (Exception e) {
logger.warn("L2缓存访问失败,降级到L3: key={}", key, e);
}
// L3: Redis哨兵
try {
value = cacheManager.getL3Cache().get(key, type);
if (value != null) {
logger.debug("L3缓存命中: key={}", key);
// 回填上级缓存
backfillUpperCache(key, value);
return value;
}
} catch (Exception e) {
logger.warn("L3缓存访问失败,降级到数据库: key={}", key, e);
}
// L4: 数据库
try {
value = cacheManager.getL4Cache().get(key, type);
if (value != null) {
logger.debug("数据库查询成功: key={}", key);
// 回填所有级别缓存
backfillAllCache(key, value);
return value;
}
} catch (Exception e) {
logger.error("所有数据源访问失败: key={}", key, e);
throw new RuntimeException("数据访问失败", e);
}
return null;
}
/**
* 高可用写入
*/
public void put(String key, Object value, Duration ttl) {
List<Exception> exceptions = new ArrayList<>();
// 尝试写入所有可用的缓存层
try {
cacheManager.getL2Cache().put(key, value, ttl);
logger.debug("L2缓存写入成功: key={}", key);
} catch (Exception e) {
logger.warn("L2缓存写入失败: key={}", key, e);
exceptions.add(e);
}
try {
cacheManager.getL3Cache().put(key, value, ttl);
logger.debug("L3缓存写入成功: key={}", key);
} catch (Exception e) {
logger.warn("L3缓存写入失败: key={}", key, e);
exceptions.add(e);
}
// 更新本地缓存
cacheManager.getL1Cache().put(key, value, ttl);
// 如果所有远程缓存都失败,抛出异常
if (exceptions.size() >= 2) {
throw new RuntimeException("缓存写入失败", exceptions.get(0));
}
}
/**
* 故障恢复处理
*/
@EventListener
public void handleFailoverEvent(FailoverEvent event) {
logger.info("处理故障转移事件: {}", event);
switch (event.getType()) {
case REDIS_CLUSTER_DOWN:
handleRedisClusterFailure();
break;
case REDIS_SENTINEL_DOWN:
handleRedisSentinelFailure();
break;
case DATABASE_DOWN:
handleDatabaseFailure();
break;
default:
logger.warn("未知的故障类型: {}", event.getType());
}
}
private void handleRedisClusterFailure() {
// 1. 切换到哨兵模式
failoverStrategy.switchToSecondary();
// 2. 增加本地缓存容量
cacheManager.expandL1Cache();
// 3. 启用写操作缓冲
enableWriteBuffer();
// 4. 发送告警通知
sendAlert("Redis集群故障,已切换到哨兵模式");
}
private void handleRedisSentinelFailure() {
// 1. 完全依赖本地缓存和数据库
failoverStrategy.switchToFallback();
// 2. 启用读写分离
enableReadWriteSeparation();
// 3. 增加数据库连接池
expandDatabasePool();
// 4. 发送告警通知
sendAlert("Redis完全故障,已切换到数据库模式");
}
/**
* 健康检查和自动恢复
*/
@Scheduled(fixedDelay = 30000)
public void healthCheck() {
// 检查各级缓存健康状态
Map<String, Boolean> healthStatus = new HashMap<>();
healthStatus.put("L1_CACHE", checkL1CacheHealth());
healthStatus.put("L2_CACHE", checkL2CacheHealth());
healthStatus.put("L3_CACHE", checkL3CacheHealth());
healthStatus.put("DATABASE", checkDatabaseHealth());
// 自动恢复
if (healthStatus.get("L2_CACHE") && !failoverStrategy.isL2Active()) {
logger.info("检测到L2缓存恢复,开始自动切回");
autoRecovery();
}
// 记录健康状态
logger.debug("健康检查结果: {}", healthStatus);
}
private void autoRecovery() {
try {
// 1. 预热缓存
warmupCache();
// 2. 切换回主要数据源
failoverStrategy.switchToPrimary();
// 3. 恢复正常配置
restoreNormalConfig();
logger.info("自动恢复完成");
} catch (Exception e) {
logger.error("自动恢复失败", e);
}
}
}
面试加分点:
- 了解CAP理论在Redis中的应用
- 知道数据一致性和可用性的权衡
- 提到容灾备份和异地多活方案
⚡ 性能优化与注意事项
主从复制性能优化
1. 复制缓冲区优化
# redis.conf优化配置
repl-backlog-size 64mb # 增大复制积压缓冲区
repl-backlog-ttl 3600 # 缓冲区保留时间1小时
repl-timeout 60 # 复制超时时间
repl-ping-slave-period 10 # 心跳间隔
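repl-backlog-size的取值可以按"主节点写命令流量 × 允许的断线时长"来估算,下面用一段简单代码演示这个估算过程(写入速率与断线时长均为假设值):
public class BacklogSizeEstimator {
    public static void main(String[] args) {
        long writeBytesPerSecond = 2L * 1024 * 1024; // 假设主节点每秒产生约2MB的写命令流量
        long maxDisconnectSeconds = 60;              // 希望从节点断线60秒内仍可部分重同步
        long backlogBytes = writeBytesPerSecond * maxDisconnectSeconds;
        // 2MB/s * 60s = 120MB,再留一定余量,可配置 repl-backlog-size 128mb
        System.out.println("建议 repl-backlog-size >= " + (backlogBytes / 1024 / 1024) + "mb");
    }
}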
2. 网络优化
# 启用TCP_NODELAY,减少延迟
repl-disable-tcp-nodelay no
# 无盘复制优化(适用于网络比磁盘快的场景)
repl-diskless-sync yes
repl-diskless-sync-delay 5
3. 从节点优化
@Configuration
public class SlaveOptimizationConfig {
/**
* 从节点连接池优化
*/
@Bean
public LettuceConnectionFactory optimizedSlaveFactory() {
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
config.setHostName(slaveHost);
config.setPort(slavePort);
// 从节点连接池配置
GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig =
new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(100); // 增加最大连接数
poolConfig.setMaxIdle(50); // 增加最大空闲连接
poolConfig.setMinIdle(20); // 保持最小连接数
poolConfig.setTestOnBorrow(false); // 关闭借用时测试,提高性能
poolConfig.setTestWhileIdle(true); // 空闲时测试连接
poolConfig.setTimeBetweenEvictionRunsMillis(30000); // 30秒检查一次
LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
.poolConfig(poolConfig)
.readFrom(ReadFrom.REPLICA) // 强制从从节点读取
.commandTimeout(Duration.ofSeconds(2)) // 减少命令超时时间
.build();
return new LettuceConnectionFactory(config, clientConfig);
}
}
Sentinel性能优化
1. Sentinel配置优化
# sentinel.conf性能优化
sentinel down-after-milliseconds mymaster 5000 # 减少故障检测时间
sentinel failover-timeout mymaster 60000 # 减少故障转移超时
sentinel parallel-syncs mymaster 2 # 增加并行同步数量
sentinel deny-scripts-reconfig yes # 禁用脚本重配置提高安全性
2. 客户端连接优化
@Service
public class OptimizedSentinelService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private final LoadingCache<String, Object> localCache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(Duration.ofMinutes(5))
.recordStats()
.build(key -> {
// 缓存未命中时从Redis加载
return redisTemplate.opsForValue().get(key);
});
/**
* 优化的读取操作
*/
public Object optimizedGet(String key) {
try {
// 1. 先检查本地缓存
Object value = localCache.getIfPresent(key);
if (value != null) {
return value;
}
// 2. 从Redis读取
value = redisTemplate.opsForValue().get(key);
if (value != null) {
// 3. 更新本地缓存
localCache.put(key, value);
}
return value;
} catch (Exception e) {
// 4. 故障时返回本地缓存
Object cachedValue = localCache.getIfPresent(key);
if (cachedValue != null) {
logger.warn("Redis访问失败,返回本地缓存: key={}", key);
return cachedValue;
}
throw e;
}
}
/**
* 批量预加载
*/
@Async
public void preloadCache(List<String> keys) {
try {
List<Object> values = redisTemplate.opsForValue().multiGet(keys);
for (int i = 0; i < keys.size(); i++) {
if (values.get(i) != null) {
localCache.put(keys.get(i), values.get(i));
}
}
logger.info("预加载缓存完成: count={}", keys.size());
} catch (Exception e) {
logger.error("预加载缓存失败", e);
}
}
}
Cluster性能优化
1. 槽位分布优化
/**
* 集群槽位优化管理
*/
@Service
public class ClusterSlotOptimizer {
/**
* 智能槽位重分配
*/
public void optimizeSlotDistribution() {
try {
// 1. 收集各节点负载信息
Map<String, NodeLoad> nodeLoads = collectNodeLoads();
// 2. 计算最优分配方案
Map<String, List<Integer>> optimalDistribution = calculateOptimalDistribution(nodeLoads);
// 3. 执行渐进式重分配
executeGradualRebalancing(optimalDistribution);
} catch (Exception e) {
logger.error("槽位优化失败", e);
}
}
/**
* 热点Key检测和优化
*/
@Scheduled(fixedDelay = 300000) // 5分钟检查一次
public void detectAndOptimizeHotKeys() {
try {
// 1. 检测热点Key
Map<String, Long> hotKeys = detectHotKeys();
// 2. 分析热点分布
Map<Integer, List<String>> hotKeysBySlot = groupHotKeysBySlot(hotKeys);
// 3. 优化热点槽位
for (Map.Entry<Integer, List<String>> entry : hotKeysBySlot.entrySet()) {
int slot = entry.getKey();
List<String> keys = entry.getValue();
if (keys.size() > 100) { // 热点阈值
optimizeHotSlot(slot, keys);
}
}
} catch (Exception e) {
logger.error("热点Key优化失败", e);
}
}
private void optimizeHotSlot(int slot, List<String> hotKeys) {
// 1. 为热点Key创建本地缓存
for (String key : hotKeys) {
String value = getFromRedis(key);
if (value != null) {
localCache.put(key, value, Duration.ofMinutes(10));
}
}
// 2. 考虑将热点槽位迁移到性能更好的节点
String currentNode = getSlotNode(slot);
String betterNode = findBetterNodeForHotSlot(slot);
if (betterNode != null && !betterNode.equals(currentNode)) {
logger.info("计划迁移热点槽位: slot={}, from={}, to={}", slot, currentNode, betterNode);
// 这里可以标记为待迁移,在低峰期执行
scheduleSlotMigration(slot, currentNode, betterNode);
}
}
}
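上面的detectHotKeys()留作占位。一种可行的思路是:当maxmemory-policy配置为LFU策略(如allkeys-lfu)时,用OBJECT FREQ读取候选Key的访问频率计数;集群模式下还需要把命令路由到Key所在节点。下面是一个单节点场景的示意实现(候选Key列表与阈值均为假设):
@Service
public class HotKeyDetector {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /** 读取候选Key的LFU访问频率计数,要求服务端maxmemory-policy为*-lfu,否则OBJECT FREQ会报错 */
    public Map<String, Long> detectHotKeys(List<String> candidateKeys, long freqThreshold) {
        Map<String, Long> hotKeys = new HashMap<>();
        for (String key : candidateKeys) {
            Long freq = redisTemplate.execute((RedisCallback<Long>) connection -> {
                Object reply = connection.execute("OBJECT", "FREQ".getBytes(), key.getBytes());
                return reply instanceof Long ? (Long) reply : null;
            });
            if (freq != null && freq >= freqThreshold) {
                hotKeys.put(key, freq);
            }
        }
        return hotKeys;
    }
}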
2. 连接池优化
@Configuration
public class ClusterConnectionOptimization {
@Bean
public LettuceConnectionFactory optimizedClusterFactory() {
RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration(clusterNodes);
clusterConfig.setMaxRedirects(2); // 减少重定向次数
// 优化的连接池配置
GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig =
new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(200); // 增加总连接数
poolConfig.setMaxIdle(100); // 增加最大空闲连接
poolConfig.setMinIdle(50); // 保持足够的最小连接
poolConfig.setTestOnBorrow(false); // 关闭借用测试
poolConfig.setTestOnReturn(false); // 关闭归还测试
poolConfig.setTestWhileIdle(true); // 只在空闲时测试
poolConfig.setBlockWhenExhausted(false); // 连接耗尽时不阻塞
LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
.poolConfig(poolConfig)
.commandTimeout(Duration.ofSeconds(3))
.shutdownTimeout(Duration.ofSeconds(5))
.readFrom(ReadFrom.REPLICA_PREFERRED)
.build();
return new LettuceConnectionFactory(clusterConfig, clientConfig);
}
}
重要注意事项
1. 数据一致性权衡
/**
* 一致性级别配置
*/
@Component
public class ConsistencyManager {
public enum ConsistencyLevel {
EVENTUAL, // 最终一致性
STRONG, // 强一致性
WEAK // 弱一致性
}
/**
* 根据业务场景选择一致性级别
*/
public void writeWithConsistency(String key, Object value, ConsistencyLevel level) {
switch (level) {
case STRONG:
// 同步写入主节点,等待从节点确认
writeToMasterAndWaitSlaves(key, value);
break;
case EVENTUAL:
// 只写入主节点,异步复制到从节点
writeToMasterOnly(key, value);
break;
case WEAK:
// 写入任意可用节点
writeToAnyAvailableNode(key, value);
break;
}
}
}
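其中"等待从节点确认"可以借助Redis的WAIT命令近似实现。下面是一个示意性的草图(假设类中已注入RedisTemplate,确认数量与超时均为示例值);需要注意,WAIT只保证命令已传播到从节点内存,并不能提供真正意义上的强一致性:

// writeToMasterAndWaitSlaves的一种可能实现:写入后用WAIT等待至少1个从节点确认
private void writeToMasterAndWaitSlaves(String key, Object value) {
    redisTemplate.opsForValue().set(key, value);
    Long acked = redisTemplate.execute((RedisCallback<Long>) connection ->
            (Long) connection.execute("WAIT",
                    "1".getBytes(StandardCharsets.UTF_8),       // 至少1个从节点确认
                    "1000".getBytes(StandardCharsets.UTF_8)));  // 最多等待1000毫秒
    if (acked == null || acked < 1) {
        throw new IllegalStateException("从节点未在超时时间内确认写入: key=" + key);
    }
}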
2. 容量规划建议
/**
* 容量规划工具
*/
@Component
public class CapacityPlanner {
/**
* 计算集群容量需求
*/
public ClusterCapacityPlan calculateCapacity(BusinessRequirement requirement) {
// 1. 数据量估算
long totalDataSize = requirement.getKeyCount() * requirement.getAvgValueSize();
// 2. 内存需求(考虑复制和开销)
long memoryRequired = (long) (totalDataSize * 2 * 1.2); // 主从复制 + 20%开销
// 3. QPS需求分析
long readQps = requirement.getReadQps();
long writeQps = requirement.getWriteQps();
// 4. 节点数量计算
int nodeCount = Math.max(
(int) Math.ceil((double) memoryRequired / MAX_NODE_MEMORY),
(int) Math.ceil((double) (readQps + writeQps) / MAX_NODE_QPS)
);
// 5. 确保最小节点数
nodeCount = Math.max(nodeCount, 3);
return ClusterCapacityPlan.builder()
.nodeCount(nodeCount)
.memoryPerNode(memoryRequired / nodeCount)
.expectedQpsPerNode((readQps + writeQps) / nodeCount)
.replicationFactor(2)
.build();
}
}
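为便于理解上面的计算逻辑,这里给出一个带具体数字的用法示意(BusinessRequirement的赋值方式、单节点上限32GB内存/8万QPS均为假设值):

// 假设:1亿个Key、平均Value约1KB、读30万QPS、写5万QPS
BusinessRequirement requirement = new BusinessRequirement();
requirement.setKeyCount(100_000_000L);
requirement.setAvgValueSize(1024L);
requirement.setReadQps(300_000L);
requirement.setWriteQps(50_000L);

// totalDataSize ≈ 102GB;乘以主从复制(x2)和20%开销后 ≈ 246GB
// 若MAX_NODE_MEMORY = 32GB、MAX_NODE_QPS = 8万:
//   按内存:ceil(246 / 32) = 8 个节点
//   按QPS: ceil(35万 / 8万) = 5 个节点
// 取两者较大值且不低于3,最终为8个节点
ClusterCapacityPlan plan = capacityPlanner.calculateCapacity(requirement);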
3. 监控告警配置
# 监控指标阈值配置
monitoring:
redis:
cluster:
# 节点健康监控
node-down-threshold: 1 # 节点下线告警阈值
memory-usage-threshold: 85 # 内存使用率告警阈值
cpu-usage-threshold: 80 # CPU使用率告警阈值
connection-threshold: 8000 # 连接数告警阈值
# 性能监控
response-time-threshold: 100 # 响应时间告警阈值(毫秒)
qps-threshold: 50000 # QPS告警阈值
error-rate-threshold: 1 # 错误率告警阈值(%)
# 复制监控
replication-lag-threshold: 10 # 复制延迟告警阈值(秒)
backlog-usage-threshold: 80 # 积压缓冲区使用率阈值
# 集群监控
slot-migration-timeout: 300 # 槽位迁移超时告警(秒)
cluster-state-check-interval: 30 # 集群状态检查间隔(秒)
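对于其中的集群状态检查,可以借助Spring Boot Actuator的HealthIndicator做一个简单的探测(仅为示意,返回的detail字段名需按实际监控体系调整):

// 基于CLUSTER INFO的简单健康检查,可与上面的cluster-state-check-interval配合使用
@Component
public class ClusterStateHealthIndicator implements HealthIndicator {

    @Autowired
    private RedisConnectionFactory connectionFactory;

    @Override
    public Health health() {
        RedisClusterConnection connection = connectionFactory.getClusterConnection();
        try {
            ClusterInfo info = connection.clusterGetClusterInfo();
            if ("ok".equalsIgnoreCase(info.getState())) {
                return Health.up()
                        .withDetail("slotsAssigned", info.getSlotsAssigned())
                        .withDetail("knownNodes", info.getKnownNodes())
                        .build();
            }
            return Health.down().withDetail("clusterState", info.getState()).build();
        } catch (Exception e) {
            return Health.down(e).build();
        } finally {
            connection.close();
        }
    }
}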
4. 常见问题避免
- 避免大Key:单个Key不超过10MB,使用Hash分片
- 避免热点Key:使用本地缓存和多级缓存
- 避免阻塞操作:使用SCAN替代KEYS,避免长时间的SORT操作(SCAN用法见本列表后的示例)
- 合理设置过期时间:避免大量Key同时过期
- 监控慢查询:设置slowlog监控,优化慢查询
- 网络优化:使用pipeline批量操作,减少网络往返
- 内存优化:合理配置maxmemory和淘汰策略
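针对上面"使用SCAN替代KEYS"这一条,给出一个最小示意(匹配模式与批次大小均为示例值)。SCAN以游标方式增量遍历,不会像KEYS那样一次性阻塞服务端:

// 用SCAN按模式增量遍历Key,替代阻塞式的KEYS命令
public Set<String> scanKeys(RedisTemplate<String, Object> redisTemplate, String pattern) {
    Set<String> keys = new HashSet<>();
    ScanOptions options = ScanOptions.scanOptions()
            .match(pattern)   // 例如 "session:*"
            .count(500)       // 每批扫描的参考数量
            .build();
    try (Cursor<String> cursor = redisTemplate.scan(options)) {
        cursor.forEachRemaining(keys::add);
    }
    return keys;
}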
📚 总结与技术对比
Redis高可用方案对比
| 特性 | 主从复制 | Sentinel哨兵 | Cluster集群 |
| --- | --- | --- | --- |
| 部署复杂度 | 简单 | 中等 | 复杂 |
| 故障转移 | 手动 | 自动 | 自动 |
| 数据分片 | 不支持 | 不支持 | 支持 |
| 扩展性 | 垂直扩展 | 垂直扩展 | 水平扩展 |
| 一致性 | 最终一致性 | 最终一致性 | 最终一致性 |
| 客户端复杂度 | 低 | 中等 | 高 |
| 运维成本 | 低 | 中等 | 高 |
| 适用场景 | 小规模、读多写少 | 中等规模、高可用要求 | 大规模、高并发 |
方案选择建议
1. 主从复制适用场景
// 适用条件评估
public class MasterSlaveScenario {
public boolean isApplicable(SystemRequirement req) {
return req.getDataSize() < 50_000_000_000L && // 数据量 < 50GB
req.getWriteQps() < 10_000 && // 写QPS < 1万
req.getReadQps() < 50_000 && // 读QPS < 5万
req.getDowntimeTolerance() > 300 && // 可容忍5分钟故障
req.getTeamSize() < 5; // 运维团队 < 5人
}
}
2. Sentinel哨兵适用场景
// 适用条件评估
public class SentinelScenario {
public boolean isApplicable(SystemRequirement req) {
return req.getDataSize() < 200_000_000_000L && // 数据量 < 200GB
req.getWriteQps() < 50_000 && // 写QPS < 5万
req.getReadQps() < 200_000 && // 读QPS < 20万
req.getDowntimeTolerance() < 60 && // 故障恢复 < 1分钟
req.isHighAvailabilityRequired(); // 需要高可用
}
}
3. Cluster集群适用场景
// 适用条件评估
public class ClusterScenario {
public boolean isApplicable(SystemRequirement req) {
return req.getDataSize() > 100_000_000_000L || // 数据量 > 100GB
req.getWriteQps() > 30_000 || // 写QPS > 3万
req.getReadQps() > 100_000 || // 读QPS > 10万
req.isHorizontalScalingRequired() || // 需要水平扩展
req.getGrowthRate() > 0.5; // 年增长率 > 50%
}
}
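三个评估器可以组合成一个简单的选型入口,按"先判断是否必须上Cluster、再依次降级"的顺序处理(仅为示意,实际选型还应结合团队能力与成本):

// 简单的选型入口:优先判断大规模/高并发场景
public String recommendArchitecture(SystemRequirement req) {
    if (new ClusterScenario().isApplicable(req)) {
        return "Cluster";
    }
    if (new SentinelScenario().isApplicable(req)) {
        return "Sentinel";
    }
    if (new MasterSlaveScenario().isApplicable(req)) {
        return "Master-Slave";
    }
    // 条件都不满足时,默认给出兼顾可用性与复杂度的折中方案
    return "Sentinel";
}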
架构演进路径
1. 渐进式演进策略
/**
* Redis架构演进管理器
*/
@Service
public class ArchitectureEvolutionManager {
/**
* 评估当前架构是否需要升级
*/
public EvolutionPlan evaluateEvolution(CurrentArchitecture current, SystemMetrics metrics) {
EvolutionPlan plan = new EvolutionPlan();
// 1. 评估性能瓶颈
if (isPerformanceBottleneck(metrics)) {
if (current.getType() == ArchitectureType.MASTER_SLAVE) {
plan.recommendUpgrade(ArchitectureType.SENTINEL, "性能瓶颈,建议升级到Sentinel");
} else if (current.getType() == ArchitectureType.SENTINEL) {
plan.recommendUpgrade(ArchitectureType.CLUSTER, "性能瓶颈,建议升级到Cluster");
}
}
// 2. 评估可用性需求
if (isAvailabilityInsufficient(current, metrics)) {
if (current.getType() == ArchitectureType.MASTER_SLAVE) {
plan.recommendUpgrade(ArchitectureType.SENTINEL, "可用性不足,需要自动故障转移");
}
}
// 3. 评估扩展性需求
if (isScalabilityInsufficient(current, metrics)) {
plan.recommendUpgrade(ArchitectureType.CLUSTER, "扩展性不足,需要水平扩展");
}
return plan;
}
/**
* 执行架构升级
*/
public void executeEvolution(EvolutionPlan plan) {
try {
switch (plan.getTargetArchitecture()) {
case SENTINEL:
upgradeToSentinel(plan);
break;
case CLUSTER:
upgradeToCluster(plan);
break;
default:
throw new UnsupportedOperationException("不支持的架构类型");
}
} catch (Exception e) {
logger.error("架构升级失败", e);
rollbackEvolution(plan);
}
}
private void upgradeToSentinel(EvolutionPlan plan) {
// 1. 部署Sentinel节点
deploySentinelNodes();
// 2. 配置主从监控
configureMasterSlaveMonitoring();
// 3. 切换客户端连接
switchClientToSentinel();
// 4. 验证升级结果
validateSentinelDeployment();
}
private void upgradeToCluster(EvolutionPlan plan) {
// 1. 创建集群节点
createClusterNodes();
// 2. 初始化集群
initializeCluster();
// 3. 数据迁移
migrateDataToCluster();
// 4. 切换客户端
switchClientToCluster();
// 5. 验证集群状态
validateClusterDeployment();
}
}
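上面的isPerformanceBottleneck()等判断方法可以按类似下面的方式实现(SystemMetrics的取值方法和各阈值均为假设,应结合前文的监控告警配置调整):

// 假设性的瓶颈判断:任一核心指标越过阈值即认为存在性能瓶颈
private boolean isPerformanceBottleneck(SystemMetrics metrics) {
    return metrics.getMemoryUsageRatio() > 0.85      // 内存使用率超过85%
            || metrics.getCpuUsageRatio() > 0.80     // CPU使用率超过80%
            || metrics.getP99LatencyMillis() > 100   // P99响应时间超过100ms
            || metrics.getRejectedConnections() > 0; // 出现被拒绝的连接
}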
2. 零停机升级策略
/**
* 零停机升级实现
*/
@Service
public class ZeroDowntimeUpgradeService {
/**
* 主从到Sentinel零停机升级
*/
public void upgradeToSentinelZeroDowntime() {
try {
// 1. 在现有主从基础上部署Sentinel
List<String> sentinelNodes = deploySentinelNodesParallel();
// 2. 配置Sentinel监控现有主从
configureSentinelMonitoring(sentinelNodes);
// 3. 验证Sentinel正常工作
validateSentinelOperations();
// 4. 逐步切换客户端连接
gradualClientMigration();
// 5. 清理旧配置
cleanupOldConfiguration();
logger.info("零停机升级到Sentinel完成");
} catch (Exception e) {
logger.error("零停机升级失败,开始回滚", e);
rollbackToMasterSlave();
}
}
/**
* Sentinel到Cluster零停机升级
*/
public void upgradeToClusterZeroDowntime() {
try {
// 1. 创建新的集群环境
ClusterEnvironment newCluster = createNewClusterEnvironment();
// 2. 实时数据同步
startRealTimeDataSync(newCluster);
// 3. 验证数据一致性
validateDataConsistency();
// 4. 切换读流量到新集群
switchReadTrafficToCluster(newCluster);
// 5. 切换写流量到新集群
switchWriteTrafficToCluster(newCluster);
// 6. 停止旧环境
shutdownOldEnvironment();
logger.info("零停机升级到Cluster完成");
} catch (Exception e) {
logger.error("零停机升级失败,开始回滚", e);
rollbackToSentinel();
}
}
/**
* 渐进式客户端迁移
*/
private void gradualClientMigration() {
List<String> clientGroups = getClientGroups();
for (String group : clientGroups) {
try {
// 1. 切换客户端组到新架构
switchClientGroup(group);
// 2. 监控切换后的性能
monitorPerformanceAfterSwitch(group);
// 3. 等待一段时间确保稳定
Thread.sleep(60000); // 等待1分钟
logger.info("客户端组切换完成: {}", group);
} catch (InterruptedException ie) {
// 中断时恢复中断标记并终止迁移
Thread.currentThread().interrupt();
throw new IllegalStateException("客户端迁移被中断: " + group, ie);
} catch (Exception e) {
logger.error("客户端组切换失败: {}", group, e);
// 回滚该客户端组
rollbackClientGroup(group);
throw e;
}
}
}
}
最佳实践总结
1. 架构设计原则
- 渐进式演进:从简单到复杂,根据业务需求逐步升级
- 监控先行:完善的监控体系是高可用的基础
- 自动化运维:减少人工干预,提高运维效率
- 容量规划:提前规划容量,避免临时扩容
- 故障演练:定期进行故障演练,验证高可用方案
2. 运维建议
- 文档完善:维护详细的架构文档和运维手册
- 版本管理:统一Redis版本,便于维护升级
- 配置管理:使用配置管理工具统一管理配置
- 备份策略:制定完善的数据备份和恢复策略
- 安全加固:配置访问控制、网络隔离等安全措施
3. 开发建议
- 客户端优化:合理配置连接池和超时参数
- 数据模型设计:避免大Key和热点Key
- 缓存策略:设计合理的缓存更新和失效策略
- 降级方案:准备缓存降级和熔断机制
- 性能测试:定期进行性能压测,发现潜在问题
Redis集群架构与高可用方案是构建大规模分布式系统的重要基础设施。通过合理选择架构方案、优化配置参数、完善监控体系,可以构建出稳定可靠的Redis高可用服务,为业务系统提供强有力的缓存支撑。在实际应用中,需要根据具体的业务场景和技术要求,选择最适合的高可用方案,并制定完善的运维策略。