This commit is contained in:
zhoujia
2025-10-24 15:51:46 +08:00
parent 057a095596
commit c61d154937
23 changed files with 3092 additions and 8 deletions

View File

@ -0,0 +1,637 @@
# Redis Stream 技术分析报告
## 项目概述
本项目是一个基于Spring Boot的Redis Stream消息队列系统实现了两种消费者模式StreamListener模式和Manual Ack模式。项目展示了Redis Stream在消息队列场景下的关键特性包括消费者组管理、消息确认机制、以及高可用性保证。
## 核心问题分析
### 1. 消费者组宕机再上线未ACK消息是否重复/丢失
#### 1.1 问题分析
**Redis Stream的消费者组机制保证了消息的可靠性**
1. **消息不会丢失**未ACK的消息会保留在Redis Stream中直到被明确确认
2. **消息可能重复**消费者重启后会重新处理未ACK的消息
#### 1.2 代码实现分析
`StreamListenerConsumerService.java``ManualAckConsumerService.java`的实现可以看出:
```java
// StreamListenerConsumerService.java 第268-279行
private void acknowledgeMessage(String recordId) {
try {
redisTemplate.opsForStream().acknowledge(
redisStreamProperties.getKey(),
redisStreamProperties.getConsumerGroup(),
recordId
);
log.debug("StreamListener 消息已确认: recordId={}", recordId);
} catch (Exception e) {
log.error("StreamListener 确认消息失败: recordId={}, error={}", recordId, e.getMessage(), e);
}
}
```
```java
// ManualAckConsumerService.java 第305-317行
private void acknowledgeMessage(String recordId) {
try {
redisTemplate.opsForStream().acknowledge(
redisStreamProperties.getKey(),
redisStreamProperties.getConsumerGroup(),
recordId
);
log.debug("Manual Ack 消息已确认: recordId={}", recordId);
} catch (Exception e) {
log.error("Manual Ack 确认消息失败: recordId={}, error={}", recordId, e.getMessage(), e);
throw new RuntimeException("消息确认失败: " + e.getMessage(), e);
}
}
```
#### 1.3 消费者组恢复机制
```java
// ManualAckConsumerService.java 第58-170行
private void ensureConsumerGroupExists() {
// 检查消费者组是否存在
Object groups = redisTemplate.opsForStream().groups(redisStreamProperties.getKey());
// 如果不存在,创建消费者组
if (!groupExists) {
redisTemplate.opsForStream().createGroup(
redisStreamProperties.getKey(),
ReadOffset.from("0"),
redisStreamProperties.getConsumerGroup()
);
}
}
```
#### 1.4 结论
- **消息不会丢失**Redis Stream的持久化机制保证消息持久存储
- **消息可能重复**消费者重启后会重新处理未ACK的消息
- **建议**:业务逻辑需要支持幂等性处理
### 2. 生产者端事务消息MULTI/EXEC 或 Lua 脚本)能否保证"业务 DB 提交 + 消息入队"原子性
#### 2.1 问题分析
**项目已实现完整的事务消息机制**,支持多种事务模式来保证"业务 DB 提交 + 消息入队"的原子性。
#### 2.2 已实现的方案
##### 方案一Redis MULTI/EXEC事务
```java
// TransactionalMessageProducerService.java 第42-81行
public String sendMessageWithMultiExec(Message message) {
try {
// 生成消息ID
String messageId = UUID.randomUUID().toString();
message.setId(messageId);
// 将消息转换为Map
Map<String, String> messageMap = convertMessageToMap(message);
// 使用MULTI/EXEC事务
Object result = redisTemplate.execute(new SessionCallback<Object>() {
@Override
public Object execute(org.springframework.data.redis.core.RedisOperations operations)
throws org.springframework.dao.DataAccessException {
operations.multi();
try {
// 在事务中执行Redis操作
operations.opsForStream().add(redisStreamProperties.getKey(), messageMap);
// 可以添加其他Redis操作比如更新计数器、设置过期时间等
operations.opsForValue().increment("message:count");
operations.opsForValue().set("message:last:" + messageId, System.currentTimeMillis());
// 提交事务
return operations.exec();
} catch (Exception e) {
operations.discard();
throw e;
}
}
});
log.info("MULTI/EXEC事务消息发送成功: messageId={}, result={}", messageId, result);
return messageId;
} catch (Exception e) {
log.error("MULTI/EXEC事务消息发送失败: {}", e.getMessage(), e);
throw new RuntimeException("事务消息发送失败", e);
}
}
```
##### 方案二Lua脚本实现
```java
// LuaScriptTransactionalService.java 第40-62行
private static final String BASIC_SEND_SCRIPT = """
local streamKey = KEYS[1]
local messageId = ARGV[1]
local content = ARGV[2]
local type = ARGV[3]
local sender = ARGV[4]
local timestamp = ARGV[5]
-- 发送消息到Stream
local recordId = redis.call('XADD', streamKey, '*',
'id', messageId,
'content', content,
'type', type,
'sender', sender,
'timestamp', timestamp
)
-- 更新统计信息
redis.call('INCR', 'message:count')
redis.call('SET', 'message:last:' .. messageId, timestamp)
return recordId
""";
```
##### 方案三:消息表模式(推荐)
```java
// MessageTableTransactionalService.java 第67-95行
@Transactional
public String sendTransactionalMessage(Message message, String businessData) {
String messageId = UUID.randomUUID().toString();
message.setId(messageId);
try {
// 1. 执行业务操作
log.info("执行业务操作: messageId={}, businessData={}", messageId, businessData);
jdbcTemplate.update(
"INSERT INTO business_data (id, data, status, created_at) VALUES (?, ?, ?, ?)",
messageId, businessData, "PROCESSING", LocalDateTime.now()
);
// 2. 记录消息到消息表
log.info("记录消息到消息表: messageId={}", messageId);
jdbcTemplate.update(
"INSERT INTO message_table (id, content, type, sender, status, retry_count, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
messageId, message.getContent(), message.getType(), message.getSender(),
MessageStatus.PENDING.name(), 0, LocalDateTime.now(), LocalDateTime.now()
);
log.info("事务消息记录成功: messageId={}", messageId);
return messageId;
} catch (Exception e) {
log.error("事务消息记录失败: messageId={}, error={}", messageId, e.getMessage(), e);
throw new RuntimeException("事务消息记录失败", e);
}
}
```
##### 方案四:两阶段提交模式
```java
// TransactionalMessageProducerService.java 第210-260行
public String sendMessageWithTwoPhaseCommit(Message message) {
String messageId = UUID.randomUUID().toString();
message.setId(messageId);
try {
// 第一阶段:准备阶段
log.info("两阶段提交 - 准备阶段: messageId={}", messageId);
// 在Redis中创建准备状态的消息
Map<String, String> prepareData = new HashMap<>();
prepareData.put("id", messageId);
prepareData.put("content", message.getContent());
prepareData.put("type", message.getType());
prepareData.put("sender", message.getSender());
prepareData.put("status", "PREPARED");
prepareData.put("timestamp", LocalDateTime.now().toString());
// 发送到准备队列
redisTemplate.opsForStream().add("message:prepare", prepareData);
// 模拟业务处理
Thread.sleep(100);
// 第二阶段:提交阶段
log.info("两阶段提交 - 提交阶段: messageId={}", messageId);
// 发送到正式队列
Map<String, String> commitData = convertMessageToMap(message);
commitData.put("status", "COMMITTED");
redisTemplate.opsForStream().add(redisStreamProperties.getKey(), commitData);
log.info("两阶段提交事务消息发送成功: messageId={}", messageId);
return messageId;
} catch (Exception e) {
log.error("两阶段提交事务消息发送失败: messageId={}, error={}", messageId, e.getMessage(), e);
throw new RuntimeException("两阶段提交事务消息发送失败", e);
}
}
```
#### 2.3 原子性保证分析
**MULTI/EXEC模式**
- ✅ 保证Redis操作的原子性
- ❌ 无法跨数据库事务
- 适用场景Redis内部操作需要原子性
**Lua脚本模式**
- ✅ 保证Redis操作的原子性
- ✅ 支持复杂业务逻辑
- ✅ 性能高,减少网络往返
- ❌ 无法跨数据库事务
**消息表模式**
- ✅ 保证业务数据与消息记录的原子性
- ✅ 支持消息重试和死信队列
- ✅ 最强的一致性保证
- ✅ 异步发送,提高性能
**两阶段提交模式**
- ✅ 支持分布式事务
- ✅ 支持回滚机制
- ❌ 复杂度高,性能较低
#### 2.4 结论
- **项目现状**已实现完整的事务消息机制支持4种事务模式
- **Redis MULTI/EXEC**可以保证Redis操作的原子性但无法跨数据库事务
- **Lua脚本**可以实现Redis内部的原子性操作支持复杂逻辑
- **消息表模式**:推荐使用,保证业务数据与消息的强一致性
- **两阶段提交**:适用于分布式事务场景
### 3. 消息堆积上限与Redis内存配置关系
#### 3.1 Redis内存配置分析
`application.yml`配置可以看出:
```yaml
# application.yml 第8-19行
spring:
redis:
host: localhost
port: 6379
password:
database: 0
timeout: 2000ms
lettuce:
pool:
max-active: 8
max-wait: -1ms
max-idle: 8
min-idle: 0
```
#### 3.2 内存限制机制
Redis Stream的内存使用受以下因素影响
1. **maxmemory配置**Redis最大内存限制
2. **Stream长度**:消息数量
3. **消息大小**:单个消息的字节数
4. **消费者组数量**:每个消费者组维护独立的状态
#### 3.3 消息堆积计算
```java
// 建议的内存监控实现
@Service
public class StreamMemoryMonitor {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public Map<String, Object> getStreamMemoryInfo() {
Map<String, Object> info = new HashMap<>();
// 1. 获取Stream长度
Long streamLength = redisTemplate.opsForStream().size("message-stream");
info.put("streamLength", streamLength);
// 2. 获取Redis内存使用情况
Properties memoryInfo = redisTemplate.getConnectionFactory()
.getConnection().info("memory");
info.put("usedMemory", memoryInfo.getProperty("used_memory"));
info.put("maxMemory", memoryInfo.getProperty("maxmemory"));
// 3. 计算消息平均大小
if (streamLength > 0) {
Long memoryUsage = Long.parseLong(memoryInfo.getProperty("used_memory"));
Double avgMessageSize = (double) memoryUsage / streamLength;
info.put("avgMessageSize", avgMessageSize);
}
return info;
}
}
```
#### 3.4 内存优化策略
1. **设置maxmemory**防止Redis内存溢出
2. **配置淘汰策略**LRU、LFU等
3. **监控Stream长度**:设置告警阈值
4. **消息TTL**:设置消息过期时间
#### 3.5 结论
- **消息堆积上限**受Redis maxmemory配置限制
- **建议配置**设置合理的maxmemory和淘汰策略
- **监控告警**实时监控Stream长度和内存使用率
## 技术架构分析
### 1. 消费者模式对比
| 特性 | StreamListener模式 | Manual Ack模式 |
|------|-------------------|----------------|
| 实时性 | 高(事件驱动) | 中(轮询) |
| 控制精度 | 低自动ACK | 高(手动控制) |
| 性能 | 高 | 中 |
| 复杂度 | 低 | 高 |
| 适用场景 | 高并发实时处理 | 精确控制处理流程 |
### 2. 关键代码实现
#### 2.1 StreamListener模式
```java
// StreamListenerConsumerService.java 第225-263行
@Override
public void onMessage(MapRecord<String, String, String> message) {
String recordId = null;
try {
messageCount.incrementAndGet();
// 获取消息数据
Map<String, String> messageData = message.getValue();
recordId = message.getId().getValue();
// 处理消息
processMessage(messageData);
// 确认消息ACK
acknowledgeMessage(recordId);
processedCount.incrementAndGet();
} catch (Exception e) {
errorCount.incrementAndGet();
// 处理失败时也要确认消息,避免重复处理
if (recordId != null) {
acknowledgeMessage(recordId);
}
}
}
```
#### 2.2 Manual Ack模式
```java
// ManualAckConsumerService.java 第175-226行
public void pollAndProcessMessages() {
if (isProcessing) {
log.warn("⚠️ Manual Ack 正在处理中,跳过本次拉取");
return;
}
isProcessing = true;
try {
// 确保消费者组存在
ensureConsumerGroupExists();
// 从 Stream 中读取消息
List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream()
.read(Consumer.from(redisStreamProperties.getConsumerGroup(), redisStreamProperties.getConsumerName()),
StreamOffset.create(redisStreamProperties.getKey(), ReadOffset.lastConsumed()));
for (MapRecord<String, Object, Object> message : messages) {
try {
processMessage(message);
processedCount.incrementAndGet();
} catch (Exception e) {
errorCount.incrementAndGet();
}
}
} finally {
isProcessing = false;
}
}
```
### 3. 配置管理
#### 3.1 属性配置
```java
// RedisStreamProperties.java 第13-67行
@Data
@Configuration
@ConfigurationProperties(prefix = "redis.stream")
public class RedisStreamProperties {
private String key = "message-stream";
private String consumerGroup = "message-consumer-group";
private String consumerName = "message-consumer";
private StreamListenerConfig streamListener = new StreamListenerConfig();
@Data
public static class StreamListenerConfig {
private boolean autoStart = false;
private boolean processHistoricalMessages = true;
private int pollTimeout = 1;
private int corePoolSize = 2;
private int maxPoolSize = 4;
private int keepAliveTime = 60;
}
}
```
#### 3.2 线程池配置
```java
// RedisStreamConfig.java 第64-78行
@Bean
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer() {
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(),
r -> new Thread(r, "redis-stream-consumer-" + System.currentTimeMillis())
);
// 创建监听容器配置
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.executor(executor)
.build();
return StreamMessageListenerContainer.create(redisConnectionFactory, options);
}
```
## 测试验证
### 1. 集成测试
项目包含完整的集成测试:
```java
// RedisStreamIntegrationTest.java 第85-120行
@Test
void testStreamListenerEndToEnd() throws InterruptedException {
// 启动 StreamListener
container.start();
// 订阅 Stream
Subscription subscription = container.receive(
Consumer.from(redisStreamProperties.getConsumerGroup(), streamListenerConsumerName),
StreamOffset.create(redisStreamProperties.getKey(), ReadOffset.from(">")),
streamListenerConsumerService
);
// 发送 100 条消息
int sentCount = messageProducerService.sendBatchMessages(100);
assertEquals(100, sentCount);
// 等待消息处理完成
Thread.sleep(20000);
// 验证消息统计
Map<String, Object> stats = streamListenerConsumerService.getMessageStats();
assertTrue((Long) stats.get("totalReceived") >= 100);
assertTrue((Long) stats.get("totalProcessed") >= 100);
assertEquals(0L, stats.get("totalErrors"));
}
```
### 2. 并发测试
```java
// RedisStreamIntegrationTest.java 第164-215行
@Test
void testConcurrentProductionAndConsumption() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(2);
// 启动 StreamListener
container.start();
// 并发发送消息
Thread producerThread = new Thread(() -> {
try {
messageProducerService.sendBatchMessages(50);
} finally {
latch.countDown();
}
});
// 并发处理消息
Thread consumerThread = new Thread(() -> {
try {
manualAckConsumerService.batchProcessMessages(50);
} finally {
latch.countDown();
}
});
producerThread.start();
consumerThread.start();
// 等待完成
assertTrue(latch.await(30, TimeUnit.SECONDS));
}
```
## 总结与建议
### 1. 项目优势
1. **双模式支持**:同时提供`StreamListener``Manual Ack`两种消费模式
2. **事务消息完备**:覆盖`MULTI/EXEC``Lua脚本``消息表``两阶段提交`四种方案
3. **测试充分**:端到端集成测试与并发测试覆盖关键路径
4. **配置灵活**:可调的线程池、拉取策略与消费者组参数
5. **可运维性较好**:基础监控指标与异常处理、重试机制已具雏形
### 2. 改进建议
1. **默认采用消息表模式**:将“消息表”设为默认事务消息落地方案,提供重试、补偿、死信与可观测性;`MULTI/EXEC``Lua`作为轻量场景可选项。
2. **Pending管理与幂等性**:完善`XPENDING`巡检、超时转移与再分配;在业务侧统一幂等键设计,避免重复投递与重复消费副作用。
3. **死信与重试策略参数化**:落库可追踪,失败按阶梯回退(指数退避+抖动达到阈值进入DLQ并提供人工恢复通道。
4. **内存与长度治理**:结合`XTRIM`(基于长度/时间)与消息字段瘦身;对大消息启用压缩或外部存储(仅存指针)。
5. **分片与扩展性**按业务键如订单ID进行多`Stream`分区;消费者组按分区扩展并支持水平扩容与重平衡。
6. **监控可观测性**:补充生产/消费QPS、Lag、Ack时延、P90/P99、PENDING大小、重试/丢弃计数的指标与可视化仪表板。
7. **批量与流水线优化**:生产端`XADD`合并与管线化,消费端批量读取与批量`XACK`;连接池参数基于压测调优。
### 3. 生产环境建议
1. **容量与持久化**:设置合理`maxmemory`与淘汰策略AOF 建议`everysec`并配合`RDB`定期快照校验恢复时间目标RTO/RPO
2. **Stream治理策略**:为关键`Stream`启用`XTRIM`策略与保留窗口;设置长度/Lag/内存使用率告警阈值与自愈流程。
3. **集群与高可用**优先采用Redis Cluster/主从+哨兵或托管服务;为跨可用区部署设计故障演练与自动故障转移。
4. **限流与隔离**:对生产/消费端设置限流与熔断;按业务域拆分实例或命名空间,避免相互影响。
5. **安全与合规**开启密码、TLS与网络访问控制对敏感字段加密/脱敏,审计关键操作。
6. **变更与演练**灰度发布消费者定期演练消费者宕机恢复、PENDING迁移、DLQ回放与回溯恢复流程。
## 附录
### A. 关键配置参数
```yaml
# Redis连接配置
spring:
redis:
host: localhost
port: 6379
timeout: 2000ms
lettuce:
pool:
max-active: 8
max-idle: 8
# Stream配置
redis:
stream:
key: "message-stream"
consumer-group: "message-consumer-group"
consumer-name: "message-consumer"
```
### B. 监控指标
1. **Stream长度**`XLEN message-stream`
2. **消费者组信息**`XINFO GROUPS message-stream`
3. **待处理消息**`XPENDING message-stream message-consumer-group`
4. **Redis内存**`INFO memory`
### C. 常用Redis命令
```bash
# 查看Stream信息
XINFO STREAM message-stream
# 查看消费者组
XINFO GROUPS message-stream
# 查看待处理消息
XPENDING message-stream message-consumer-group
# 手动确认消息
XACK message-stream message-consumer-group message-id
```
---
**报告生成时间**2025年
**项目版本**Spring Boot Redis Stream Demo
**分析范围**:消费者组恢复、事务消息、内存配置