This commit is contained in:
zhoujia
2025-10-24 15:51:46 +08:00
commit 057a095596
28 changed files with 17576 additions and 0 deletions

18
.idea/compiler.xml generated Normal file
View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="spring-boot-starter-data-redis" />
</profile>
</annotationProcessing>
</component>
<component name="JavacSettings">
<option name="ADDITIONAL_OPTIONS_OVERRIDE">
<module name="spring-boot-starter-data-redis" options="-parameters" />
</option>
</component>
</project>

6
.idea/encodings.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
</component>
</project>

20
.idea/jarRepositories.xml generated Normal file
View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://maven.aliyun.com/repository/public" />
</remote-repository>
</component>
</project>

14
.idea/misc.xml generated Normal file
View File

@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_21" project-jdk-name="21" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>

View File

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

6
.idea/vcs.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

379
README.md Normal file
View File

@ -0,0 +1,379 @@
# Spring Boot Redis Stream Demo
这是一个基于 Spring Boot 和 Redis Stream 的端到端消息收发演示项目,实现了生产者 → Redis Stream → 消费者的完整流程。
## 功能特性
- ✅ 使用 Redis 5+ 的 Stream 数据结构(非 Pub/Sub)
- ✅ 支持两种消费者模式:
- **StreamListener 模式**:自动监听,实时处理,事件驱动
- **Manual Ack 模式**:手动拉取,按需处理,精确控制
- ✅ 完整的单元测试和集成测试
- ✅ REST API 接口用于测试
- ✅ 支持批量消息发送(100 条消息)
- ✅ 消息确认机制(ACK)
- ✅ 消费者组支持
- ✅ 错误处理和重试机制
- ✅ 监控和统计功能
- ✅ 灵活的配置管理
## 技术栈
- **Spring Boot 2.7.18**
- **Spring Data Redis**
- **Lettuce Redis Client**
- **Jackson JSON**
- **Lombok**
- **Testcontainers** (用于集成测试)
- **Redis 7+**
## 项目结构
```
src/
├── main/
│ ├── java/com/example/
│ │ ├── config/ # 配置类
│ │ │ ├── RedisStreamConfig.java
│ │ │ └── RedisStreamProperties.java
│ │ ├── controller/ # REST 控制器
│ │ │ └── StreamController.java
│ │ ├── model/ # 数据模型
│ │ │ └── Message.java
│ │ ├── service/ # 业务服务
│ │ │ ├── MessageProducerService.java
│ │ │ ├── StreamListenerConsumerService.java
│ │ │ └── ManualAckConsumerService.java
│ │ └── RedisStreamApplication.java
│ └── resources/
│ └── application.yml
└── test/
└── java/com/example/
├── RedisStreamIntegrationTest.java
└── service/
└── MessageProducerServiceTest.java
```
## 快速开始
### 1. 环境要求
- Java 8+
- Maven 3.6+
- Redis 5+ (推荐 Redis 7+)
### 2. 启动 Redis
```bash
# 使用 Docker 启动 Redis
docker run -d --name redis -p 6379:6379 redis:7-alpine
# 或使用本地 Redis
redis-server
```
### 3. 运行应用
```bash
# 编译项目
mvn clean compile
# 运行应用
mvn spring-boot:run
```
应用将在 `http://localhost:8080` 启动。
### 4. 运行测试
#### 方式一:使用测试脚本(推荐)
```bash
# Windows
test-simple.bat
# Linux/Mac
chmod +x test.sh
./test.sh
```
#### 方式二:手动测试
```bash
# 1. 发送100条消息
curl -X POST "http://localhost:8080/api/stream/send-batch?count=100"
# 2. 使用Manual Ack消费消息
curl -X POST "http://localhost:8080/api/stream/consume-manual?count=100"
# 3. 查看Stream信息
curl http://localhost:8080/api/stream/info
```
#### 方式三:运行单元测试
```bash
# 运行所有测试需要本地Redis
mvn test
# 运行集成测试
mvn test -Dtest=RedisStreamIntegrationTest
# 运行单元测试
mvn test -Dtest=MessageProducerServiceTest
```
## API 接口
### 发送消息
```bash
# 发送单条消息
curl -X POST http://localhost:8080/api/stream/send \
-H "Content-Type: application/json" \
-d '{
"content": "测试消息",
"type": "TEST",
"sender": "test-user"
}'
# 批量发送 100 条消息
curl -X POST "http://localhost:8080/api/stream/send-batch?count=100"
```
### 消费消息
```bash
# 使用 Manual Ack 模式消费 10 条消息
curl -X POST "http://localhost:8080/api/stream/consume-manual?count=10"
# 拉取单条消息(Manual Ack 模式)
curl http://localhost:8080/api/stream/poll-single
# 处理单条消息(Manual Ack 模式)
curl -X POST http://localhost:8080/api/stream/process-single?recordId=1234567890-0
# 控制 StreamListener 状态
curl -X POST http://localhost:8080/api/stream/stream-listener/start # 启动
curl -X POST http://localhost:8080/api/stream/stream-listener/stop # 停止
curl -X POST http://localhost:8080/api/stream/stream-listener/status # 状态
```
### 查看信息
```bash
# 获取 Stream 信息
curl http://localhost:8080/api/stream/info
# 健康检查
curl http://localhost:8080/api/stream/health
```
### 管理操作
```bash
# 重置计数器
curl -X POST http://localhost:8080/api/stream/reset
# 清空 Stream
curl -X POST http://localhost:8080/api/stream/clear
```
## 两种消费者模式详解
### 1. StreamListener 模式
**特点:**
- 自动监听 Redis Stream实时接收消息
- 基于事件驱动的消息处理
- 自动确认消息ACK
- 支持消费者组和负载均衡
- 适合高并发、实时性要求高的场景
**使用方式:**
```java
// 自动启动监听(应用启动时)
@PostConstruct
public void init() {
streamListenerConsumerService.startListening();
}
// 手动控制监听状态
streamListenerConsumerService.startListening(); // 启动
streamListenerConsumerService.stopListening(); // 停止
boolean isListening = streamListenerConsumerService.isListening(); // 检查状态
```
**API 接口:**
```bash
# 控制 StreamListener 状态
curl -X POST http://localhost:8080/api/stream/stream-listener/start # 启动
curl -X POST http://localhost:8080/api/stream/stream-listener/stop # 停止
curl -X POST http://localhost:8080/api/stream/stream-listener/status # 状态
```
### 2. Manual Ack 模式
**特点:**
- 手动拉取消息,按需处理
- 完全控制消息确认时机
- 支持批量处理
- 适合需要精确控制处理流程的场景
- 可以实现复杂的重试和错误处理逻辑
**使用方式:**
```java
// 拉取单条消息
MapRecord<String, Object, Object> message = manualAckConsumerService.pollSingleMessage();
if (message != null) {
boolean success = manualAckConsumerService.processSingleMessage(message);
}
// 批量处理消息
manualAckConsumerService.batchProcessMessages(10);
// 拉取并处理消息
manualAckConsumerService.pollAndProcessMessages();
```
**API 接口:**
```bash
# 拉取单条消息
curl http://localhost:8080/api/stream/poll-single
# 处理单条消息
curl -X POST http://localhost:8080/api/stream/process-single?recordId=1234567890-0
# 批量处理消息
curl -X POST http://localhost:8080/api/stream/consume-manual?count=10
```
## 核心组件说明
### 1. MessageProducerService
消息生产者服务,负责向 Redis Stream 发送消息:
- `sendMessage(Message message)`: 发送单条消息
- `sendBatchMessages(int count)`: 批量发送消息
- `getStreamLength()`: 获取 Stream 长度
- `clearStream()`: 清空 Stream
### 2. StreamListenerConsumerService
基于 StreamListener 的消费者服务:
- 实现 `StreamListener` 接口
- 自动监听 Redis Stream 中的新消息
- 自动处理消息并发送 ACK
- 提供消息统计功能
- 支持启动/停止监听控制
- 包含错误处理和重试机制
### 3. ManualAckConsumerService
基于手动拉取的消费者服务:
- `pollAndProcessMessages()`: 手动拉取并处理消息
- `pollSingleMessage()`: 拉取单条消息
- `processSingleMessage()`: 处理单条消息
- `batchProcessMessages(int count)`: 批量处理指定数量的消息
- 手动发送 ACK 确认
- 提供消息统计功能
- 支持处理状态检查
### 4. RedisStreamConfig
Redis Stream 配置类:
- 配置 RedisTemplate
- 创建 StreamMessageListenerContainer
- 管理监听容器的生命周期
## 测试用例
### 集成测试 (RedisStreamIntegrationTest)
- `testStreamListenerEndToEnd()`: StreamListener 端到端测试
- `testManualAckEndToEnd()`: Manual Ack 端到端测试
- `testSingleMessageProductionAndConsumption()`: 单条消息测试
- `testConcurrentProductionAndConsumption()`: 并发测试
- `testStreamLengthAndClear()`: Stream 管理测试
### 单元测试 (MessageProducerServiceTest)
- `testSendSingleMessage()`: 单条消息发送测试
- `testSendBatchMessages()`: 批量消息发送测试
- `testSendLargeBatchMessages()`: 大批量消息测试
- `testClearStream()`: Stream 清空测试
- `testGetStreamLength()`: Stream 长度测试
## 配置说明
### application.yml
```yaml
spring:
redis:
host: localhost
port: 6379
database: 0
redis:
stream:
key: "message-stream"
consumer-group: "message-consumer-group"
consumer-name: "message-consumer"
```
## 运行示例
### 1. 启动应用
```bash
mvn spring-boot:run
```
### 2. 发送 100 条消息
```bash
curl -X POST "http://localhost:8080/api/stream/send-batch?count=100"
```
### 3. 查看 Stream 信息
```bash
curl http://localhost:8080/api/stream/info
```
### 4. 使用 Manual Ack 消费消息
```bash
curl -X POST "http://localhost:8080/api/stream/consume-manual?count=50"
```
### 5. 运行测试验证
```bash
mvn test -Dtest=RedisStreamIntegrationTest#testStreamListenerEndToEnd
```
## 注意事项
1. **Redis 版本**: 需要 Redis 5.0+ 支持 Stream 功能
2. **消费者组**: 项目使用消费者组模式,确保消息的可靠传递
3. **ACK 机制**: 两种消费者模式都实现了手动 ACK,确保消息处理完成后才确认
4. **并发安全**: 使用 AtomicLong 确保计数器的线程安全
5. **资源管理**: 监听容器会在应用关闭时自动清理资源
## 扩展建议
1. **消息持久化**: 可以添加消息持久化到数据库
2. **死信队列**: 实现消息处理失败的死信队列机制
3. **监控指标**: 集成 Micrometer 提供监控指标
4. **消息重试**: 实现消息处理失败的重试机制
5. **集群支持**: 支持 Redis 集群模式
## 许可证
MIT License

14406
app.log Normal file

File diff suppressed because one or more lines are too long

17
check-streamlistener.bat Normal file
View File

@ -0,0 +1,17 @@
@echo off
REM Smoke-check script: boots the packaged jar in the background, queries the
REM /api/stream/info endpoint for StreamListener statistics, then kills the JVM.
echo 检查StreamListener状态...
echo 启动应用...
start /B java -jar target\spring-boot-starter-data-redis-1.0.0.jar > app.log 2>&1
echo 等待应用启动...
REM Fixed 15-second wait; assumes the app is up by then (no readiness polling).
timeout /t 15 /nobreak > nul
echo 检查StreamListener状态...
powershell -Command "try { $response = Invoke-WebRequest -Uri 'http://localhost:8080/api/stream/info' -Method GET -TimeoutSec 5; $json = $response.Content | ConvertFrom-Json; Write-Host 'StreamListener状态:'; Write-Host 'isListening:' $json.streamListenerStats.isListening; Write-Host 'totalReceived:' $json.streamListenerStats.totalReceived; Write-Host 'totalProcessed:' $json.streamListenerStats.totalProcessed; Write-Host 'totalErrors:' $json.streamListenerStats.totalErrors } catch { Write-Host '应用未启动或无法连接' }"
echo 停止应用...
REM NOTE(review): this kills EVERY java.exe on the machine, not just this app.
REM Confirm that is acceptable for the environments this script runs in.
taskkill /F /IM java.exe > nul 2>&1
echo 检查完成!
pause

107
pom.xml Normal file
View File

@ -0,0 +1,107 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>Spring Boot Redis Stream Demo</name>
<description>Redis Stream producer-consumer demo with Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.18</version>
<relativePath/>
</parent>
<properties>
<java.version>8</java.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Spring Boot Data Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Spring Boot Web (for REST endpoints) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Lettuce Redis Client -->
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</dependency>
<!-- Jackson for JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Lombok for reducing boilerplate -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Testcontainers for integration testing -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.17.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.17.6</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,15 @@
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Application entry point for the Spring Boot Redis Stream demo.
 */
@SpringBootApplication
public class RedisStreamApplication {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(RedisStreamApplication.class, args);
    }
}

View File

@ -0,0 +1,96 @@
package com.example.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Consumer-mode configuration, bound from {@code redis.consumer.*} properties.
 * Selects which of the two demo consumer modes are enabled and how each is tuned.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "redis.consumer")
public class ConsumerModeConfig {

    /**
     * Default consumer mode.
     * Allowed values: stream-listener, manual-ack, both.
     */
    private String defaultMode = "both";

    /**
     * Whether the StreamListener (push/event-driven) mode is enabled.
     */
    private boolean streamListenerEnabled = true;

    /**
     * Whether the Manual Ack (pull) mode is enabled.
     */
    private boolean manualAckEnabled = true;

    /**
     * StreamListener tuning options.
     */
    private StreamListenerConfig streamListener = new StreamListenerConfig();

    /**
     * Manual Ack tuning options.
     */
    private ManualAckConfig manualAck = new ManualAckConfig();

    @Data
    public static class StreamListenerConfig {
        /**
         * Whether the listener starts automatically.
         */
        private boolean autoStart = true;

        /**
         * Poll timeout in seconds.
         */
        private int pollTimeout = 1;

        /**
         * Thread pool core size.
         */
        private int corePoolSize = 2;

        /**
         * Thread pool maximum size.
         */
        private int maxPoolSize = 4;

        /**
         * Idle thread keep-alive time in seconds.
         */
        private int keepAliveTime = 60;
    }

    @Data
    public static class ManualAckConfig {
        /**
         * Default batch size per pull.
         */
        private int defaultBatchSize = 10;

        /**
         * Maximum allowed batch size.
         */
        private int maxBatchSize = 100;

        /**
         * Poll interval in milliseconds.
         */
        private long pollInterval = 1000;

        /**
         * Whether messages may be processed concurrently.
         */
        private boolean concurrentProcessing = false;

        /**
         * Maximum concurrency when concurrent processing is enabled.
         */
        private int maxConcurrency = 5;
    }
}

View File

@ -0,0 +1,137 @@
package com.example.config;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Redis Stream configuration.
 *
 * Declares the {@link RedisTemplate} used by producer/consumers and the
 * {@link StreamMessageListenerContainer} whose lifecycle this class manages.
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
public class RedisStreamConfig {

    private final RedisStreamProperties redisStreamProperties;
    private final RedisConnectionFactory redisConnectionFactory;

    private StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;
    private Subscription subscription;
    // Kept as a field so destroy() can shut it down; previously the executor
    // was created inline and never shut down (thread leak on close).
    private ThreadPoolExecutor consumerExecutor;

    /**
     * RedisTemplate with String keys/hash-keys and JSON-serialized values.
     *
     * @param connectionFactory the Redis connection factory supplied by Spring
     * @return fully initialized template
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        // Strings for keys, JSON for values (both plain and hash entries).
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.afterPropertiesSet();
        return template;
    }

    /**
     * Stream message listener container backed by a small dedicated pool.
     * The container is created here but started lazily (see initializeStream).
     */
    @Bean
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer() {
        // Use a monotonically increasing index for thread names. The previous
        // System.currentTimeMillis() suffix could produce identical names for
        // threads created within the same millisecond.
        AtomicInteger threadIndex = new AtomicInteger(1);
        consumerExecutor = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(),
                r -> new Thread(r, "redis-stream-consumer-" + threadIndex.getAndIncrement())
        );
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        .pollTimeout(Duration.ofSeconds(1))
                        .executor(consumerExecutor)
                        .build();
        container = StreamMessageListenerContainer.create(redisConnectionFactory, options);
        log.info("Redis Stream 监听容器已创建");
        return container;
    }

    /**
     * Initializes the stream and starts the listener container.
     * Intentionally NOT annotated with @PostConstruct to avoid a circular
     * dependency; callers invoke this explicitly.
     */
    public void initializeStream() {
        try {
            if (container != null && !container.isRunning()) {
                container.start();
                log.info("Redis Stream 监听容器已启动");
            }
            log.info("Redis Stream 初始化完成: key={}", redisStreamProperties.getKey());
        } catch (Exception e) {
            // Swallow deliberately: a failed start should not abort application boot.
            log.error("初始化 Redis Stream 失败", e);
        }
    }

    /**
     * Starts the listener container if it is not already running.
     */
    public void startStreamListener() {
        if (container != null && !container.isRunning()) {
            container.start();
            log.info("Redis Stream 监听容器已启动");
        }
    }

    /**
     * Cancels the active subscription, if any.
     */
    public void stopStreamListener() {
        if (subscription != null) {
            subscription.cancel();
            subscription = null;
            log.info("Redis Stream 订阅已取消");
        }
    }

    /**
     * Releases resources on application shutdown: cancels the subscription,
     * stops the container, and shuts down the backing executor (previously
     * the executor was leaked).
     */
    @PreDestroy
    public void destroy() {
        stopStreamListener();
        if (container != null && container.isRunning()) {
            container.stop();
            log.info("Redis Stream 监听容器已停止");
        }
        if (consumerExecutor != null) {
            consumerExecutor.shutdown();
        }
    }
}

View File

@ -0,0 +1,67 @@
package com.example.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Redis Stream configuration properties, bound from {@code redis.stream.*}.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "redis.stream")
public class RedisStreamProperties {

    /**
     * Stream key name.
     */
    private String key = "message-stream";

    /**
     * Consumer group name.
     */
    private String consumerGroup = "message-consumer-group";

    /**
     * Consumer name within the group.
     */
    private String consumerName = "message-consumer";

    /**
     * StreamListener tuning options.
     */
    private StreamListenerConfig streamListener = new StreamListenerConfig();

    @Data
    public static class StreamListenerConfig {
        /**
         * Whether the listener starts automatically.
         * NOTE(review): defaults to false here, while ConsumerModeConfig's
         * equivalent flag defaults to true; confirm which one is authoritative.
         */
        private boolean autoStart = false;

        /**
         * Whether historical (pre-existing) messages are processed.
         */
        private boolean processHistoricalMessages = true;

        /**
         * Poll timeout in seconds.
         */
        private int pollTimeout = 1;

        /**
         * Thread pool core size.
         */
        private int corePoolSize = 2;

        /**
         * Thread pool maximum size.
         */
        private int maxPoolSize = 4;

        /**
         * Idle thread keep-alive time in seconds.
         */
        private int keepAliveTime = 60;
    }
}

View File

@ -0,0 +1,326 @@
package com.example.controller;

import com.example.model.Message;
import com.example.service.ManualAckConsumerService;
import com.example.service.MessageProducerService;
import com.example.service.StreamListenerConsumerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;
import java.util.Map;

/**
 * REST endpoints for exercising the Redis Stream demo: producing messages,
 * consuming them in both modes, and inspecting stream / consumer state.
 */
@Slf4j
@RestController
@RequestMapping("/api/stream")
@RequiredArgsConstructor
public class StreamController {

    private final MessageProducerService messageProducerService;
    private final StreamListenerConsumerService streamListenerConsumerService;
    private final ManualAckConsumerService manualAckConsumerService;

    /**
     * Builds the uniform error payload every endpoint returns on failure.
     */
    private Map<String, Object> failure(Exception e) {
        Map<String, Object> body = new HashMap<>();
        body.put("success", false);
        body.put("error", e.getMessage());
        return body;
    }

    /**
     * Sends a single message to the stream.
     */
    @PostMapping("/send")
    public Map<String, Object> sendMessage(@RequestBody Message message) {
        try {
            String recordId = messageProducerService.sendMessage(message);
            Map<String, Object> body = new HashMap<>();
            body.put("success", true);
            body.put("recordId", recordId);
            body.put("messageId", message.getId());
            body.put("streamLength", messageProducerService.getStreamLength());
            return body;
        } catch (Exception e) {
            log.error("发送消息失败", e);
            return failure(e);
        }
    }

    /**
     * Sends a batch of messages (default 100).
     */
    @PostMapping("/send-batch")
    public Map<String, Object> sendBatchMessages(@RequestParam(defaultValue = "100") int count) {
        try {
            int sent = messageProducerService.sendBatchMessages(count);
            Map<String, Object> body = new HashMap<>();
            body.put("success", true);
            body.put("sentCount", sent);
            body.put("requestedCount", count);
            body.put("streamLength", messageProducerService.getStreamLength());
            return body;
        } catch (Exception e) {
            log.error("批量发送消息失败", e);
            return failure(e);
        }
    }

    /**
     * Consumes messages in Manual Ack mode.
     * NOTE(review): "processedCount" echoes the requested count rather than the
     * number actually processed.
     */
    @PostMapping("/consume-manual")
    public Map<String, Object> consumeMessagesManually(@RequestParam(defaultValue = "10") int count) {
        try {
            manualAckConsumerService.batchProcessMessages(count);
            Map<String, Object> consumerStats = manualAckConsumerService.getMessageStats();
            Map<String, Object> body = new HashMap<>();
            body.put("success", true);
            body.put("processedCount", count);
            body.put("stats", consumerStats);
            return body;
        } catch (Exception e) {
            log.error("手动消费消息失败", e);
            return failure(e);
        }
    }

    /**
     * Polls a single message (Manual Ack mode) without processing it.
     */
    @GetMapping("/poll-single")
    public Map<String, Object> pollSingleMessage() {
        try {
            MapRecord<String, Object, Object> record = manualAckConsumerService.pollSingleMessage();
            Map<String, Object> body = new HashMap<>();
            body.put("success", true);
            if (record == null) {
                body.put("hasMessage", false);
                body.put("message", "没有新消息");
            } else {
                body.put("hasMessage", true);
                body.put("recordId", record.getId().getValue());
                body.put("data", record.getValue());
            }
            return body;
        } catch (Exception e) {
            log.error("拉取单条消息失败", e);
            return failure(e);
        }
    }

    /**
     * Processes a single message (Manual Ack mode).
     * NOTE(review): the recordId parameter is only echoed back; the service
     * polls the NEXT available message instead of resolving recordId, so the
     * processed record may not be the one the caller named.
     */
    @PostMapping("/process-single")
    public Map<String, Object> processSingleMessage(@RequestParam String recordId) {
        try {
            // A real implementation would look the record up by recordId;
            // here the next pending message is pulled instead.
            MapRecord<String, Object, Object> record = manualAckConsumerService.pollSingleMessage();
            boolean processed = manualAckConsumerService.processSingleMessage(record);
            Map<String, Object> body = new HashMap<>();
            body.put("success", processed);
            body.put("recordId", recordId);
            body.put("processed", processed);
            return body;
        } catch (Exception e) {
            log.error("处理单条消息失败", e);
            return failure(e);
        }
    }

    /**
     * Starts the StreamListener, optionally replaying historical messages.
     */
    @PostMapping("/stream-listener/start-with-history")
    public Map<String, Object> startStreamListenerWithHistory(@RequestParam(defaultValue = "true") boolean processHistorical) {
        try {
            streamListenerConsumerService.startListening(processHistorical);
            Map<String, Object> body = new HashMap<>();
            body.put("success", true);
            body.put("action", "start-with-history");
            body.put("processHistorical", processHistorical);
            body.put("isListening", streamListenerConsumerService.isListening());
            body.put("message", "StreamListener 已启动,处理历史消息: " + processHistorical);
            return body;
        } catch (Exception e) {
            log.error("启动 StreamListener 失败", e);
            return failure(e);
        }
    }

    /**
     * Controls the StreamListener: action is one of start / stop / status
     * (case-insensitive).
     */
    @PostMapping("/stream-listener/{action}")
    public Map<String, Object> controlStreamListener(@PathVariable String action) {
        try {
            Map<String, Object> body = new HashMap<>();
            String normalized = action.toLowerCase();
            switch (normalized) {
                case "start":
                    streamListenerConsumerService.startListening();
                    body.put("success", true);
                    body.put("action", "start");
                    body.put("isListening", streamListenerConsumerService.isListening());
                    body.put("message", "StreamListener 已启动");
                    break;
                case "stop":
                    streamListenerConsumerService.stopListening();
                    body.put("success", true);
                    body.put("action", "stop");
                    body.put("isListening", streamListenerConsumerService.isListening());
                    body.put("message", "StreamListener 已停止");
                    break;
                case "status":
                    body.put("success", true);
                    body.put("action", "status");
                    body.put("isListening", streamListenerConsumerService.isListening());
                    body.put("message", "获取状态成功");
                    break;
                default:
                    body.put("success", false);
                    body.put("error", "不支持的操作: " + action);
                    break;
            }
            return body;
        } catch (Exception e) {
            log.error("控制 StreamListener 失败", e);
            return failure(e);
        }
    }

    /**
     * Returns stream length plus statistics from both consumer modes.
     */
    @GetMapping("/info")
    public Map<String, Object> getStreamInfo() {
        Map<String, Object> body = new HashMap<>();
        body.put("streamLength", messageProducerService.getStreamLength());
        body.put("streamListenerStats", streamListenerConsumerService.getMessageStats());
        body.put("manualAckStats", manualAckConsumerService.getMessageStats());
        return body;
    }

    /**
     * Resets both consumers' message counters.
     */
    @PostMapping("/reset")
    public Map<String, Object> resetCounters() {
        streamListenerConsumerService.resetCounters();
        manualAckConsumerService.resetCounters();
        Map<String, Object> body = new HashMap<>();
        body.put("success", true);
        body.put("message", "计数器已重置");
        return body;
    }

    /**
     * Deletes all entries from the stream.
     */
    @PostMapping("/clear")
    public Map<String, Object> clearStream() {
        try {
            messageProducerService.clearStream();
            Map<String, Object> body = new HashMap<>();
            body.put("success", true);
            body.put("message", "Stream 已清空");
            body.put("streamLength", messageProducerService.getStreamLength());
            return body;
        } catch (Exception e) {
            log.error("清空 Stream 失败", e);
            return failure(e);
        }
    }

    /**
     * Liveness endpoint; also reports the current stream length.
     */
    @GetMapping("/health")
    public Map<String, Object> health() {
        Map<String, Object> body = new HashMap<>();
        body.put("status", "UP");
        body.put("timestamp", System.currentTimeMillis());
        body.put("streamLength", messageProducerService.getStreamLength());
        return body;
    }

    /**
     * Reports stream length and the Manual Ack consumer's status/statistics.
     */
    @GetMapping("/stream-status")
    public Map<String, Object> getStreamStatus() {
        Map<String, Object> body = new HashMap<>();
        body.put("streamInfo", messageProducerService.getStreamLength());
        body.put("manualAckStatus", manualAckConsumerService.checkStreamStatus());
        body.put("manualAckStats", manualAckConsumerService.getMessageStats());
        return body;
    }
}

View File

@ -0,0 +1,224 @@
package com.example.example;
import com.example.model.Message;
import com.example.service.ManualAckConsumerService;
import com.example.service.MessageProducerService;
import com.example.service.StreamListenerConsumerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.Map;
/**
* 消费者使用示例
*
* 本示例展示了两种消费者模式的使用方法:
* 1. StreamListener 模式 - 自动监听,实时处理
* 2. Manual Ack 模式 - 手动拉取,按需处理
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ConsumerUsageExample {
private final MessageProducerService messageProducerService;
private final StreamListenerConsumerService streamListenerConsumerService;
private final ManualAckConsumerService manualAckConsumerService;
// 注释掉 @PostConstruct避免项目启动时自动执行示例
// 如果需要运行示例,可以手动调用 demonstrateUsage() 方法
// @PostConstruct
public void demonstrateUsage() {
log.info("=== Redis Stream 消费者使用示例 ===");
// 示例1发送测试消息
sendTestMessages();
// 示例2StreamListener 模式使用
demonstrateStreamListener();
// 示例3Manual Ack 模式使用
demonstrateManualAck();
// 示例4获取统计信息
showStatistics();
}
/**
* 发送测试消息
*/
private void sendTestMessages() {
log.info("--- 发送测试消息 ---");
try {
// 发送单条消息
Message message1 = new Message("Hello StreamListener!", "text", "user1");
String recordId1 = messageProducerService.sendMessage(message1);
log.info("发送消息1: recordId={}", recordId1);
// 发送另一条消息
Message message2 = new Message("Hello Manual Ack!", "text", "user2");
String recordId2 = messageProducerService.sendMessage(message2);
log.info("发送消息2: recordId={}", recordId2);
// 批量发送消息
int sentCount = messageProducerService.sendBatchMessages(5);
log.info("批量发送消息: {} 条", sentCount);
} catch (Exception e) {
log.error("发送测试消息失败", e);
}
}
/**
* StreamListener 模式使用示例
*/
private void demonstrateStreamListener() {
log.info("--- StreamListener 模式使用示例 ---");
try {
// 检查监听状态
boolean isListening = streamListenerConsumerService.isListening();
log.info("StreamListener 监听状态: {}", isListening);
// 如果需要,可以手动启动监听
if (!isListening) {
streamListenerConsumerService.startListening();
log.info("手动启动 StreamListener");
}
// 等待一段时间让消息被处理
Thread.sleep(2000);
// 获取统计信息
Map<String, Object> stats = streamListenerConsumerService.getMessageStats();
log.info("StreamListener 统计信息: {}", stats);
} catch (Exception e) {
log.error("StreamListener 示例失败", e);
}
}
/**
* Manual Ack 模式使用示例
*/
private void demonstrateManualAck() {
log.info("--- Manual Ack 模式使用示例 ---");
try {
// 方式1拉取单条消息
MapRecord<String, Object, Object> singleMessage = manualAckConsumerService.pollSingleMessage();
if (singleMessage != null) {
log.info("拉取到单条消息: recordId={}, data={}",
singleMessage.getId().getValue(), singleMessage.getValue());
// 处理消息
boolean success = manualAckConsumerService.processSingleMessage(singleMessage);
log.info("处理单条消息结果: {}", success);
} else {
log.info("没有单条消息可拉取");
}
// 方式2批量处理消息
log.info("开始批量处理消息...");
manualAckConsumerService.batchProcessMessages(3);
// 获取统计信息
Map<String, Object> stats = manualAckConsumerService.getMessageStats();
log.info("Manual Ack 统计信息: {}", stats);
} catch (Exception e) {
log.error("Manual Ack 示例失败", e);
}
}
/**
* 显示统计信息
*/
private void showStatistics() {
log.info("--- 统计信息 ---");
try {
// StreamListener 统计
Map<String, Object> streamListenerStats = streamListenerConsumerService.getMessageStats();
log.info("StreamListener 统计: {}", streamListenerStats);
// Manual Ack 统计
Map<String, Object> manualAckStats = manualAckConsumerService.getMessageStats();
log.info("Manual Ack 统计: {}", manualAckStats);
// Stream 长度
long streamLength = messageProducerService.getStreamLength();
log.info("Stream 长度: {}", streamLength);
} catch (Exception e) {
log.error("获取统计信息失败", e);
}
}
/**
 * Demonstrates the error-handling path: sends a message with empty content
 * (expected to trip the consumer's failure handling), processes it through
 * the Manual-Ack consumer, then prints the resulting error statistics.
 */
public void demonstrateErrorHandling() {
    log.info("--- 错误处理示例 ---");
    try {
        // A message with empty content — presumably provokes a processing
        // failure downstream; TODO confirm the consumer actually rejects it.
        Message errorMessage = new Message("", "error", "system");
        messageProducerService.sendMessage(errorMessage);
        // Consume via Manual-Ack mode to observe the failure handling.
        manualAckConsumerService.pollAndProcessMessages();
        // Show the counters after the failure path has run.
        Map<String, Object> stats = manualAckConsumerService.getMessageStats();
        log.info("错误处理后的统计: {}", stats);
    } catch (Exception e) {
        log.error("错误处理示例失败", e);
    }
}
/**
 * Simple throughput demo: resets both consumers' counters, pushes a burst of
 * 100 messages while timing the send phase, lets the StreamListener drain in
 * the background, batch-processes the remainder via Manual Ack, then prints
 * both consumers' statistics.
 */
public void demonstratePerformanceTest() {
    log.info("--- 性能测试示例 ---");
    try {
        // Start from clean counters so the stats reflect only this run.
        streamListenerConsumerService.resetCounters();
        manualAckConsumerService.resetCounters();
        // Produce a burst of messages and time the send phase.
        final int total = 100;
        final long sendStart = System.currentTimeMillis();
        messageProducerService.sendBatchMessages(total);
        final long sendElapsed = System.currentTimeMillis() - sendStart;
        log.info("发送 {} 条消息耗时: {} ms", total, sendElapsed);
        // Let the StreamListener consume in the background.
        Thread.sleep(3000);
        // Drain whatever is left through the manual-ack consumer.
        manualAckConsumerService.batchProcessMessages(50);
        // Report per-consumer statistics.
        Map<String, Object> listenerStats = streamListenerConsumerService.getMessageStats();
        Map<String, Object> ackStats = manualAckConsumerService.getMessageStats();
        log.info("性能测试结果:");
        log.info("StreamListener: {}", listenerStats);
        log.info("Manual Ack: {}", ackStats);
    } catch (Exception e) {
        log.error("性能测试失败", e);
    }
}
}

View File

@ -0,0 +1,48 @@
package com.example.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
 * Message model carried through the Redis Stream demo.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Message {
    /**
     * Message id. Not set by the three-argument constructor; presumably
     * assigned by the producer before sending — TODO confirm.
     */
    private String id;
    /**
     * Message payload text.
     */
    private String content;
    /**
     * Message type tag (e.g. "TEST").
     */
    private String type;
    /**
     * Creation time; stamped with now() by the convenience constructor.
     */
    private LocalDateTime timestamp;
    /**
     * Sender identifier.
     */
    private String sender;
    /**
     * Convenience constructor: leaves {@code id} unset and stamps
     * {@code timestamp} with the current time.
     */
    public Message(String content, String type, String sender) {
        this.content = content;
        this.type = type;
        this.sender = sender;
        this.timestamp = LocalDateTime.now();
    }
}

View File

@ -0,0 +1,525 @@
package com.example.service;
import com.example.config.RedisStreamProperties;
import com.example.model.Message;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* Manual Ack 模式的消费者服务
*
* 特点:
* 1. 手动拉取消息,按需处理
* 2. 完全控制消息确认时机
* 3. 支持批量处理
* 4. 适合需要精确控制处理流程的场景
* 5. 可以实现复杂的重试和错误处理逻辑
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class ManualAckConsumerService {
private final RedisTemplate<String, Object> redisTemplate;
private final RedisStreamProperties redisStreamProperties;
// 消息计数器
private final AtomicLong messageCount = new AtomicLong(0);
private final AtomicLong processedCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
private final AtomicLong retryCount = new AtomicLong(0);
private final AtomicLong pendingCount = new AtomicLong(0);
// 处理状态
private volatile boolean isProcessing = false;
@PostConstruct
public void init() {
    // Lifecycle hook: nothing is set up eagerly here; consumer-group
    // creation happens lazily in ensureConsumerGroupExists().
    log.info("Manual Ack 消费者服务已初始化");
}
/**
 * Ensures the configured consumer group exists on the stream, creating the
 * stream (seeded with an init record) and the group when missing. Retries
 * creation up to three times with linearly increasing back-off.
 *
 * Fixes vs. original:
 * - the XINFO GROUPS result was rendered via toString() BEFORE the null
 *   check, so a null result threw an NPE instead of triggering creation;
 * - Throwable#getMessage() can be null and was dereferenced unguarded in
 *   both catch blocks.
 */
private void ensureConsumerGroupExists() {
    int maxRetries = 3;
    int retryCount = 0;
    log.info("=== 开始检查消费者组是否存在: {} ===", redisStreamProperties.getConsumerGroup());
    while (retryCount < maxRetries) {
        try {
            // Probe for the group via XINFO GROUPS.
            log.info("第{}次检查消费者组: {}", retryCount + 1, redisStreamProperties.getConsumerGroup());
            Object groups = redisTemplate.opsForStream().groups(redisStreamProperties.getKey());
            log.info("📊 消费者组查询结果: {}", groups);
            // Null-check FIRST (the original called groups.toString() before this).
            if (groups == null) {
                log.warn("❌ 消费者组不存在,返回空数组: {}", groups);
                throw new RuntimeException("消费者组不存在");
            }
            String groupsStr = groups.toString();
            if (groupsStr.equals("XInfoGroups[]") || groupsStr.contains("[]") || groupsStr.trim().isEmpty()) {
                log.warn("❌ 消费者组不存在,返回空数组: {}", groups);
                throw new RuntimeException("消费者组不存在");
            }
            log.info("✅ 消费者组已存在: {},组信息: {}", redisStreamProperties.getConsumerGroup(), groups);
            return; // group exists — nothing to do
        } catch (Exception e) {
            // getMessage() may legally be null (e.g. for NPEs); normalise first.
            String errorMsg = e.getMessage() == null ? "" : e.getMessage();
            log.warn("❌ 检查消费者组时出现异常: {}", errorMsg);
            if (errorMsg.contains("NOGROUP") || errorMsg.contains("no such key") || errorMsg.contains("消费者组不存在")) {
                log.info("🔍 消费者组不存在,尝试创建: {} (重试 {}/{})",
                        redisStreamProperties.getConsumerGroup(), retryCount + 1, maxRetries);
                try {
                    // A group can only be attached to an existing stream; seed one if needed.
                    log.info("📊 检查Stream状态: {}", redisStreamProperties.getKey());
                    Long streamLength = redisTemplate.opsForStream().size(redisStreamProperties.getKey());
                    log.info("📊 Stream长度: {}", streamLength);
                    if (streamLength == null || streamLength == 0) {
                        log.warn("⚠️ Stream 不存在或为空,创建初始消息");
                        Map<String, Object> initData = new HashMap<>();
                        initData.put("init", "true");
                        initData.put("timestamp", System.currentTimeMillis());
                        RecordId recordId = redisTemplate.opsForStream().add(redisStreamProperties.getKey(), initData);
                        log.info("✅ 初始消息已创建Record ID: {}", recordId.getValue());
                        // Give Redis a beat to persist the entry before re-checking.
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                            throw new RuntimeException("等待消息写入时被中断", ie);
                        }
                        streamLength = redisTemplate.opsForStream().size(redisStreamProperties.getKey());
                        log.info("📊 创建初始消息后Stream长度: {}", streamLength);
                    } else {
                        log.info("✅ Stream存在且有消息长度: {}", streamLength);
                    }
                    // Create the group reading from the beginning of the stream.
                    log.info("🔧 开始创建消费者组: {} 在Stream: {}",
                            redisStreamProperties.getConsumerGroup(), redisStreamProperties.getKey());
                    redisTemplate.opsForStream().createGroup(
                            redisStreamProperties.getKey(),
                            ReadOffset.from("0"),
                            redisStreamProperties.getConsumerGroup()
                    );
                    log.info("✅ 消费者组创建成功: {}", redisStreamProperties.getConsumerGroup());
                    // Verify via a second XINFO GROUPS round-trip.
                    log.info("🔍 验证消费者组创建结果...");
                    try {
                        redisTemplate.opsForStream().groups(redisStreamProperties.getKey());
                        log.info("✅ 消费者组验证成功: {}", redisStreamProperties.getConsumerGroup());
                    } catch (Exception verifyException) {
                        log.error("❌ 消费者组验证失败: {}", verifyException.getMessage());
                        throw verifyException;
                    }
                    log.info("🎉 消费者组创建和验证完成!");
                    return; // created successfully
                } catch (Exception createException) {
                    String createMsg = createException.getMessage() == null ? "" : createException.getMessage();
                    log.error("❌ 创建消费者组时出现异常: {}", createMsg, createException);
                    // BUSYGROUP means another instance created it first — success.
                    if (createMsg.contains("BUSYGROUP")) {
                        log.info("✅ 消费者组已存在: {}", redisStreamProperties.getConsumerGroup());
                        return;
                    }
                    retryCount++;
                    if (retryCount >= maxRetries) {
                        log.error("❌ 创建消费者组失败,已达到最大重试次数: {}", createMsg);
                        throw createException;
                    } else {
                        log.warn("🔄 创建消费者组失败,准备重试: {} (重试 {}/{})",
                                createMsg, retryCount, maxRetries);
                        try {
                            Thread.sleep(1000 * retryCount); // linear back-off
                        } catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                            throw new RuntimeException("创建消费者组时被中断", ie);
                        }
                    }
                }
            } else {
                // Unknown failure: do not mask it — rethrow to the caller.
                log.error("❌ 检查消费者组时出现异常: {}", errorMsg, e);
                throw e;
            }
        }
    }
    log.error("❌ 消费者组创建失败,已达到最大重试次数");
}
/**
 * Manually pulls a batch of records for this consumer and processes each one.
 * Guarded by the {@code isProcessing} flag so overlapping invocations skip.
 *
 * Fixes vs. original:
 * - read() may return null when nothing is available; the original NPE'd on
 *   messages.size()/isEmpty();
 * - processedCount was incremented both here AND inside processMessage(),
 *   double-counting every record (processMessage also catches its own
 *   exceptions, so the per-record catch here was dead code).
 */
public void pollAndProcessMessages() {
    if (isProcessing) {
        log.warn("⚠️ Manual Ack 正在处理中,跳过本次拉取");
        return;
    }
    log.info("🚀 开始手动拉取消息");
    isProcessing = true;
    try {
        // The group must exist before XREADGROUP can succeed.
        log.info("🔍 确保消费者组存在...");
        ensureConsumerGroupExists();
        log.info("✅ 消费者组检查完成");
        // Read for this consumer from its last-consumed position.
        log.info("📖 开始从Stream读取消息: {} 消费者组: {} 消费者: {}",
                redisStreamProperties.getKey(), redisStreamProperties.getConsumerGroup(), redisStreamProperties.getConsumerName());
        List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream()
                .read(Consumer.from(redisStreamProperties.getConsumerGroup(), redisStreamProperties.getConsumerName()),
                        StreamOffset.create(redisStreamProperties.getKey(), ReadOffset.lastConsumed()));
        // Treat a null result the same as an empty one.
        if (messages == null || messages.isEmpty()) {
            log.info(" Manual Ack 没有新消息");
            return;
        }
        log.info("📊 从Stream读取到 {} 条消息", messages.size());
        log.info("📦 Manual Ack 拉取到 {} 条消息", messages.size());
        pendingCount.addAndGet(messages.size());
        for (MapRecord<String, Object, Object> message : messages) {
            log.debug("🔄 处理消息: {}", message.getId());
            // processMessage() does its own success/error accounting and ACKs
            // the record — no extra counting here.
            processMessage(message);
        }
    } catch (Exception e) {
        errorCount.incrementAndGet();
        log.error("❌ Manual Ack 拉取消息失败: {}", e.getMessage(), e);
    } finally {
        isProcessing = false;
        log.info("🏁 Manual Ack 拉取消息完成");
    }
}
/**
 * Pulls at most one NEW record (XREADGROUP with the ">" offset) for this
 * consumer and marks it pending.
 *
 * Fix vs. original: read() may return null when nothing is available, which
 * NPE'd on isEmpty(); null is now treated as "no message".
 *
 * @return the pulled record, or null when none is available or on error
 */
public MapRecord<String, Object, Object> pollSingleMessage() {
    try {
        // The group must exist before XREADGROUP can succeed.
        ensureConsumerGroupExists();
        // ">" asks Redis for records never delivered to this group before.
        List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream()
                .read(Consumer.from(redisStreamProperties.getConsumerGroup(), redisStreamProperties.getConsumerName()),
                        StreamOffset.create(redisStreamProperties.getKey(), ReadOffset.from(">")));
        // Treat a null result the same as an empty one.
        if (messages == null || messages.isEmpty()) {
            log.debug("Manual Ack 没有新消息");
            return null;
        }
        log.info("Manual Ack 拉取到 1 条新消息");
        pendingCount.addAndGet(1); // record stays pending until processed/ACKed
        return messages.get(0);
    } catch (Exception e) {
        errorCount.incrementAndGet();
        log.error("Manual Ack 拉取单条消息失败: {}", e.getMessage(), e);
        return null;
    }
}
/**
 * Processes a single stream record end-to-end: converts the raw field map,
 * runs the business logic, ACKs the record on success, and maintains the
 * counters. Exceptions are caught internally (error counters updated, the
 * failure hook invoked) — nothing is propagated to the caller.
 *
 * On failure the record is NOT acknowledged here, so it remains pending in
 * Redis and can be re-delivered or claimed later.
 *
 * @param message the raw record pulled from the stream
 */
private void processMessage(MapRecord<String, Object, Object> message) {
    String recordId = null;
    try {
        messageCount.incrementAndGet();
        // Copy the raw Object->Object field map into a String-keyed map.
        Map<String, Object> messageData = new HashMap<>();
        Map<Object, Object> rawData = message.getValue();
        log.debug("原始消息数据: {}", rawData);
        for (Map.Entry<Object, Object> entry : rawData.entrySet()) {
            String key = entry.getKey() != null ? entry.getKey().toString() : "null";
            Object value = entry.getValue();
            messageData.put(key, value);
            log.debug("转换字段: {} -> {}", key, value);
        }
        recordId = message.getId().getValue();
        log.debug("消息ID: {}", recordId);
        log.info("Manual Ack 收到消息: recordId={}, data={}", recordId, messageData);
        // Business processing (may throw; handled below).
        processMessageData(messageData);
        // Only ACK after successful processing — manual-ack semantics.
        acknowledgeMessage(recordId);
        processedCount.incrementAndGet();
        pendingCount.decrementAndGet();
        log.info("Manual Ack 消息处理完成: recordId={}", recordId);
    } catch (Exception e) {
        errorCount.incrementAndGet();
        pendingCount.decrementAndGet();
        log.error("Manual Ack 处理消息失败: recordId={}, error={}, messageData={}",
                recordId, e.getMessage(), message.getValue(), e);
        // Delegate to the failure hook (logging/retry accounting).
        handleProcessingError(message, e);
    }
}
/**
 * ACKs one record on the configured stream/group. A failure is wrapped in a
 * RuntimeException so callers treat the record as unprocessed.
 *
 * @param recordId id of the record to acknowledge
 */
private void acknowledgeMessage(String recordId) {
    final String streamKey = redisStreamProperties.getKey();
    final String group = redisStreamProperties.getConsumerGroup();
    try {
        redisTemplate.opsForStream().acknowledge(streamKey, group, recordId);
        log.debug("Manual Ack 消息已确认: recordId={}", recordId);
    } catch (Exception e) {
        log.error("Manual Ack 确认消息失败: recordId={}, error={}", recordId, e.getMessage(), e);
        throw new RuntimeException("消息确认失败: " + e.getMessage(), e);
    }
}
/**
 * Failure hook for manual-ack processing: bumps the retry counter and logs
 * the failed record. The record is intentionally left unacknowledged so it
 * can be re-consumed (or routed to a dead-letter stream later).
 */
private void handleProcessingError(MapRecord<String, Object, Object> message, Exception e) {
    try {
        retryCount.incrementAndGet();
        // Record the failure; no retry/dead-letter pipeline is wired up yet.
        log.error("Manual Ack 消息处理失败,已记录: recordId={}, error={}",
                message.getId().getValue(), e.getMessage());
    } catch (Exception ex) {
        log.error("Manual Ack 处理错误失败: {}", ex.getMessage(), ex);
    }
}
/**
 * Business-logic stage: materialises a {@link Message} from the raw field
 * map, falling back to the current time when the timestamp field is missing
 * or unparseable. Any failure is wrapped in a RuntimeException.
 */
private void processMessageData(Map<String, Object> messageData) {
    try {
        // Simulate some business-processing latency.
        Thread.sleep(10);
        Message msg = new Message();
        msg.setId((String) messageData.get("id"));
        msg.setContent((String) messageData.get("content"));
        msg.setType((String) messageData.get("type"));
        msg.setSender((String) messageData.get("sender"));
        // Timestamp is optional: absent/blank or malformed values fall back to now().
        String rawTimestamp = (String) messageData.get("timestamp");
        if (rawTimestamp == null || rawTimestamp.isEmpty()) {
            log.warn("时间戳为空,使用当前时间");
            msg.setTimestamp(LocalDateTime.now());
        } else {
            try {
                msg.setTimestamp(LocalDateTime.parse(rawTimestamp));
            } catch (Exception e) {
                log.warn("时间戳解析失败,使用当前时间: timestamp={}, error={}", rawTimestamp, e.getMessage());
                msg.setTimestamp(LocalDateTime.now());
            }
        }
        log.debug("Manual Ack 处理消息: id={}, content={}, type={}, sender={}",
                msg.getId(), msg.getContent(), msg.getType(), msg.getSender());
    } catch (Exception e) {
        log.error("Manual Ack 构建消息对象失败: {}", e.getMessage(), e);
        throw new RuntimeException("消息处理失败", e);
    }
}
/**
 * Pulls and processes up to {@code batchSize} records in repeated reads.
 * Guarded by the {@code isProcessing} flag so overlapping runs skip.
 *
 * Fix vs. original: read() may return null when nothing is pending, which
 * NPE'd on isEmpty(); null now ends the batch.
 *
 * NOTE(review): records pulled in a read but skipped once the target count
 * is hit stay un-ACKed with pendingCount inflated — confirm intended.
 *
 * @param batchSize maximum number of records to process in this run
 */
public void batchProcessMessages(int batchSize) {
    if (isProcessing) {
        log.warn("⚠️ Manual Ack 正在处理中,跳过本次批量处理");
        return;
    }
    log.info("🚀 开始批量处理消息,目标数量: {}", batchSize);
    isProcessing = true;
    int processed = 0;
    try {
        // The group must exist before XREADGROUP can succeed.
        log.info("🔍 确保消费者组存在...");
        ensureConsumerGroupExists();
        log.info("✅ 消费者组检查完成");
        while (processed < batchSize) {
            // Read for this consumer from its last-consumed position.
            List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream()
                    .read(Consumer.from(redisStreamProperties.getConsumerGroup(), redisStreamProperties.getConsumerName()),
                            StreamOffset.create(redisStreamProperties.getKey(), ReadOffset.lastConsumed()));
            // A null result means the same as an empty one: nothing pending.
            if (messages == null || messages.isEmpty()) {
                log.info("Manual Ack 批量处理完成,没有更多消息。已处理: {}, 目标: {}", processed, batchSize);
                break;
            }
            log.info("Manual Ack 批量拉取到 {} 条消息,当前已处理: {}", messages.size(), processed);
            pendingCount.addAndGet(messages.size());
            for (MapRecord<String, Object, Object> message : messages) {
                if (processed >= batchSize) {
                    log.info("已达到批量处理目标数量: {}", batchSize);
                    break;
                }
                log.debug("开始处理第 {} 条消息", processed + 1);
                processMessage(message);
                processed++;
                log.debug("完成处理第 {} 条消息,当前统计: received={}, processed={}, errors={}",
                        processed, messageCount.get(), processedCount.get(), errorCount.get());
            }
        }
        log.info("Manual Ack 批量处理完成: 处理了 {} 条消息", processed);
    } catch (Exception e) {
        log.error("Manual Ack 批量处理消息失败: {}", e.getMessage(), e);
    } finally {
        isProcessing = false;
    }
}
/**
 * Public entry point for processing one already-pulled record.
 *
 * NOTE(review): processMessage() catches all exceptions internally, so this
 * returns false only for a null record or an unexpected error outside that
 * handler — confirm whether callers expect per-record failure reporting.
 *
 * @param message record to process; null returns false immediately
 * @return true when processing was invoked without an unexpected error
 */
public boolean processSingleMessage(MapRecord<String, Object, Object> message) {
    if (message == null) {
        return false;
    }
    try {
        processMessage(message);
        return true;
    } catch (Exception e) {
        log.error("Manual Ack 处理单条消息失败: {}", e.getMessage(), e);
        return false;
    }
}
/**
 * @return true while a poll or batch run is in flight (volatile flag)
 */
public boolean isProcessing() {
    return isProcessing;
}
/**
 * Snapshot of the consumer's counters plus a derived success rate
 * (processed / received; "0.00%" when nothing was received yet).
 *
 * @return mutable map of statistic name to value
 */
public Map<String, Object> getMessageStats() {
    final long received = messageCount.get();
    final long processed = processedCount.get();
    Map<String, Object> stats = new HashMap<>();
    stats.put("totalReceived", received);
    stats.put("totalProcessed", processed);
    stats.put("totalErrors", errorCount.get());
    stats.put("totalRetries", retryCount.get());
    stats.put("pendingCount", pendingCount.get());
    stats.put("isProcessing", isProcessing);
    String successRate = received > 0
            ? String.format("%.2f%%", (double) processed / received * 100)
            : "0.00%";
    stats.put("successRate", successRate);
    log.info("Manual Ack 消息统计: {}", stats);
    return stats;
}
/**
 * Zeroes all statistics counters (received, processed, errors, retries,
 * pending). Not atomic across counters — a concurrent consumer may observe
 * a partially reset state.
 */
public void resetCounters() {
    messageCount.set(0);
    processedCount.set(0);
    errorCount.set(0);
    retryCount.set(0);
    pendingCount.set(0);
    log.info("Manual Ack 计数器已重置");
}
/**
 * Collects diagnostic information about the stream: length (XLEN), consumer
 * groups (XINFO GROUPS) and pending entries (XPENDING). Partial failures are
 * reported per-field rather than failing the whole report.
 *
 * @return map with stream diagnostics and a "success" flag
 */
public Map<String, Object> checkStreamStatus() {
    Map<String, Object> report = new HashMap<>();
    try {
        // XLEN — fatal if this fails.
        Long length = redisTemplate.opsForStream().size(redisStreamProperties.getKey());
        report.put("streamLength", length);
        // XINFO GROUPS — failure recorded, not fatal.
        try {
            Object groupInfo = redisTemplate.opsForStream().groups(redisStreamProperties.getKey());
            report.put("consumerGroups", groupInfo);
        } catch (Exception e) {
            report.put("consumerGroupsError", e.getMessage());
        }
        // XPENDING — failure recorded, not fatal.
        try {
            Object pending = redisTemplate.opsForStream().pending(
                    redisStreamProperties.getKey(),
                    redisStreamProperties.getConsumerGroup()
            );
            report.put("pendingMessages", pending);
        } catch (Exception e) {
            report.put("pendingMessagesError", e.getMessage());
        }
        report.put("success", true);
    } catch (Exception e) {
        report.put("success", false);
        report.put("error", e.getMessage());
        log.error("检查 Stream 状态失败", e);
    }
    return report;
}
}

View File

@ -0,0 +1,117 @@
package com.example.service;
import com.example.config.RedisStreamProperties;
import com.example.model.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* Redis Stream 消息生产者服务
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageProducerService {
private final RedisTemplate<String, Object> redisTemplate;
private final RedisStreamProperties redisStreamProperties;
private final ObjectMapper objectMapper;
/**
 * Publishes one message to the configured Redis Stream: assigns a random
 * UUID as the message id, flattens the message into a field map and appends
 * it via XADD.
 *
 * @param message message to publish (its id field is overwritten)
 * @return the Redis-assigned record id
 * @throws RuntimeException when the append fails
 */
public String sendMessage(Message message) {
    try {
        // Assign a fresh id before serialising.
        message.setId(UUID.randomUUID().toString());
        Map<String, String> fields = convertMessageToMap(message);
        // XADD to the configured stream key.
        RecordId recordId = redisTemplate.opsForStream()
                .add(redisStreamProperties.getKey(), fields);
        log.info("消息已发送到 Redis Stream: messageId={}, recordId={}, content={}",
                message.getId(), recordId.getValue(), message.getContent());
        return recordId.getValue();
    } catch (Exception e) {
        log.error("发送消息失败: {}", e.getMessage(), e);
        throw new RuntimeException("发送消息失败", e);
    }
}
/**
 * Sends {@code count} test messages, pausing briefly every 10 to avoid
 * flooding Redis.
 *
 * Fix vs. original: InterruptedException from the pause was swallowed by the
 * generic catch, which both lost the thread's interrupt flag and miscounted
 * the (already sent) current message as a failure. The interrupt flag is now
 * restored and the batch stops early.
 *
 * @param count number of messages to send
 * @return number of messages sent successfully
 */
public int sendBatchMessages(int count) {
    int successCount = 0;
    log.info("开始批量发送 {} 条消息", count);
    for (int i = 1; i <= count; i++) {
        try {
            Message message = new Message(
                    "测试消息内容 " + i,
                    "TEST",
                    "producer-" + System.currentTimeMillis()
            );
            sendMessage(message);
            successCount++;
            // Brief pause every 10 messages to avoid flooding Redis.
            if (i % 10 == 0) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException ie) {
                    // Restore the flag and stop sending gracefully.
                    Thread.currentThread().interrupt();
                    log.warn("批量发送被中断,已发送 {} 条", successCount);
                    break;
                }
                log.debug("已发送 {} 条消息", i);
            }
        } catch (Exception e) {
            log.error("发送第 {} 条消息失败: {}", i, e.getMessage());
        }
    }
    log.info("批量发送完成: 成功 {} 条,总计 {} 条", successCount, count);
    return successCount;
}
/**
 * Flattens a {@link Message} into the String/String field map required by
 * XADD.
 *
 * Fix vs. original: calling message.getTimestamp().toString() NPE'd when the
 * timestamp was never set (the no-args constructor leaves it null). An unset
 * timestamp is now serialised as the empty string, which the consumers
 * already treat as "use current time".
 */
private Map<String, String> convertMessageToMap(Message message) {
    Map<String, String> messageMap = new HashMap<>();
    messageMap.put("id", message.getId());
    messageMap.put("content", message.getContent());
    messageMap.put("type", message.getType());
    // Null-safe: empty string signals "no timestamp" to the consumers.
    messageMap.put("timestamp", message.getTimestamp() == null ? "" : message.getTimestamp().toString());
    messageMap.put("sender", message.getSender());
    return messageMap;
}
/**
 * @return current number of entries in the stream (XLEN); may be null when
 *         the template returns no value — TODO confirm callers handle null
 */
public Long getStreamLength() {
    return redisTemplate.opsForStream().size(redisStreamProperties.getKey());
}
/**
 * Deletes the whole stream key, dropping all entries together with any
 * consumer-group state attached to it.
 */
public void clearStream() {
    final String streamKey = redisStreamProperties.getKey();
    redisTemplate.delete(streamKey);
    log.info("Redis Stream 已清空");
}
}

View File

@ -0,0 +1,426 @@
package com.example.service;
import com.example.config.RedisStreamProperties;
import com.example.model.Message;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* StreamListener 模式的消费者服务
*
* 特点:
* 1. 自动监听 Redis Stream实时接收消息
* 2. 基于事件驱动的消息处理
* 3. 自动确认消息ACK
* 4. 支持消费者组和负载均衡
* 5. 适合高并发、实时性要求高的场景
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class StreamListenerConsumerService implements StreamListener<String, MapRecord<String, String, String>> {
private final RedisTemplate<String, Object> redisTemplate;
private final RedisStreamProperties redisStreamProperties;
private final ObjectMapper objectMapper;
private final StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer;
// 消息计数器
private final AtomicLong messageCount = new AtomicLong(0);
private final AtomicLong processedCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
private final AtomicLong retryCount = new AtomicLong(0);
// 订阅状态
private Subscription subscription;
private volatile boolean isListening = false;
@PostConstruct
public void init() {
    log.info("StreamListener 消费者服务已初始化");
    // Listening is NOT started automatically; callers must invoke startListening().
}
/**
 * Starts listening using the configured default for whether historical
 * messages should be replayed (delegates to {@link #startListening(boolean)}).
 */
public void startListening() {
    startListening(redisStreamProperties.getStreamListener().isProcessHistoricalMessages());
}
/**
 * Starts listening on the configured stream: ensures the consumer group
 * exists, picks the starting offset, starts the listener container if it is
 * not running, then subscribes this service as the StreamListener callback.
 *
 * @param processHistoricalMessages true to read from the beginning of the
 *        stream (offset "0"); false to receive only records arriving after
 *        the subscription (latest offset)
 */
public void startListening(boolean processHistoricalMessages) {
    if (isListening) {
        log.warn("StreamListener 已经在监听中");
        return;
    }
    log.info("🚀 开始启动 StreamListener...");
    try {
        // The consumer group must exist before receive() can deliver records.
        log.info("🔍 检查并创建消费者组...");
        ensureConsumerGroupExists();
        // Build the consumer identity from the configured group/name.
        String streamListenerGroup = redisStreamProperties.getConsumerGroup();
        String streamListenerConsumer = redisStreamProperties.getConsumerName();
        Consumer consumer = Consumer.from(streamListenerGroup, streamListenerConsumer);
        // Choose the starting offset based on the historical-messages flag.
        ReadOffset readOffset;
        if (processHistoricalMessages) {
            readOffset = ReadOffset.from("0"); // replay everything from the start
            log.info("StreamListener 将处理历史消息(从开头读取)");
        } else {
            readOffset = ReadOffset.latest(); // new records only
            log.info("StreamListener 只处理新消息(不读取历史消息)");
        }
        StreamOffset<String> streamOffset = StreamOffset.create(redisStreamProperties.getKey(), readOffset);
        // The container must be running before the subscription takes effect.
        log.info("🔍 检查 StreamMessageListenerContainer 状态: running={}", streamMessageListenerContainer.isRunning());
        if (!streamMessageListenerContainer.isRunning()) {
            log.info("🚀 启动 StreamMessageListenerContainer...");
            streamMessageListenerContainer.start();
            log.info("✅ StreamMessageListenerContainer 已启动");
        }
        // Subscribe — this service itself is the StreamListener callback.
        log.info("📡 开始订阅 Stream...");
        subscription = streamMessageListenerContainer.receive(consumer, streamOffset, this);
        isListening = true;
        log.info("🎉 StreamListener 开始监听: key={}, group={}, consumer={}",
                redisStreamProperties.getKey(), redisStreamProperties.getConsumerGroup(), redisStreamProperties.getConsumerName());
    } catch (Exception e) {
        log.error("启动 StreamListener 失败", e);
        isListening = false;
    }
}
/**
 * Ensures the configured consumer group exists, retrying up to five times
 * with increasing back-off.
 *
 * Fixes vs. original:
 * - Throwable#getMessage() can be null and was dereferenced unguarded;
 * - the NOGROUP / "no such key" branch only slept and retried without ever
 *   creating the missing stream, so it could never succeed when the key was
 *   absent — the stream is now seeded with an init record so the group can
 *   be created on the next attempt (mirroring ManualAckConsumerService).
 */
private void ensureConsumerGroupExists() {
    int maxRetries = 5;
    int retryCount = 0;
    while (retryCount < maxRetries) {
        try {
            // Fail fast when Redis is unreachable.
            redisTemplate.getConnectionFactory().getConnection().ping();
            // Probe for the group via XINFO GROUPS.
            Object groups = redisTemplate.opsForStream().groups(redisStreamProperties.getKey());
            boolean groupExists = false;
            if (groups != null) {
                String groupsStr = groups.toString();
                if (!groupsStr.contains("[]") && !groupsStr.trim().isEmpty()) {
                    groupExists = true;
                }
            }
            if (groupExists) {
                log.info("✅ 消费者组已存在: {}", redisStreamProperties.getConsumerGroup());
                return;
            }
            String streamListenerGroup = redisStreamProperties.getConsumerGroup();
            log.info("🔧 创建消费者组: {} (重试 {}/{})", streamListenerGroup, retryCount + 1, maxRetries);
            // Create the group reading from the beginning of the stream.
            redisTemplate.opsForStream().createGroup(redisStreamProperties.getKey(), ReadOffset.from("0"), streamListenerGroup);
            log.info("✅ 消费者组创建成功: {}", streamListenerGroup);
            return;
        } catch (Exception e) {
            retryCount++;
            // getMessage() may legally be null; normalise before contains().
            String msg = e.getMessage() == null ? "" : e.getMessage();
            if (msg.contains("BUSYGROUP")) {
                // Another instance created the group first — that's success.
                log.info("消费者组已存在: {}", redisStreamProperties.getConsumerGroup());
                return;
            } else if (msg.contains("NOGROUP") || msg.contains("no such key")) {
                log.info("Stream 或消费者组不存在,尝试创建: {} (重试 {}/{})",
                        redisStreamProperties.getConsumerGroup(), retryCount, maxRetries);
                // Seed the missing stream so createGroup can succeed next pass
                // (the original only slept here and could never recover).
                try {
                    Map<String, Object> seed = new HashMap<>();
                    seed.put("init", "true");
                    seed.put("timestamp", System.currentTimeMillis());
                    redisTemplate.opsForStream().add(redisStreamProperties.getKey(), seed);
                } catch (Exception seedException) {
                    log.warn("创建初始消息失败: {}", seedException.getMessage());
                }
                if (retryCount < maxRetries) {
                    try {
                        Thread.sleep(500 * retryCount);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                    continue;
                }
            } else if (msg.contains("Connection closed") || msg.contains("RedisException")) {
                log.warn("Redis连接问题,尝试重连: {} (重试 {}/{})", msg, retryCount, maxRetries);
                if (retryCount < maxRetries) {
                    try {
                        Thread.sleep(1000 * retryCount); // connection issues wait longer
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                    continue;
                }
            }
            log.error("创建消费者组失败: {} (重试 {}/{})", msg, retryCount, maxRetries);
            if (retryCount >= maxRetries) {
                throw new RuntimeException("创建消费者组失败,已重试 " + maxRetries + "", e);
            }
        }
    }
}
/**
 * Cancels the active subscription (if any) and marks the listener stopped.
 * A no-op (with a warning) when the listener is not running.
 */
public void stopListening() {
    if (!isListening) {
        log.warn("StreamListener 未在监听中");
        return;
    }
    try {
        Subscription current = subscription;
        if (current != null) {
            current.cancel();
            subscription = null;
        }
        isListening = false;
        log.info("StreamListener 已停止监听");
    } catch (Exception e) {
        log.error("停止 StreamListener 失败", e);
    }
}
/**
 * @return true while a stream subscription is active (volatile flag)
 */
public boolean isListening() {
    return isListening;
}
@Override
public void onMessage(MapRecord<String, String, String> message) {
    // StreamListener callback: invoked by the listener container for every
    // record delivered to this consumer. Processes the record, then ACKs it.
    // Deliberate policy: failed records are ALSO acknowledged so they are not
    // redelivered — the failure is only counted and logged.
    String recordId = null;
    try {
        messageCount.incrementAndGet();
        Map<String, String> messageData = message.getValue();
        recordId = message.getId().getValue();
        log.info("StreamListener 收到消息: recordId={}, data={}", recordId, messageData);
        // Business processing (may throw; handled below).
        processMessage(messageData);
        // ACK only after successful processing.
        acknowledgeMessage(recordId);
        processedCount.incrementAndGet();
        log.info("StreamListener 消息处理完成: recordId={}", recordId);
    } catch (Exception e) {
        errorCount.incrementAndGet();
        log.error("StreamListener 处理消息失败: recordId={}, error={}", recordId, e.getMessage(), e);
        // Record the failure (no retry/dead-letter wired up here).
        handleProcessingError(message, e);
        // ACK the failed record too, trading possible message loss for no
        // redelivery loops.
        try {
            if (recordId != null) {
                acknowledgeMessage(recordId);
                log.info("StreamListener 失败消息已确认: recordId={}", recordId);
            }
        } catch (Exception ackException) {
            log.error("StreamListener 确认失败消息时出错: recordId={}, error={}", recordId, ackException.getMessage(), ackException);
        }
    }
}
/**
 * ACKs one record on the configured stream/group. Unlike the manual-ack
 * service, a failure here is only logged — nothing is rethrown.
 *
 * @param recordId id of the record to acknowledge
 */
private void acknowledgeMessage(String recordId) {
    final String streamKey = redisStreamProperties.getKey();
    final String group = redisStreamProperties.getConsumerGroup();
    try {
        redisTemplate.opsForStream().acknowledge(streamKey, group, recordId);
        log.debug("StreamListener 消息已确认: recordId={}", recordId);
    } catch (Exception e) {
        log.error("StreamListener 确认消息失败: recordId={}, error={}", recordId, e.getMessage(), e);
    }
}
/**
 * Failure hook: bumps the retry counter and logs the failed record (payload
 * plus full stack trace). No retry or dead-letter pipeline is wired up yet;
 * the current policy is to record the error and move on.
 */
private void handleProcessingError(MapRecord<String, String, String> message, Exception e) {
    try {
        retryCount.incrementAndGet();
        final String recordId = message.getId().getValue();
        // Log the failure with the record payload for later diagnosis.
        log.error("StreamListener 消息处理失败,已记录: recordId={}, error={}, messageData={}",
                recordId, e.getMessage(), message.getValue());
        // And the full stack trace.
        log.error("StreamListener 错误堆栈:", e);
    } catch (Exception ex) {
        log.error("StreamListener 处理错误失败: {}", ex.getMessage(), ex);
    }
}
/**
 * Business-logic stage: materialises a {@link Message} from the raw field
 * map. The timestamp field is optional — surrounding quotes are stripped,
 * ISO-8601 values (containing 'T') are parsed, and anything missing,
 * malformed or in an unrecognised format falls back to the current time.
 * Any other failure is wrapped in a RuntimeException.
 *
 * @param messageData raw String field map from the stream record
 */
private void processMessage(Map<String, String> messageData) {
    try {
        log.debug("StreamListener 开始处理消息: {}", messageData);
        // Simulate some business-processing latency.
        Thread.sleep(10);
        // Build the typed message object from the raw fields.
        Message message = new Message();
        message.setId(messageData.get("id"));
        message.setContent(messageData.get("content"));
        message.setType(messageData.get("type"));
        message.setSender(messageData.get("sender"));
        // Timestamp handling: tolerate missing/quoted/odd formats.
        String timestampStr = messageData.get("timestamp");
        if (timestampStr != null && !timestampStr.trim().isEmpty()) {
            try {
                // Strip surrounding quotes (e.g. from a JSON-serialised value).
                String cleanTimestamp = timestampStr.trim();
                if (cleanTimestamp.startsWith("\"") && cleanTimestamp.endsWith("\"")) {
                    cleanTimestamp = cleanTimestamp.substring(1, cleanTimestamp.length() - 1);
                }
                // Only ISO-8601-like values (with a 'T') are parsed directly.
                LocalDateTime timestamp;
                if (cleanTimestamp.contains("T")) {
                    // ISO format: 2025-10-22T15:06:49 or 2025-10-22T15:06:49.123
                    timestamp = LocalDateTime.parse(cleanTimestamp);
                } else {
                    // Unrecognised format: fall back to the current time.
                    timestamp = LocalDateTime.now();
                    log.warn("StreamListener 不支持的时间戳格式,使用当前时间: {}", cleanTimestamp);
                }
                message.setTimestamp(timestamp);
            } catch (Exception parseException) {
                log.warn("StreamListener 解析时间戳失败,使用当前时间: timestampStr={}, error={}",
                        timestampStr, parseException.getMessage());
                message.setTimestamp(LocalDateTime.now());
            }
        } else {
            message.setTimestamp(LocalDateTime.now());
            log.warn("StreamListener 消息时间戳为空,使用当前时间: {}", messageData);
        }
        log.debug("StreamListener 处理消息: id={}, content={}, type={}, sender={}, timestamp={}",
                message.getId(), message.getContent(), message.getType(), message.getSender(), message.getTimestamp());
    } catch (Exception e) {
        log.error("StreamListener 构建消息对象失败: messageData={}, error={}", messageData, e.getMessage(), e);
        throw new RuntimeException("消息处理失败", e);
    }
}
/**
 * Snapshot of the listener's counters plus a derived success rate
 * (processed / received; "0.00%" when nothing was received yet).
 *
 * @return mutable map of statistic name to value
 */
public Map<String, Object> getMessageStats() {
    final long received = messageCount.get();
    final long processed = processedCount.get();
    final long errors = errorCount.get();
    final long retries = retryCount.get();
    // Success rate = processed / received; guard the division by zero.
    final String successRate = received == 0
            ? "0.00%"
            : String.format("%.2f%%", (double) processed / received * 100);
    Map<String, Object> stats = new HashMap<>();
    stats.put("totalReceived", received);
    stats.put("totalProcessed", processed);
    stats.put("totalErrors", errors);
    stats.put("totalRetries", retries);
    stats.put("isListening", isListening);
    stats.put("successRate", successRate);
    log.info("StreamListener 消息统计: totalReceived={}, totalProcessed={}, totalErrors={}, successRate={}",
            received, processed, errors, successRate);
    return stats;
}
/**
 * Zeroes all statistics counters (received, processed, errors, retries).
 * Not atomic across counters — a concurrent callback may observe a
 * partially reset state.
 */
public void resetCounters() {
    messageCount.set(0);
    processedCount.set(0);
    errorCount.set(0);
    retryCount.set(0);
    log.info("StreamListener 计数器已重置");
}
/**
 * Debug snapshot of the raw counter values.
 *
 * @return map of counter name to current value
 */
public Map<String, Long> getCounterStatus() {
    Map<String, Long> snapshot = new HashMap<>();
    snapshot.put("messageCount", messageCount.get());
    snapshot.put("processedCount", processedCount.get());
    snapshot.put("errorCount", errorCount.get());
    snapshot.put("retryCount", retryCount.get());
    return snapshot;
}
/**
 * Spring lifecycle hook: cancels any active subscription on shutdown.
 */
@PreDestroy
public void destroy() {
    stopListening();
    log.info("StreamListener 消费者服务已销毁");
}
}

View File

@ -0,0 +1,42 @@
server:
port: 8080
spring:
application:
name: redis-stream-demo
redis:
host: localhost
port: 6379
password:
database: 0
timeout: 10000ms # 增加超时时间到10秒
lettuce:
pool:
max-active: 8
max-wait: 5000ms # 设置合理的等待时间
max-idle: 8
min-idle: 0
# Redis Stream Configuration
redis:
stream:
key: "message-stream"
consumer-group: "message-consumer-group"
consumer-name: "message-consumer"
# 消费者模式配置
consumer:
# 默认模式: stream-listener, manual-ack, both
default-mode: "both"
# 是否启用 StreamListener 模式
stream-listener-enabled: false
# 是否启用 Manual Ack 模式
manual-ack-enabled: false
# Logging Configuration
logging:
level:
com.example: DEBUG
org.springframework.data.redis: DEBUG
io.lettuce: DEBUG

View File

@ -0,0 +1,70 @@
server:
port: 8080
spring:
application:
name: redis-stream-demo
redis:
host: localhost
port: 6379
password:
database: 0
timeout: 2000ms
lettuce:
pool:
max-active: 8
max-wait: -1ms
max-idle: 8
min-idle: 0
# Redis Stream Configuration
redis:
stream:
key: "message-stream"
consumer-group: "message-consumer-group"
consumer-name: "message-consumer"
# 消费者模式配置
consumer:
# 默认模式: stream-listener, manual-ack, both
default-mode: "both"
# 是否启用 StreamListener 模式
stream-listener-enabled: true
# 是否启用 Manual Ack 模式
manual-ack-enabled: true
# StreamListener 配置
stream-listener:
# 是否自动启动
auto-start: false
# 是否处理历史消息true: 从开头读取所有消息false: 只读取新消息)
process-historical-messages: true
# 轮询超时时间(秒)
poll-timeout: 1
# 线程池核心线程数
core-pool-size: 2
# 线程池最大线程数
max-pool-size: 4
# 线程空闲时间(秒)
keep-alive-time: 60
# Manual Ack 配置
manual-ack:
# 默认批量大小
default-batch-size: 10
# 最大批量大小
max-batch-size: 100
# 轮询间隔(毫秒)
poll-interval: 1000
# 是否启用并发处理
concurrent-processing: false
# 最大并发数
max-concurrency: 5
# Logging Configuration
logging:
level:
com.example: DEBUG
org.springframework.data.redis: DEBUG
io.lettuce: DEBUG

View File

@ -0,0 +1,230 @@
package com.example;
import com.example.config.RedisStreamConfig;
import com.example.config.RedisStreamProperties;
import com.example.service.ManualAckConsumerService;
import com.example.service.MessageProducerService;
import com.example.service.StreamListenerConsumerService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import org.springframework.test.context.TestPropertySource;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.*;
/**
 * Redis Stream end-to-end integration tests.
 *
 * <p>Exercises both consumption modes of the demo against a real Redis
 * instance: the push-based StreamListener container and the pull-based
 * manual-ack consumer, plus basic producer operations (stream length,
 * clearing the stream).
 *
 * <p>NOTE(review): requires a reachable Redis at localhost:6379 — setUp()
 * fails fast when the connection ping does not succeed. Synchronization is
 * done with fixed Thread.sleep() waits, so the suite is slow and may be
 * flaky on heavily loaded hosts.
 */
@Slf4j
@SpringBootTest
@TestPropertySource(properties = {
        "spring.redis.host=localhost",
        "spring.redis.port=6379"
})
class RedisStreamIntegrationTest {

    // Producer used to publish test messages onto the stream.
    @Autowired
    private MessageProducerService messageProducerService;

    // Push-based consumer; also serves as the StreamListener callback.
    @Autowired
    private StreamListenerConsumerService streamListenerConsumerService;

    // Pull-based consumer with explicit acknowledgements.
    @Autowired
    private ManualAckConsumerService manualAckConsumerService;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // Stream key / consumer-group / consumer-name configuration.
    @Autowired
    private RedisStreamProperties redisStreamProperties;

    // Listener container shared by the StreamListener-based tests.
    @Autowired
    private StreamMessageListenerContainer<String, org.springframework.data.redis.connection.stream.MapRecord<String, String, String>> container;

    /**
     * Resets shared state before each test: verifies Redis connectivity,
     * empties the stream, zeroes both consumers' counters, and ensures the
     * consumer group exists.
     */
    @BeforeEach
    void setUp() {
        try {
            // Fail fast if Redis is unreachable; every test below depends on it.
            redisTemplate.getConnectionFactory().getConnection().ping();
            log.info("Redis连接正常");
        } catch (Exception e) {
            log.error("Redis连接失败: {}", e.getMessage());
            throw new RuntimeException("Redis连接不可用", e);
        }
        // Start every test from an empty stream.
        messageProducerService.clearStream();
        // Reset both consumers' received/processed/error counters.
        streamListenerConsumerService.resetCounters();
        manualAckConsumerService.resetCounters();
        // Create the consumer group starting at offset "0" so entries already
        // in the stream are visible to the group.
        try {
            redisTemplate.opsForStream().createGroup(redisStreamProperties.getKey(), ReadOffset.from("0"), redisStreamProperties.getConsumerGroup());
            log.info("消费者组已创建: {}", redisStreamProperties.getConsumerGroup());
        } catch (Exception e) {
            // The group usually already exists from a previous run (BUSYGROUP);
            // that is fine, so the exception is only logged, not rethrown.
            log.info("消费者组可能已存在: {}", e.getMessage());
        }
    }

    /**
     * Push-based flow: subscribe via the listener container, publish 100
     * messages, and assert that all are received and processed without errors.
     */
    @Test
    void testStreamListenerEndToEnd() throws InterruptedException {
        // Start the listener container.
        container.start();
        // Subscribe under a dedicated consumer name so this test does not
        // compete for messages with consumers used by other tests.
        String streamListenerConsumerName = redisStreamProperties.getConsumerName() + "-stream-listener";
        Subscription subscription = container.receive(
                Consumer.from(redisStreamProperties.getConsumerGroup(), streamListenerConsumerName),
                StreamOffset.create(redisStreamProperties.getKey(), ReadOffset.from(">")),
                streamListenerConsumerService
        );
        // Give the subscription time to become active before producing.
        Thread.sleep(3000);
        // Publish 100 messages.
        int sentCount = messageProducerService.sendBatchMessages(100);
        assertEquals(100, sentCount);
        // Fixed wait for the asynchronous processing to complete.
        Thread.sleep(20000);
        // Check the consumer-side statistics.
        Map<String, Object> stats = streamListenerConsumerService.getMessageStats();
        log.info("StreamListener 统计信息: {}", stats);
        assertTrue((Long) stats.get("totalReceived") >= 100, "应该接收到至少 100 条消息");
        assertTrue((Long) stats.get("totalProcessed") >= 100, "应该处理至少 100 条消息");
        assertEquals(0L, stats.get("totalErrors"), "不应该有处理错误");
        // Unsubscribe.
        subscription.cancel();
        // Stop the container.
        container.stop();
    }

    /**
     * Pull-based flow: publish 100 messages, process them synchronously with
     * the manual-ack consumer, then check the counters.
     */
    @Test
    void testManualAckEndToEnd() throws InterruptedException {
        // Publish 100 messages.
        int sentCount = messageProducerService.sendBatchMessages(100);
        assertEquals(100, sentCount);
        // Allow the sends to land before consuming.
        Thread.sleep(2000);
        // Process the batch in manual-ack mode.
        manualAckConsumerService.batchProcessMessages(100);
        // Check the consumer-side statistics.
        Map<String, Object> stats = manualAckConsumerService.getMessageStats();
        assertTrue((Long) stats.get("totalReceived") >= 100, "应该接收到至少 100 条消息");
        assertTrue((Long) stats.get("totalProcessed") >= 100, "应该处理至少 100 条消息");
        assertEquals(0L, stats.get("totalErrors"), "不应该有处理错误");
    }

    /**
     * Single-message round trip: publish one record, poll it with the
     * manual-ack consumer, and verify the counters.
     */
    @Test
    void testSingleMessageProductionAndConsumption() {
        // Publish a single message.
        String recordId = messageProducerService.sendMessage(
                new com.example.model.Message("测试消息", "TEST", "test-sender")
        );
        assertNotNull(recordId);
        // The stream must now contain exactly one entry.
        Long streamLength = messageProducerService.getStreamLength();
        assertEquals(1, streamLength);
        // Consume it with a single manual-ack poll.
        manualAckConsumerService.pollAndProcessMessages();
        // Other consumers may also have seen the record, so only assert a
        // lower bound of one received/processed message.
        Map<String, Object> stats = manualAckConsumerService.getMessageStats();
        assertTrue((Long) stats.get("totalReceived") >= 1, "应该至少接收到1条消息");
        assertTrue((Long) stats.get("totalProcessed") >= 1, "应该至少处理1条消息");
        assertEquals(0L, stats.get("totalErrors"), "不应该有处理错误");
    }

    /**
     * Concurrency check: one thread produces 50 messages while another
     * consumes in manual-ack mode and the listener container is also running;
     * the combined processed count must reach at least 50.
     */
    @Test
    void testConcurrentProductionAndConsumption() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        // Start the listener container.
        container.start();
        // Dedicated consumer name to avoid clashing with other tests.
        String streamListenerConsumerName = redisStreamProperties.getConsumerName() + "-concurrent";
        Subscription subscription = container.receive(
                Consumer.from(redisStreamProperties.getConsumerGroup(), streamListenerConsumerName),
                StreamOffset.create(redisStreamProperties.getKey(), ReadOffset.from(">")),
                streamListenerConsumerService
        );
        // Give the subscription time to become active.
        Thread.sleep(2000);
        // Producer thread: publish 50 messages.
        Thread producerThread = new Thread(() -> {
            try {
                messageProducerService.sendBatchMessages(50);
            } finally {
                latch.countDown();
            }
        });
        // Consumer thread: process 50 messages in manual-ack mode.
        Thread consumerThread = new Thread(() -> {
            try {
                manualAckConsumerService.batchProcessMessages(50);
            } finally {
                latch.countDown();
            }
        });
        producerThread.start();
        consumerThread.start();
        // Both threads must finish within the deadline.
        assertTrue(latch.await(30, TimeUnit.SECONDS), "并发测试应该在 30 秒内完成");
        // Messages may be split between the two consumers; sum their counts.
        Map<String, Object> streamListenerStats = streamListenerConsumerService.getMessageStats();
        Map<String, Object> manualAckStats = manualAckConsumerService.getMessageStats();
        long totalProcessed = (Long) streamListenerStats.get("totalProcessed") + (Long) manualAckStats.get("totalProcessed");
        assertTrue(totalProcessed >= 50, "总共应该处理至少 50 条消息");
        // Clean up the subscription and container.
        subscription.cancel();
        container.stop();
    }

    /**
     * Producer utilities: stream length tracks batch publishes, and clearing
     * the stream resets the length to zero.
     */
    @Test
    void testStreamLengthAndClear() {
        // setUp() cleared the stream, so the initial length is 0.
        assertEquals(0, messageProducerService.getStreamLength());
        // Publishing 10 messages grows the stream accordingly.
        messageProducerService.sendBatchMessages(10);
        assertEquals(10, messageProducerService.getStreamLength());
        // Clearing empties it again.
        messageProducerService.clearStream();
        assertEquals(0, messageProducerService.getStreamLength());
    }
}

View File

@ -0,0 +1,101 @@
package com.example.service;
import com.example.config.RedisStreamProperties;
import com.example.model.Message;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.TestPropertySource;
import static org.junit.jupiter.api.Assertions.*;
/**
 * Tests for {@code MessageProducerService}: single and batch publishing,
 * stream-length reporting, and stream clearing.
 *
 * <p>NOTE(review): runs against a real Redis at localhost:6379.
 */
@SpringBootTest
@TestPropertySource(properties = {
        "spring.redis.host=localhost",
        "spring.redis.port=6379"
})
class MessageProducerServiceTest {

    @Autowired
    private MessageProducerService messageProducerService;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private RedisStreamProperties redisStreamProperties;

    /** Every test starts from an empty stream. */
    @BeforeEach
    void setUp() {
        messageProducerService.clearStream();
    }

    /** Publishing one message yields a record id and leaves the payload fields intact. */
    @Test
    void testSendSingleMessage() {
        Message payload = new Message("测试消息内容", "TEST", "test-sender");

        String recordId = messageProducerService.sendMessage(payload);

        // The send must return a record id and stamp an id on the message.
        assertNotNull(recordId);
        assertNotNull(payload.getId());
        // The payload fields are unchanged by the send.
        assertEquals("测试消息内容", payload.getContent());
        assertEquals("TEST", payload.getType());
        assertEquals("test-sender", payload.getSender());
        // Exactly one entry landed in the stream.
        assertEquals(1, messageProducerService.getStreamLength());
    }

    /** A batch of ten: reported count and stream length agree. */
    @Test
    void testSendBatchMessages() {
        int produced = messageProducerService.sendBatchMessages(10);

        assertEquals(10, produced);
        assertEquals(10, messageProducerService.getStreamLength());
    }

    /** Same contract holds at a larger batch size. */
    @Test
    void testSendLargeBatchMessages() {
        int produced = messageProducerService.sendBatchMessages(100);

        assertEquals(100, produced);
        assertEquals(100, messageProducerService.getStreamLength());
    }

    /** Clearing a populated stream empties it. */
    @Test
    void testClearStream() {
        messageProducerService.sendBatchMessages(5);
        assertEquals(5, messageProducerService.getStreamLength());

        messageProducerService.clearStream();
        assertEquals(0, messageProducerService.getStreamLength());
    }

    /** Stream length starts at zero and accumulates across batches. */
    @Test
    void testGetStreamLength() {
        assertEquals(0, messageProducerService.getStreamLength());

        messageProducerService.sendBatchMessages(3);
        assertEquals(3, messageProducerService.getStreamLength());

        messageProducerService.sendBatchMessages(2);
        assertEquals(5, messageProducerService.getStreamLength());
    }
}

40
test-simple.bat Normal file
View File

@ -0,0 +1,40 @@
@echo off
rem ------------------------------------------------------------------
rem End-to-end smoke test for the Redis Stream demo (Windows).
rem Starts the Spring Boot app in its own titled window, drives it over
rem the REST API with curl, then shuts down ONLY that window's process
rem tree. The original script ran "taskkill /f /im java.exe", which
rem killed every Java process on the machine, not just the demo app.
rem ------------------------------------------------------------------
echo Starting Redis Stream Demo Test...
echo.
echo 1. Starting Spring Boot Application...
rem The window title "Redis Stream App" is what the shutdown step keys on.
start "Redis Stream App" cmd /k "mvn spring-boot:run"
echo.
echo Waiting for application to start...
timeout /t 10 /nobreak > nul
echo.
echo 2. Testing Manual Ack Consumer with 100 messages...
curl -X POST "http://localhost:8080/api/stream/send-batch?count=100"
echo.
echo Waiting for messages to be sent...
timeout /t 3 /nobreak > nul
echo.
echo 3. Processing messages with Manual Ack...
curl -X POST "http://localhost:8080/api/stream/consume-manual?count=100"
echo.
echo Waiting for processing...
timeout /t 5 /nobreak > nul
echo.
echo 4. Checking Stream Info...
curl http://localhost:8080/api/stream/info
echo.
echo 5. Testing completed! Check the application logs for details.
echo Press any key to close the application...
pause > nul
echo.
echo Stopping application...
rem Kill only the window we started (and its child processes: cmd -> mvn
rem -> java), leaving unrelated java.exe processes untouched.
taskkill /f /t /fi "WINDOWTITLE eq Redis Stream App*" > nul 2>&1
echo Test completed!

42
test-simple.sh Normal file
View File

@ -0,0 +1,42 @@
#!/bin/bash
# Smoke test for the Redis Stream demo: boots the app in the background,
# drives it through the REST API with curl, then tears the app back down.

printf '%s\n\n' "Starting Redis Stream Demo Test..."

printf '%s\n' "1. Starting Spring Boot Application..."
mvn spring-boot:run &
APP_PID=$!

printf '\n%s\n' "Waiting for application to start..."
sleep 10

# Produce 100 messages through the REST API.
printf '\n%s\n' "2. Testing Manual Ack Consumer with 100 messages..."
curl -X POST 'http://localhost:8080/api/stream/send-batch?count=100'

printf '\n%s\n' "Waiting for messages to be sent..."
sleep 3

# Consume them in manual-ack mode.
printf '\n%s\n' "3. Processing messages with Manual Ack..."
curl -X POST 'http://localhost:8080/api/stream/consume-manual?count=100'

printf '\n%s\n' "Waiting for processing..."
sleep 5

printf '\n%s\n' "4. Checking Stream Info..."
curl 'http://localhost:8080/api/stream/info'

printf '\n%s\n' "5. Testing completed! Check the application logs for details."
printf '%s\n' "Press any key to close the application..."
read -n 1

printf '\n%s\n' "Stopping application..."
kill $APP_PID
printf '%s\n' "Test completed!"

41
test.bat Normal file
View File

@ -0,0 +1,41 @@
@echo off
rem Structure check, compile, and unit-test runner for the Redis Stream demo.

echo === Spring Boot Redis Stream Demo 测试脚本 ===

rem Step 1: required files must exist before anything else runs.
echo 1. 检查项目结构...
if not exist "pom.xml" (
    echo ❌ 未找到 pom.xml 文件
    exit /b 1
)
if not exist "src\main\java\com\example\RedisStreamApplication.java" (
    echo ❌ 未找到主启动类
    exit /b 1
)
echo ✅ 项目结构正常

rem Step 2: compile quietly; a non-zero exit code aborts the script.
echo 2. 编译项目...
call mvn clean compile -q
if errorlevel 1 (
    echo ❌ 项目编译失败
    exit /b 1
)
echo ✅ 项目编译成功

rem Step 3: run the unit tests.
echo 3. 运行单元测试...
call mvn test -q
if errorlevel 1 (
    echo ❌ 单元测试失败
    exit /b 1
)
echo ✅ 单元测试通过

echo.
echo === 测试完成 ===
echo.
echo 接下来可以:
echo 1. 启动应用: mvn spring-boot:run
echo 2. 发送消息: curl -X POST "http://localhost:8080/api/stream/send-batch?count=100"
echo 3. 查看信息: curl http://localhost:8080/api/stream/info
echo 4. 消费消息: curl -X POST "http://localhost:8080/api/stream/consume-manual?count=50"
echo.
echo 详细使用说明请查看 README.md
pause

47
test.sh Normal file
View File

@ -0,0 +1,47 @@
#!/bin/bash
# Redis Stream demo check script: verifies Redis is reachable, compiles
# the project, and runs the unit tests; prints follow-up commands on success.

echo "=== Spring Boot Redis Stream Demo 测试脚本 ==="

# Step 1: Redis must answer PING before anything else is attempted.
echo "1. 检查 Redis 连接..."
if redis-cli ping > /dev/null 2>&1; then
    echo "✅ Redis 连接正常"
else
    echo "❌ Redis 连接失败,请确保 Redis 服务正在运行"
    echo " 启动命令: docker run -d --name redis -p 6379:6379 redis:7-alpine"
    exit 1
fi

# Step 2: quiet compile; abort on failure.
echo "2. 编译项目..."
if mvn clean compile -q; then
    echo "✅ 项目编译成功"
else
    echo "❌ 项目编译失败"
    exit 1
fi

# Step 3: run the unit tests.
echo "3. 运行单元测试..."
if mvn test -q; then
    echo "✅ 单元测试通过"
else
    echo "❌ 单元测试失败"
    exit 1
fi

echo ""
echo "=== 测试完成 ==="
echo ""
echo "接下来可以:"
echo "1. 启动应用: mvn spring-boot:run"
echo "2. 发送消息: curl -X POST 'http://localhost:8080/api/stream/send-batch?count=100'"
echo "3. 查看信息: curl http://localhost:8080/api/stream/info"
echo "4. 消费消息: curl -X POST 'http://localhost:8080/api/stream/consume-manual?count=50'"
echo ""
echo "详细使用说明请查看 README.md"