# Spring Boot Redis Stream Demo 这是一个基于 Spring Boot 和 Redis Stream 的端到端消息收发演示项目,实现了生产者→Redis Stream→消费者的完整流程。 ## 功能特性 - ✅ 使用 Redis 5+ 的 Stream 数据结构(非 Pub/Sub) - ✅ 支持两种消费者模式: - **StreamListener 模式**:自动监听,实时处理,事件驱动 - **Manual Ack 模式**:手动拉取,按需处理,精确控制 - ✅ 完整的单元测试和集成测试 - ✅ REST API 接口用于测试 - ✅ 支持批量消息发送(100条消息) - ✅ 消息确认机制(ACK) - ✅ 消费者组支持 - ✅ 错误处理和重试机制 - ✅ 监控和统计功能 - ✅ 灵活的配置管理 ## 技术栈 - **Spring Boot 2.7.18** - **Spring Data Redis** - **Lettuce Redis Client** - **Jackson JSON** - **Lombok** - **Testcontainers** (用于集成测试) - **Redis 7+** ## 项目结构 ``` src/ ├── main/ │ ├── java/com/example/ │ │ ├── config/ # 配置类 │ │ │ ├── RedisStreamConfig.java │ │ │ └── RedisStreamProperties.java │ │ ├── controller/ # REST 控制器 │ │ │ └── StreamController.java │ │ ├── model/ # 数据模型 │ │ │ └── Message.java │ │ ├── service/ # 业务服务 │ │ │ ├── MessageProducerService.java │ │ │ ├── StreamListenerConsumerService.java │ │ │ └── ManualAckConsumerService.java │ │ └── RedisStreamApplication.java │ └── resources/ │ └── application.yml └── test/ └── java/com/example/ ├── RedisStreamIntegrationTest.java └── service/ └── MessageProducerServiceTest.java ``` ## 快速开始 ### 1. 环境要求 - Java 8+ - Maven 3.6+ - Redis 5+ (推荐 Redis 7+) ### 2. 启动 Redis ```bash # 使用 Docker 启动 Redis docker run -d --name redis -p 6379:6379 redis:7-alpine # 或使用本地 Redis redis-server ``` ### 3. 运行应用 ```bash # 编译项目 mvn clean compile # 运行应用 mvn spring-boot:run ``` 应用将在 `http://localhost:8080` 启动。 ### 4. 运行测试 #### 方式一:使用测试脚本(推荐) ```bash # Windows test-simple.bat # Linux/Mac chmod +x test.sh ./test.sh ``` #### 方式二:手动测试 ```bash # 1. 发送100条消息 curl -X POST "http://localhost:8080/api/stream/send-batch?count=100" # 2. 使用Manual Ack消费消息 curl -X POST "http://localhost:8080/api/stream/consume-manual?count=100" # 3. 查看Stream信息 curl http://localhost:8080/api/stream/info ``` #### 方式三:运行单元测试 ```bash # 运行所有测试(需要本地Redis) mvn test # 运行集成测试 mvn test -Dtest=RedisStreamIntegrationTest # 运行单元测试 mvn test -Dtest=MessageProducerServiceTest ``` ## API 接口 ### 发送消息 ```bash # 发送单条消息 curl -X POST http://localhost:8080/api/stream/send \ -H "Content-Type: application/json" \ -d '{ "content": "测试消息", "type": "TEST", "sender": "test-user" }' # 批量发送 100 条消息 curl -X POST "http://localhost:8080/api/stream/send-batch?count=100" ``` ### 消费消息 ```bash # 使用 Manual Ack 模式消费 10 条消息 curl -X POST "http://localhost:8080/api/stream/consume-manual?count=10" # 拉取单条消息(Manual Ack 模式) curl http://localhost:8080/api/stream/poll-single # 处理单条消息(Manual Ack 模式) curl -X POST http://localhost:8080/api/stream/process-single?recordId=1234567890-0 # 控制 StreamListener 状态 curl -X POST http://localhost:8080/api/stream/stream-listener/start # 启动 curl -X POST http://localhost:8080/api/stream/stream-listener/stop # 停止 curl -X POST http://localhost:8080/api/stream/stream-listener/status # 状态 ``` ### 查看信息 ```bash # 获取 Stream 信息 curl http://localhost:8080/api/stream/info # 健康检查 curl http://localhost:8080/api/stream/health ``` ### 管理操作 ```bash # 重置计数器 curl -X POST http://localhost:8080/api/stream/reset # 清空 Stream curl -X POST http://localhost:8080/api/stream/clear ``` ## 两种消费者模式详解 ### 1. StreamListener 模式 **特点:** - 自动监听 Redis Stream,实时接收消息 - 基于事件驱动的消息处理 - 自动确认消息(ACK) - 支持消费者组和负载均衡 - 适合高并发、实时性要求高的场景 **使用方式:** ```java // 自动启动监听(应用启动时) @PostConstruct public void init() { streamListenerConsumerService.startListening(); } // 手动控制监听状态 streamListenerConsumerService.startListening(); // 启动 streamListenerConsumerService.stopListening(); // 停止 boolean isListening = streamListenerConsumerService.isListening(); // 检查状态 ``` **API 接口:** ```bash # 控制 StreamListener 状态 curl -X POST http://localhost:8080/api/stream/stream-listener/start # 启动 curl -X POST http://localhost:8080/api/stream/stream-listener/stop # 停止 curl -X POST http://localhost:8080/api/stream/stream-listener/status # 状态 ``` ### 2. Manual Ack 模式 **特点:** - 手动拉取消息,按需处理 - 完全控制消息确认时机 - 支持批量处理 - 适合需要精确控制处理流程的场景 - 可以实现复杂的重试和错误处理逻辑 **使用方式:** ```java // 拉取单条消息 MapRecord message = manualAckConsumerService.pollSingleMessage(); if (message != null) { boolean success = manualAckConsumerService.processSingleMessage(message); } // 批量处理消息 manualAckConsumerService.batchProcessMessages(10); // 拉取并处理消息 manualAckConsumerService.pollAndProcessMessages(); ``` **API 接口:** ```bash # 拉取单条消息 curl http://localhost:8080/api/stream/poll-single # 处理单条消息 curl -X POST http://localhost:8080/api/stream/process-single?recordId=1234567890-0 # 批量处理消息 curl -X POST http://localhost:8080/api/stream/consume-manual?count=10 ``` ## 核心组件说明 ### 1. MessageProducerService 消息生产者服务,负责向 Redis Stream 发送消息: - `sendMessage(Message message)`: 发送单条消息 - `sendBatchMessages(int count)`: 批量发送消息 - `getStreamLength()`: 获取 Stream 长度 - `clearStream()`: 清空 Stream ### 2. StreamListenerConsumerService 基于 StreamListener 的消费者服务: - 实现 `StreamListener` 接口 - 自动监听 Redis Stream 中的新消息 - 自动处理消息并发送 ACK - 提供消息统计功能 - 支持启动/停止监听控制 - 包含错误处理和重试机制 ### 3. ManualAckConsumerService 基于手动拉取的消费者服务: - `pollAndProcessMessages()`: 手动拉取并处理消息 - `pollSingleMessage()`: 拉取单条消息 - `processSingleMessage()`: 处理单条消息 - `batchProcessMessages(int count)`: 批量处理指定数量的消息 - 手动发送 ACK 确认 - 提供消息统计功能 - 支持处理状态检查 ### 4. RedisStreamConfig Redis Stream 配置类: - 配置 RedisTemplate - 创建 StreamMessageListenerContainer - 管理监听容器的生命周期 ## 测试用例 ### 集成测试 (RedisStreamIntegrationTest) - `testStreamListenerEndToEnd()`: StreamListener 端到端测试 - `testManualAckEndToEnd()`: Manual Ack 端到端测试 - `testSingleMessageProductionAndConsumption()`: 单条消息测试 - `testConcurrentProductionAndConsumption()`: 并发测试 - `testStreamLengthAndClear()`: Stream 管理测试 ### 单元测试 (MessageProducerServiceTest) - `testSendSingleMessage()`: 单条消息发送测试 - `testSendBatchMessages()`: 批量消息发送测试 - `testSendLargeBatchMessages()`: 大批量消息测试 - `testClearStream()`: Stream 清空测试 - `testGetStreamLength()`: Stream 长度测试 ## 配置说明 ### application.yml ```yaml spring: redis: host: localhost port: 6379 database: 0 redis: stream: key: "message-stream" consumer-group: "message-consumer-group" consumer-name: "message-consumer" ``` ## 运行示例 ### 1. 启动应用 ```bash mvn spring-boot:run ``` ### 2. 发送 100 条消息 ```bash curl -X POST "http://localhost:8080/api/stream/send-batch?count=100" ``` ### 3. 查看 Stream 信息 ```bash curl http://localhost:8080/api/stream/info ``` ### 4. 使用 Manual Ack 消费消息 ```bash curl -X POST "http://localhost:8080/api/stream/consume-manual?count=50" ``` ### 5. 运行测试验证 ```bash mvn test -Dtest=RedisStreamIntegrationTest#testStreamListenerEndToEnd ``` ## 注意事项 1. **Redis 版本**: 需要 Redis 5.0+ 支持 Stream 功能 2. **消费者组**: 项目使用消费者组模式,确保消息的可靠传递 3. **ACK 机制**: 两种消费者模式都实现了手动 ACK,确保消息处理完成 4. **并发安全**: 使用 AtomicLong 确保计数器的线程安全 5. **资源管理**: 监听容器会在应用关闭时自动清理资源 ## 扩展建议 1. **消息持久化**: 可以添加消息持久化到数据库 2. **死信队列**: 实现消息处理失败的死信队列机制 3. **监控指标**: 集成 Micrometer 提供监控指标 4. **消息重试**: 实现消息处理失败的重试机制 5. **集群支持**: 支持 Redis 集群模式 ## 许可证 MIT License