638 lines
21 KiB
Markdown
638 lines
21 KiB
Markdown
# Redis Stream 技术分析报告
|
||
|
||
## 项目概述
|
||
|
||
本项目是一个基于Spring Boot的Redis Stream消息队列系统,实现了两种消费者模式:StreamListener模式和Manual Ack模式。项目展示了Redis Stream在消息队列场景下的关键特性,包括消费者组管理、消息确认机制、以及高可用性保证。
|
||
|
||
## 核心问题分析
|
||
|
||
### 1. 消费者组宕机再上线,未ACK消息是否重复/丢失
|
||
|
||
#### 1.1 问题分析
|
||
|
||
**Redis Stream的消费者组机制保证了消息的可靠性:**
|
||
|
||
1. **消息不会丢失**:未ACK的消息会保留在Redis Stream中,直到被明确确认
|
||
2. **消息可能重复**:消费者重启后,会重新处理未ACK的消息
|
||
|
||
#### 1.2 代码实现分析
|
||
|
||
从`StreamListenerConsumerService.java`和`ManualAckConsumerService.java`的实现可以看出:
|
||
|
||
```java
|
||
// StreamListenerConsumerService.java 第268-279行
|
||
private void acknowledgeMessage(String recordId) {
|
||
try {
|
||
redisTemplate.opsForStream().acknowledge(
|
||
redisStreamProperties.getKey(),
|
||
redisStreamProperties.getConsumerGroup(),
|
||
recordId
|
||
);
|
||
log.debug("StreamListener 消息已确认: recordId={}", recordId);
|
||
} catch (Exception e) {
|
||
log.error("StreamListener 确认消息失败: recordId={}, error={}", recordId, e.getMessage(), e);
|
||
}
|
||
}
|
||
```
|
||
|
||
```java
|
||
// ManualAckConsumerService.java 第305-317行
|
||
private void acknowledgeMessage(String recordId) {
|
||
try {
|
||
redisTemplate.opsForStream().acknowledge(
|
||
redisStreamProperties.getKey(),
|
||
redisStreamProperties.getConsumerGroup(),
|
||
recordId
|
||
);
|
||
log.debug("Manual Ack 消息已确认: recordId={}", recordId);
|
||
} catch (Exception e) {
|
||
log.error("Manual Ack 确认消息失败: recordId={}, error={}", recordId, e.getMessage(), e);
|
||
throw new RuntimeException("消息确认失败: " + e.getMessage(), e);
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 1.3 消费者组恢复机制
|
||
|
||
```java
|
||
// ManualAckConsumerService.java 第58-170行
|
||
private void ensureConsumerGroupExists() {
|
||
// 检查消费者组是否存在
|
||
Object groups = redisTemplate.opsForStream().groups(redisStreamProperties.getKey());
|
||
// 如果不存在,创建消费者组
|
||
if (!groupExists) {
|
||
redisTemplate.opsForStream().createGroup(
|
||
redisStreamProperties.getKey(),
|
||
ReadOffset.from("0"),
|
||
redisStreamProperties.getConsumerGroup()
|
||
);
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 1.4 结论
|
||
|
||
- **消息不会丢失**:Redis Stream的持久化机制保证消息持久存储
|
||
- **消息可能重复**:消费者重启后会重新处理未ACK的消息
|
||
- **建议**:业务逻辑需要支持幂等性处理
|
||
|
||
### 2. 生产者端事务消息(MULTI/EXEC 或 Lua 脚本)能否保证"业务 DB 提交 + 消息入队"原子性
|
||
|
||
#### 2.1 问题分析
|
||
|
||
**项目已实现完整的事务消息机制**,支持多种事务模式来保证"业务 DB 提交 + 消息入队"的原子性。
|
||
|
||
#### 2.2 已实现的方案
|
||
|
||
##### 方案一:Redis MULTI/EXEC事务
|
||
|
||
```java
|
||
// TransactionalMessageProducerService.java 第42-81行
|
||
public String sendMessageWithMultiExec(Message message) {
|
||
try {
|
||
// 生成消息ID
|
||
String messageId = UUID.randomUUID().toString();
|
||
message.setId(messageId);
|
||
|
||
// 将消息转换为Map
|
||
Map<String, String> messageMap = convertMessageToMap(message);
|
||
|
||
// 使用MULTI/EXEC事务
|
||
Object result = redisTemplate.execute(new SessionCallback<Object>() {
|
||
@Override
|
||
public Object execute(org.springframework.data.redis.core.RedisOperations operations)
|
||
throws org.springframework.dao.DataAccessException {
|
||
operations.multi();
|
||
try {
|
||
// 在事务中执行Redis操作
|
||
operations.opsForStream().add(redisStreamProperties.getKey(), messageMap);
|
||
|
||
// 可以添加其他Redis操作,比如更新计数器、设置过期时间等
|
||
operations.opsForValue().increment("message:count");
|
||
operations.opsForValue().set("message:last:" + messageId, System.currentTimeMillis());
|
||
|
||
// 提交事务
|
||
return operations.exec();
|
||
} catch (Exception e) {
|
||
operations.discard();
|
||
throw e;
|
||
}
|
||
}
|
||
});
|
||
|
||
log.info("MULTI/EXEC事务消息发送成功: messageId={}, result={}", messageId, result);
|
||
return messageId;
|
||
|
||
} catch (Exception e) {
|
||
log.error("MULTI/EXEC事务消息发送失败: {}", e.getMessage(), e);
|
||
throw new RuntimeException("事务消息发送失败", e);
|
||
}
|
||
}
|
||
```
|
||
|
||
##### 方案二:Lua脚本实现
|
||
|
||
```java
|
||
// LuaScriptTransactionalService.java 第40-62行
|
||
private static final String BASIC_SEND_SCRIPT = """
|
||
local streamKey = KEYS[1]
|
||
local messageId = ARGV[1]
|
||
local content = ARGV[2]
|
||
local type = ARGV[3]
|
||
local sender = ARGV[4]
|
||
local timestamp = ARGV[5]
|
||
|
||
-- 发送消息到Stream
|
||
local recordId = redis.call('XADD', streamKey, '*',
|
||
'id', messageId,
|
||
'content', content,
|
||
'type', type,
|
||
'sender', sender,
|
||
'timestamp', timestamp
|
||
)
|
||
|
||
-- 更新统计信息
|
||
redis.call('INCR', 'message:count')
|
||
redis.call('SET', 'message:last:' .. messageId, timestamp)
|
||
|
||
return recordId
|
||
""";
|
||
```
|
||
|
||
##### 方案三:消息表模式(推荐)
|
||
|
||
```java
|
||
// MessageTableTransactionalService.java 第67-95行
|
||
@Transactional
|
||
public String sendTransactionalMessage(Message message, String businessData) {
|
||
String messageId = UUID.randomUUID().toString();
|
||
message.setId(messageId);
|
||
|
||
try {
|
||
// 1. 执行业务操作
|
||
log.info("执行业务操作: messageId={}, businessData={}", messageId, businessData);
|
||
jdbcTemplate.update(
|
||
"INSERT INTO business_data (id, data, status, created_at) VALUES (?, ?, ?, ?)",
|
||
messageId, businessData, "PROCESSING", LocalDateTime.now()
|
||
);
|
||
|
||
// 2. 记录消息到消息表
|
||
log.info("记录消息到消息表: messageId={}", messageId);
|
||
jdbcTemplate.update(
|
||
"INSERT INTO message_table (id, content, type, sender, status, retry_count, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
||
messageId, message.getContent(), message.getType(), message.getSender(),
|
||
MessageStatus.PENDING.name(), 0, LocalDateTime.now(), LocalDateTime.now()
|
||
);
|
||
|
||
log.info("事务消息记录成功: messageId={}", messageId);
|
||
return messageId;
|
||
|
||
} catch (Exception e) {
|
||
log.error("事务消息记录失败: messageId={}, error={}", messageId, e.getMessage(), e);
|
||
throw new RuntimeException("事务消息记录失败", e);
|
||
}
|
||
}
|
||
```
|
||
|
||
##### 方案四:两阶段提交模式
|
||
|
||
```java
|
||
// TransactionalMessageProducerService.java 第210-260行
|
||
public String sendMessageWithTwoPhaseCommit(Message message) {
|
||
String messageId = UUID.randomUUID().toString();
|
||
message.setId(messageId);
|
||
|
||
try {
|
||
// 第一阶段:准备阶段
|
||
log.info("两阶段提交 - 准备阶段: messageId={}", messageId);
|
||
|
||
// 在Redis中创建准备状态的消息
|
||
Map<String, String> prepareData = new HashMap<>();
|
||
prepareData.put("id", messageId);
|
||
prepareData.put("content", message.getContent());
|
||
prepareData.put("type", message.getType());
|
||
prepareData.put("sender", message.getSender());
|
||
prepareData.put("status", "PREPARED");
|
||
prepareData.put("timestamp", LocalDateTime.now().toString());
|
||
|
||
// 发送到准备队列
|
||
redisTemplate.opsForStream().add("message:prepare", prepareData);
|
||
|
||
// 模拟业务处理
|
||
Thread.sleep(100);
|
||
|
||
// 第二阶段:提交阶段
|
||
log.info("两阶段提交 - 提交阶段: messageId={}", messageId);
|
||
|
||
// 发送到正式队列
|
||
Map<String, String> commitData = convertMessageToMap(message);
|
||
commitData.put("status", "COMMITTED");
|
||
redisTemplate.opsForStream().add(redisStreamProperties.getKey(), commitData);
|
||
|
||
log.info("两阶段提交事务消息发送成功: messageId={}", messageId);
|
||
return messageId;
|
||
|
||
} catch (Exception e) {
|
||
log.error("两阶段提交事务消息发送失败: messageId={}, error={}", messageId, e.getMessage(), e);
|
||
throw new RuntimeException("两阶段提交事务消息发送失败", e);
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 2.3 原子性保证分析
|
||
|
||
**MULTI/EXEC模式**:
|
||
- ✅ 保证Redis操作的原子性
|
||
- ❌ 无法跨数据库事务
|
||
- 适用场景:Redis内部操作需要原子性
|
||
|
||
**Lua脚本模式**:
|
||
- ✅ 保证Redis操作的原子性
|
||
- ✅ 支持复杂业务逻辑
|
||
- ✅ 性能高,减少网络往返
|
||
- ❌ 无法跨数据库事务
|
||
|
||
**消息表模式**:
|
||
- ✅ 保证业务数据与消息记录的原子性
|
||
- ✅ 支持消息重试和死信队列
|
||
- ✅ 最强的一致性保证
|
||
- ✅ 异步发送,提高性能
|
||
|
||
**两阶段提交模式**:
|
||
- ✅ 支持分布式事务
|
||
- ✅ 支持回滚机制
|
||
- ❌ 复杂度高,性能较低
|
||
|
||
#### 2.4 结论
|
||
|
||
- **项目现状**:已实现完整的事务消息机制,支持4种事务模式
|
||
- **Redis MULTI/EXEC**:可以保证Redis操作的原子性,但无法跨数据库事务
|
||
- **Lua脚本**:可以实现Redis内部的原子性操作,支持复杂逻辑
|
||
- **消息表模式**:推荐使用,保证业务数据与消息的强一致性
|
||
- **两阶段提交**:适用于分布式事务场景
|
||
|
||
### 3. 消息堆积上限与Redis内存配置关系
|
||
|
||
#### 3.1 Redis内存配置分析
|
||
|
||
从`application.yml`配置可以看出:
|
||
|
||
```yaml
|
||
# application.yml 第8-19行
|
||
spring:
|
||
redis:
|
||
host: localhost
|
||
port: 6379
|
||
password:
|
||
database: 0
|
||
timeout: 2000ms
|
||
lettuce:
|
||
pool:
|
||
max-active: 8
|
||
max-wait: -1ms
|
||
max-idle: 8
|
||
min-idle: 0
|
||
```
|
||
|
||
#### 3.2 内存限制机制
|
||
|
||
Redis Stream的内存使用受以下因素影响:
|
||
|
||
1. **maxmemory配置**:Redis最大内存限制
|
||
2. **Stream长度**:消息数量
|
||
3. **消息大小**:单个消息的字节数
|
||
4. **消费者组数量**:每个消费者组维护独立的状态
|
||
|
||
#### 3.3 消息堆积计算
|
||
|
||
```java
|
||
// 建议的内存监控实现
|
||
@Service
|
||
public class StreamMemoryMonitor {
|
||
|
||
@Autowired
|
||
private RedisTemplate<String, Object> redisTemplate;
|
||
|
||
public Map<String, Object> getStreamMemoryInfo() {
|
||
Map<String, Object> info = new HashMap<>();
|
||
|
||
// 1. 获取Stream长度
|
||
Long streamLength = redisTemplate.opsForStream().size("message-stream");
|
||
info.put("streamLength", streamLength);
|
||
|
||
// 2. 获取Redis内存使用情况
|
||
Properties memoryInfo = redisTemplate.getConnectionFactory()
|
||
.getConnection().info("memory");
|
||
info.put("usedMemory", memoryInfo.getProperty("used_memory"));
|
||
info.put("maxMemory", memoryInfo.getProperty("maxmemory"));
|
||
|
||
// 3. 计算消息平均大小
|
||
if (streamLength > 0) {
|
||
Long memoryUsage = Long.parseLong(memoryInfo.getProperty("used_memory"));
|
||
Double avgMessageSize = (double) memoryUsage / streamLength;
|
||
info.put("avgMessageSize", avgMessageSize);
|
||
}
|
||
|
||
return info;
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 3.4 内存优化策略
|
||
|
||
1. **设置maxmemory**:防止Redis内存溢出
|
||
2. **配置淘汰策略**:LRU、LFU等
|
||
3. **监控Stream长度**:设置告警阈值
|
||
4. **消息TTL**:设置消息过期时间
|
||
|
||
#### 3.5 结论
|
||
|
||
- **消息堆积上限**:受Redis maxmemory配置限制
|
||
- **建议配置**:设置合理的maxmemory和淘汰策略
|
||
- **监控告警**:实时监控Stream长度和内存使用率
|
||
|
||
## 技术架构分析
|
||
|
||
### 1. 消费者模式对比
|
||
|
||
| 特性 | StreamListener模式 | Manual Ack模式 |
|
||
|------|-------------------|----------------|
|
||
| 实时性 | 高(事件驱动) | 中(轮询) |
|
||
| 控制精度 | 低(自动ACK) | 高(手动控制) |
|
||
| 性能 | 高 | 中 |
|
||
| 复杂度 | 低 | 高 |
|
||
| 适用场景 | 高并发实时处理 | 精确控制处理流程 |
|
||
|
||
### 2. 关键代码实现
|
||
|
||
#### 2.1 StreamListener模式
|
||
|
||
```java
|
||
// StreamListenerConsumerService.java 第225-263行
|
||
@Override
|
||
public void onMessage(MapRecord<String, String, String> message) {
|
||
String recordId = null;
|
||
try {
|
||
messageCount.incrementAndGet();
|
||
|
||
// 获取消息数据
|
||
Map<String, String> messageData = message.getValue();
|
||
recordId = message.getId().getValue();
|
||
|
||
// 处理消息
|
||
processMessage(messageData);
|
||
|
||
// 确认消息(ACK)
|
||
acknowledgeMessage(recordId);
|
||
|
||
processedCount.incrementAndGet();
|
||
} catch (Exception e) {
|
||
errorCount.incrementAndGet();
|
||
// 处理失败时也要确认消息,避免重复处理
|
||
if (recordId != null) {
|
||
acknowledgeMessage(recordId);
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 2.2 Manual Ack模式
|
||
|
||
```java
|
||
// ManualAckConsumerService.java 第175-226行
|
||
public void pollAndProcessMessages() {
|
||
if (isProcessing) {
|
||
log.warn("⚠️ Manual Ack 正在处理中,跳过本次拉取");
|
||
return;
|
||
}
|
||
|
||
isProcessing = true;
|
||
try {
|
||
// 确保消费者组存在
|
||
ensureConsumerGroupExists();
|
||
|
||
// 从 Stream 中读取消息
|
||
List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream()
|
||
.read(Consumer.from(redisStreamProperties.getConsumerGroup(), redisStreamProperties.getConsumerName()),
|
||
StreamOffset.create(redisStreamProperties.getKey(), ReadOffset.lastConsumed()));
|
||
|
||
for (MapRecord<String, Object, Object> message : messages) {
|
||
try {
|
||
processMessage(message);
|
||
processedCount.incrementAndGet();
|
||
} catch (Exception e) {
|
||
errorCount.incrementAndGet();
|
||
}
|
||
}
|
||
} finally {
|
||
isProcessing = false;
|
||
}
|
||
}
|
||
```
|
||
|
||
### 3. 配置管理
|
||
|
||
#### 3.1 属性配置
|
||
|
||
```java
|
||
// RedisStreamProperties.java 第13-67行
|
||
@Data
|
||
@Configuration
|
||
@ConfigurationProperties(prefix = "redis.stream")
|
||
public class RedisStreamProperties {
|
||
private String key = "message-stream";
|
||
private String consumerGroup = "message-consumer-group";
|
||
private String consumerName = "message-consumer";
|
||
private StreamListenerConfig streamListener = new StreamListenerConfig();
|
||
|
||
@Data
|
||
public static class StreamListenerConfig {
|
||
private boolean autoStart = false;
|
||
private boolean processHistoricalMessages = true;
|
||
private int pollTimeout = 1;
|
||
private int corePoolSize = 2;
|
||
private int maxPoolSize = 4;
|
||
private int keepAliveTime = 60;
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 3.2 线程池配置
|
||
|
||
```java
|
||
// RedisStreamConfig.java 第64-78行
|
||
@Bean
|
||
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer() {
|
||
// 创建线程池
|
||
ThreadPoolExecutor executor = new ThreadPoolExecutor(
|
||
2, 4, 60, TimeUnit.SECONDS,
|
||
new LinkedBlockingDeque<>(),
|
||
r -> new Thread(r, "redis-stream-consumer-" + System.currentTimeMillis())
|
||
);
|
||
|
||
// 创建监听容器配置
|
||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
|
||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
|
||
.builder()
|
||
.pollTimeout(Duration.ofSeconds(1))
|
||
.executor(executor)
|
||
.build();
|
||
|
||
return StreamMessageListenerContainer.create(redisConnectionFactory, options);
|
||
}
|
||
```
|
||
|
||
## 测试验证
|
||
|
||
### 1. 集成测试
|
||
|
||
项目包含完整的集成测试:
|
||
|
||
```java
|
||
// RedisStreamIntegrationTest.java 第85-120行
|
||
@Test
|
||
void testStreamListenerEndToEnd() throws InterruptedException {
|
||
// 启动 StreamListener
|
||
container.start();
|
||
|
||
// 订阅 Stream
|
||
Subscription subscription = container.receive(
|
||
Consumer.from(redisStreamProperties.getConsumerGroup(), streamListenerConsumerName),
|
||
StreamOffset.create(redisStreamProperties.getKey(), ReadOffset.from(">")),
|
||
streamListenerConsumerService
|
||
);
|
||
|
||
// 发送 100 条消息
|
||
int sentCount = messageProducerService.sendBatchMessages(100);
|
||
assertEquals(100, sentCount);
|
||
|
||
// 等待消息处理完成
|
||
Thread.sleep(20000);
|
||
|
||
// 验证消息统计
|
||
Map<String, Object> stats = streamListenerConsumerService.getMessageStats();
|
||
assertTrue((Long) stats.get("totalReceived") >= 100);
|
||
assertTrue((Long) stats.get("totalProcessed") >= 100);
|
||
assertEquals(0L, stats.get("totalErrors"));
|
||
}
|
||
```
|
||
|
||
### 2. 并发测试
|
||
|
||
```java
|
||
// RedisStreamIntegrationTest.java 第164-215行
|
||
@Test
|
||
void testConcurrentProductionAndConsumption() throws InterruptedException {
|
||
CountDownLatch latch = new CountDownLatch(2);
|
||
|
||
// 启动 StreamListener
|
||
container.start();
|
||
|
||
// 并发发送消息
|
||
Thread producerThread = new Thread(() -> {
|
||
try {
|
||
messageProducerService.sendBatchMessages(50);
|
||
} finally {
|
||
latch.countDown();
|
||
}
|
||
});
|
||
|
||
// 并发处理消息
|
||
Thread consumerThread = new Thread(() -> {
|
||
try {
|
||
manualAckConsumerService.batchProcessMessages(50);
|
||
} finally {
|
||
latch.countDown();
|
||
}
|
||
});
|
||
|
||
producerThread.start();
|
||
consumerThread.start();
|
||
|
||
// 等待完成
|
||
assertTrue(latch.await(30, TimeUnit.SECONDS));
|
||
}
|
||
```
|
||
|
||
## 总结与建议
|
||
|
||
### 1. 项目优势
|
||
|
||
1. **双模式支持**:同时提供`StreamListener`与`Manual Ack`两种消费模式
|
||
2. **事务消息完备**:覆盖`MULTI/EXEC`、`Lua脚本`、`消息表`、`两阶段提交`四种方案
|
||
3. **测试充分**:端到端集成测试与并发测试覆盖关键路径
|
||
4. **配置灵活**:可调的线程池、拉取策略与消费者组参数
|
||
5. **可运维性较好**:基础监控指标与异常处理、重试机制已具雏形
|
||
|
||
### 2. 改进建议
|
||
|
||
1. **默认采用消息表模式**:将“消息表”设为默认事务消息落地方案,提供重试、补偿、死信与可观测性;`MULTI/EXEC`与`Lua`作为轻量场景可选项。
|
||
2. **Pending管理与幂等性**:完善`XPENDING`巡检、超时转移与再分配;在业务侧统一幂等键设计,避免重复投递与重复消费副作用。
|
||
3. **死信与重试策略参数化**:落库可追踪,失败按阶梯回退(指数退避+抖动),达到阈值进入DLQ,并提供人工恢复通道。
|
||
4. **内存与长度治理**:结合`XTRIM`(基于长度/时间)与消息字段瘦身;对大消息启用压缩或外部存储(仅存指针)。
|
||
5. **分片与扩展性**:按业务键(如订单ID)进行多`Stream`分区;消费者组按分区扩展并支持水平扩容与重平衡。
|
||
6. **监控可观测性**:补充生产/消费QPS、Lag、Ack时延、P90/P99、PENDING大小、重试/丢弃计数的指标与可视化仪表板。
|
||
7. **批量与流水线优化**:生产端`XADD`合并与管线化,消费端批量读取与批量`XACK`;连接池参数基于压测调优。
|
||
|
||
### 3. 生产环境建议
|
||
|
||
1. **容量与持久化**:设置合理`maxmemory`与淘汰策略;AOF 建议`everysec`并配合`RDB`定期快照,校验恢复时间目标(RTO/RPO)。
|
||
2. **Stream治理策略**:为关键`Stream`启用`XTRIM`策略与保留窗口;设置长度/Lag/内存使用率告警阈值与自愈流程。
|
||
3. **集群与高可用**:优先采用Redis Cluster/主从+哨兵或托管服务;为跨可用区部署设计故障演练与自动故障转移。
|
||
4. **限流与隔离**:对生产/消费端设置限流与熔断;按业务域拆分实例或命名空间,避免相互影响。
|
||
5. **安全与合规**:开启密码、TLS与网络访问控制;对敏感字段加密/脱敏,审计关键操作。
|
||
6. **变更与演练**:灰度发布消费者;定期演练消费者宕机恢复、PENDING迁移、DLQ回放与回溯恢复流程。
|
||
|
||
## 附录
|
||
|
||
### A. 关键配置参数
|
||
|
||
```yaml
|
||
# Redis连接配置
|
||
spring:
|
||
redis:
|
||
host: localhost
|
||
port: 6379
|
||
timeout: 2000ms
|
||
lettuce:
|
||
pool:
|
||
max-active: 8
|
||
max-idle: 8
|
||
|
||
# Stream配置
|
||
redis:
|
||
stream:
|
||
key: "message-stream"
|
||
consumer-group: "message-consumer-group"
|
||
consumer-name: "message-consumer"
|
||
```
|
||
|
||
### B. 监控指标
|
||
|
||
1. **Stream长度**:`XLEN message-stream`
|
||
2. **消费者组信息**:`XINFO GROUPS message-stream`
|
||
3. **待处理消息**:`XPENDING message-stream message-consumer-group`
|
||
4. **Redis内存**:`INFO memory`
|
||
|
||
### C. 常用Redis命令
|
||
|
||
```bash
|
||
# 查看Stream信息
|
||
XINFO STREAM message-stream
|
||
|
||
# 查看消费者组
|
||
XINFO GROUPS message-stream
|
||
|
||
# 查看待处理消息
|
||
XPENDING message-stream message-consumer-group
|
||
|
||
# 手动确认消息
|
||
XACK message-stream message-consumer-group message-id
|
||
```
|
||
|
||
---
|
||
|
||
**报告生成时间**:2025年
|
||
**项目版本**:Spring Boot Redis Stream Demo
|
||
**分析范围**:消费者组恢复、事务消息、内存配置
|