Files
zhoujia 057a095596 demo
2025-10-24 15:51:46 +08:00

380 lines
9.3 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Spring Boot Redis Stream Demo
这是一个基于 Spring Boot 和 Redis Stream 的端到端消息收发演示项目实现了生产者→Redis Stream→消费者的完整流程。
## 功能特性
- ✅ 使用 Redis 5+ 的 Stream 数据结构(非 Pub/Sub
- ✅ 支持两种消费者模式:
- **StreamListener 模式**:自动监听,实时处理,事件驱动
- **Manual Ack 模式**:手动拉取,按需处理,精确控制
- ✅ 完整的单元测试和集成测试
- ✅ REST API 接口用于测试
- ✅ 支持批量消息发送100条消息
- ✅ 消息确认机制ACK
- ✅ 消费者组支持
- ✅ 错误处理和重试机制
- ✅ 监控和统计功能
- ✅ 灵活的配置管理
## 技术栈
- **Spring Boot 2.7.18**
- **Spring Data Redis**
- **Lettuce Redis Client**
- **Jackson JSON**
- **Lombok**
- **Testcontainers** (用于集成测试)
- **Redis 7+**
## 项目结构
```
src/
├── main/
│ ├── java/com/example/
│ │ ├── config/ # 配置类
│ │ │ ├── RedisStreamConfig.java
│ │ │ └── RedisStreamProperties.java
│ │ ├── controller/ # REST 控制器
│ │ │ └── StreamController.java
│ │ ├── model/ # 数据模型
│ │ │ └── Message.java
│ │ ├── service/ # 业务服务
│ │ │ ├── MessageProducerService.java
│ │ │ ├── StreamListenerConsumerService.java
│ │ │ └── ManualAckConsumerService.java
│ │ └── RedisStreamApplication.java
│ └── resources/
│ └── application.yml
└── test/
└── java/com/example/
├── RedisStreamIntegrationTest.java
└── service/
└── MessageProducerServiceTest.java
```
## 快速开始
### 1. 环境要求
- Java 8+
- Maven 3.6+
- Redis 5+ (推荐 Redis 7+)
### 2. 启动 Redis
```bash
# 使用 Docker 启动 Redis
docker run -d --name redis -p 6379:6379 redis:7-alpine
# 或使用本地 Redis
redis-server
```
### 3. 运行应用
```bash
# 编译项目
mvn clean compile
# 运行应用
mvn spring-boot:run
```
应用将在 `http://localhost:8080` 启动。
### 4. 运行测试
#### 方式一:使用测试脚本(推荐)
```bash
# Windows
test-simple.bat
# Linux/Mac
chmod +x test.sh
./test.sh
```
#### 方式二:手动测试
```bash
# 1. 发送100条消息
curl -X POST "http://localhost:8080/api/stream/send-batch?count=100"
# 2. 使用Manual Ack消费消息
curl -X POST "http://localhost:8080/api/stream/consume-manual?count=100"
# 3. 查看Stream信息
curl http://localhost:8080/api/stream/info
```
#### 方式三:运行单元测试
```bash
# 运行所有测试需要本地Redis
mvn test
# 运行集成测试
mvn test -Dtest=RedisStreamIntegrationTest
# 运行单元测试
mvn test -Dtest=MessageProducerServiceTest
```
## API 接口
### 发送消息
```bash
# 发送单条消息
curl -X POST http://localhost:8080/api/stream/send \
-H "Content-Type: application/json" \
-d '{
"content": "测试消息",
"type": "TEST",
"sender": "test-user"
}'
# 批量发送 100 条消息
curl -X POST "http://localhost:8080/api/stream/send-batch?count=100"
```
### 消费消息
```bash
# 使用 Manual Ack 模式消费 10 条消息
curl -X POST "http://localhost:8080/api/stream/consume-manual?count=10"
# 拉取单条消息Manual Ack 模式)
curl http://localhost:8080/api/stream/poll-single
# 处理单条消息Manual Ack 模式)
curl -X POST http://localhost:8080/api/stream/process-single?recordId=1234567890-0
# 控制 StreamListener 状态
curl -X POST http://localhost:8080/api/stream/stream-listener/start # 启动
curl -X POST http://localhost:8080/api/stream/stream-listener/stop # 停止
curl -X POST http://localhost:8080/api/stream/stream-listener/status # 状态
```
### 查看信息
```bash
# 获取 Stream 信息
curl http://localhost:8080/api/stream/info
# 健康检查
curl http://localhost:8080/api/stream/health
```
### 管理操作
```bash
# 重置计数器
curl -X POST http://localhost:8080/api/stream/reset
# 清空 Stream
curl -X POST http://localhost:8080/api/stream/clear
```
## 两种消费者模式详解
### 1. StreamListener 模式
**特点:**
- 自动监听 Redis Stream实时接收消息
- 基于事件驱动的消息处理
- 自动确认消息ACK
- 支持消费者组和负载均衡
- 适合高并发、实时性要求高的场景
**使用方式:**
```java
// 自动启动监听(应用启动时)
@PostConstruct
public void init() {
streamListenerConsumerService.startListening();
}
// 手动控制监听状态
streamListenerConsumerService.startListening(); // 启动
streamListenerConsumerService.stopListening(); // 停止
boolean isListening = streamListenerConsumerService.isListening(); // 检查状态
```
**API 接口:**
```bash
# 控制 StreamListener 状态
curl -X POST http://localhost:8080/api/stream/stream-listener/start # 启动
curl -X POST http://localhost:8080/api/stream/stream-listener/stop # 停止
curl -X POST http://localhost:8080/api/stream/stream-listener/status # 状态
```
### 2. Manual Ack 模式
**特点:**
- 手动拉取消息,按需处理
- 完全控制消息确认时机
- 支持批量处理
- 适合需要精确控制处理流程的场景
- 可以实现复杂的重试和错误处理逻辑
**使用方式:**
```java
// 拉取单条消息
MapRecord<String, Object, Object> message = manualAckConsumerService.pollSingleMessage();
if (message != null) {
boolean success = manualAckConsumerService.processSingleMessage(message);
}
// 批量处理消息
manualAckConsumerService.batchProcessMessages(10);
// 拉取并处理消息
manualAckConsumerService.pollAndProcessMessages();
```
**API 接口:**
```bash
# 拉取单条消息
curl http://localhost:8080/api/stream/poll-single
# 处理单条消息
curl -X POST http://localhost:8080/api/stream/process-single?recordId=1234567890-0
# 批量处理消息
curl -X POST http://localhost:8080/api/stream/consume-manual?count=10
```
## 核心组件说明
### 1. MessageProducerService
消息生产者服务,负责向 Redis Stream 发送消息:
- `sendMessage(Message message)`: 发送单条消息
- `sendBatchMessages(int count)`: 批量发送消息
- `getStreamLength()`: 获取 Stream 长度
- `clearStream()`: 清空 Stream
### 2. StreamListenerConsumerService
基于 StreamListener 的消费者服务:
- 实现 `StreamListener` 接口
- 自动监听 Redis Stream 中的新消息
- 自动处理消息并发送 ACK
- 提供消息统计功能
- 支持启动/停止监听控制
- 包含错误处理和重试机制
### 3. ManualAckConsumerService
基于手动拉取的消费者服务:
- `pollAndProcessMessages()`: 手动拉取并处理消息
- `pollSingleMessage()`: 拉取单条消息
- `processSingleMessage()`: 处理单条消息
- `batchProcessMessages(int count)`: 批量处理指定数量的消息
- 手动发送 ACK 确认
- 提供消息统计功能
- 支持处理状态检查
### 4. RedisStreamConfig
Redis Stream 配置类:
- 配置 RedisTemplate
- 创建 StreamMessageListenerContainer
- 管理监听容器的生命周期
## 测试用例
### 集成测试 (RedisStreamIntegrationTest)
- `testStreamListenerEndToEnd()`: StreamListener 端到端测试
- `testManualAckEndToEnd()`: Manual Ack 端到端测试
- `testSingleMessageProductionAndConsumption()`: 单条消息测试
- `testConcurrentProductionAndConsumption()`: 并发测试
- `testStreamLengthAndClear()`: Stream 管理测试
### 单元测试 (MessageProducerServiceTest)
- `testSendSingleMessage()`: 单条消息发送测试
- `testSendBatchMessages()`: 批量消息发送测试
- `testSendLargeBatchMessages()`: 大批量消息测试
- `testClearStream()`: Stream 清空测试
- `testGetStreamLength()`: Stream 长度测试
## 配置说明
### application.yml
```yaml
spring:
redis:
host: localhost
port: 6379
database: 0
redis:
stream:
key: "message-stream"
consumer-group: "message-consumer-group"
consumer-name: "message-consumer"
```
## 运行示例
### 1. 启动应用
```bash
mvn spring-boot:run
```
### 2. 发送 100 条消息
```bash
curl -X POST "http://localhost:8080/api/stream/send-batch?count=100"
```
### 3. 查看 Stream 信息
```bash
curl http://localhost:8080/api/stream/info
```
### 4. 使用 Manual Ack 消费消息
```bash
curl -X POST "http://localhost:8080/api/stream/consume-manual?count=50"
```
### 5. 运行测试验证
```bash
mvn test -Dtest=RedisStreamIntegrationTest#testStreamListenerEndToEnd
```
## 注意事项
1. **Redis 版本**: 需要 Redis 5.0+ 支持 Stream 功能
2. **消费者组**: 项目使用消费者组模式,确保消息的可靠传递
3. **ACK 机制**: 两种消费者模式都实现了手动 ACK确保消息处理完成
4. **并发安全**: 使用 AtomicLong 确保计数器的线程安全
5. **资源管理**: 监听容器会在应用关闭时自动清理资源
## 扩展建议
1. **消息持久化**: 可以添加消息持久化到数据库
2. **死信队列**: 实现消息处理失败的死信队列机制
3. **监控指标**: 集成 Micrometer 提供监控指标
4. **消息重试**: 实现消息处理失败的重试机制
5. **集群支持**: 支持 Redis 集群模式
## 许可证
MIT License