diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..13566b8
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/.idea/misc.xml b/.idea/misc.xml
index bada8b5..be9407e 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -8,7 +8,7 @@
-
+
\ No newline at end of file
diff --git a/Redis_Stream_技术分析报告.md b/Redis_Stream_技术分析报告.md
new file mode 100644
index 0000000..4cc403d
--- /dev/null
+++ b/Redis_Stream_技术分析报告.md
@@ -0,0 +1,637 @@
+# Redis Stream 技术分析报告
+
+## 项目概述
+
+本项目是一个基于Spring Boot的Redis Stream消息队列系统,实现了两种消费者模式:StreamListener模式和Manual Ack模式。项目展示了Redis Stream在消息队列场景下的关键特性,包括消费者组管理、消息确认机制、以及高可用性保证。
+
+## 核心问题分析
+
+### 1. 消费者组宕机再上线,未ACK消息是否重复/丢失
+
+#### 1.1 问题分析
+
+**Redis Stream的消费者组机制保证了消息的可靠性:**
+
+1. **消息不会丢失**:未ACK的消息会保留在Redis Stream中,直到被明确确认
+2. **消息可能重复**:消费者重启后,会重新处理未ACK的消息
+
+#### 1.2 代码实现分析
+
+从`StreamListenerConsumerService.java`和`ManualAckConsumerService.java`的实现可以看出:
+
+```java
+// StreamListenerConsumerService.java 第268-279行
+private void acknowledgeMessage(String recordId) {
+    try {
+        redisTemplate.opsForStream().acknowledge(
+                redisStreamProperties.getKey(),
+                redisStreamProperties.getConsumerGroup(),
+                recordId
+        );
+        log.debug("StreamListener 消息已确认: recordId={}", recordId);
+    } catch (Exception e) {
+        log.error("StreamListener 确认消息失败: recordId={}, error={}", recordId, e.getMessage(), e);
+    }
+}
+```
+
+```java
+// ManualAckConsumerService.java 第305-317行
+private void acknowledgeMessage(String recordId) {
+    try {
+        redisTemplate.opsForStream().acknowledge(
+                redisStreamProperties.getKey(),
+                redisStreamProperties.getConsumerGroup(),
+                recordId
+        );
+        log.debug("Manual Ack 消息已确认: recordId={}", recordId);
+    } catch (Exception e) {
+        log.error("Manual Ack 确认消息失败: recordId={}, error={}", recordId, e.getMessage(), e);
+        throw new RuntimeException("消息确认失败: " + e.getMessage(), e);
+    }
+}
+```
+
+#### 1.3 消费者组恢复机制
+
+```java
+// ManualAckConsumerService.java 第58-170行
+private void ensureConsumerGroupExists() {
+    // 检查消费者组是否存在
+    Object groups = redisTemplate.opsForStream().groups(redisStreamProperties.getKey());
+    // 如果不存在,创建消费者组
+    if (!groupExists) {
+        redisTemplate.opsForStream().createGroup(
+                redisStreamProperties.getKey(),
+                ReadOffset.from("0"),
+                redisStreamProperties.getConsumerGroup()
+        );
+    }
+}
+```
+
+#### 1.4 结论
+
+- **消息不会丢失**:Redis Stream的持久化机制保证消息持久存储
+- **消息可能重复**:消费者重启后会重新处理未ACK的消息
+- **建议**:业务逻辑需要支持幂等性处理
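+
+针对上面"消息可能重复"的结论,下面给出一个消费端幂等处理的简化示意(示意代码,非项目现有实现;类名、去重键前缀与`processBusiness`方法均为假设,`redisTemplate`、`redisStreamProperties`沿用项目中已有字段):
+
+```java
+// 幂等消费示意:处理前用 SETNX 登记消息业务ID,处理成功后再 ACK
+@Slf4j
+@Service
+public class IdempotentMessageHandler {
+
+    @Autowired
+    private RedisTemplate<String, Object> redisTemplate;
+
+    @Autowired
+    private RedisStreamProperties redisStreamProperties;
+
+    public void handleRecord(MapRecord<String, Object, Object> message) {
+        // 以消息业务ID作为去重键(key 前缀为示意值)
+        String dedupKey = "message:processed:" + message.getValue().get("id");
+
+        // setIfAbsent 对应 Redis 的 SET key value NX EX,只有第一次处理会返回 true
+        Boolean firstTime = redisTemplate.opsForValue()
+                .setIfAbsent(dedupKey, "1", Duration.ofDays(1));
+
+        if (Boolean.TRUE.equals(firstTime)) {
+            try {
+                processBusiness(message.getValue());   // 假设的业务处理方法
+            } catch (Exception e) {
+                // 处理失败:释放去重键且不 ACK,等待该消息重新投递
+                redisTemplate.delete(dedupKey);
+                throw e;
+            }
+        } else {
+            log.info("消息已处理过,跳过重复投递: recordId={}", message.getId());
+        }
+
+        // 处理成功或确认为重复投递时才 ACK,消息不会丢失
+        redisTemplate.opsForStream().acknowledge(
+                redisStreamProperties.getKey(),
+                redisStreamProperties.getConsumerGroup(),
+                message.getId().getValue()
+        );
+    }
+
+    private void processBusiness(Map<Object, Object> payload) {
+        // 业务处理逻辑(示意)
+    }
+}
+```
+
+这种写法把"去重登记"与"业务处理"分离,重复投递只会命中去重键而不会重复执行业务;生产环境中也可以改用业务库的唯一约束实现同样的幂等效果。
+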
+### 2. 生产者端事务消息(MULTI/EXEC 或 Lua 脚本)能否保证"业务 DB 提交 + 消息入队"原子性
+
+#### 2.1 问题分析
+
+**项目已实现完整的事务消息机制**,支持多种事务模式来保证"业务 DB 提交 + 消息入队"的原子性。
+
+#### 2.2 已实现的方案
+
+##### 方案一:Redis MULTI/EXEC事务
+
+```java
+// TransactionalMessageProducerService.java 第42-81行
+public String sendMessageWithMultiExec(Message message) {
+    try {
+        // 生成消息ID
+        String messageId = UUID.randomUUID().toString();
+        message.setId(messageId);
+
+        // 将消息转换为Map
+        Map<String, Object> messageMap = convertMessageToMap(message);
+
+        // 使用MULTI/EXEC事务
+        Object result = redisTemplate.execute(new SessionCallback<Object>() {
+            @Override
+            public Object execute(org.springframework.data.redis.core.RedisOperations operations)
+                    throws org.springframework.dao.DataAccessException {
+                operations.multi();
+                try {
+                    // 在事务中执行Redis操作
+                    operations.opsForStream().add(redisStreamProperties.getKey(), messageMap);
+
+                    // 可以添加其他Redis操作,比如更新计数器、设置过期时间等
+                    operations.opsForValue().increment("message:count");
+                    operations.opsForValue().set("message:last:" + messageId, System.currentTimeMillis());
+
+                    // 提交事务
+                    return operations.exec();
+                } catch (Exception e) {
+                    operations.discard();
+                    throw e;
+                }
+            }
+        });
+
+        log.info("MULTI/EXEC事务消息发送成功: messageId={}, result={}", messageId, result);
+        return messageId;
+
+    } catch (Exception e) {
+        log.error("MULTI/EXEC事务消息发送失败: {}", e.getMessage(), e);
+        throw new RuntimeException("事务消息发送失败", e);
+    }
+}
+```
+
+##### 方案二:Lua脚本实现
+
+```java
+// LuaScriptTransactionalService.java 第40-62行
+private static final String BASIC_SEND_SCRIPT = """
+        local streamKey = KEYS[1]
+        local messageId = ARGV[1]
+        local content = ARGV[2]
+        local type = ARGV[3]
+        local sender = ARGV[4]
+        local timestamp = ARGV[5]
+
+        -- 发送消息到Stream
+        local recordId = redis.call('XADD', streamKey, '*',
+            'id', messageId,
+            'content', content,
+            'type', type,
+            'sender', sender,
+            'timestamp', timestamp
+        )
+
+        -- 更新统计信息
+        redis.call('INCR', 'message:count')
+        redis.call('SET', 'message:last:' .. messageId, timestamp)
+
+        return recordId
+        """;
+```
+
+##### 方案三:消息表模式(推荐)
+
+```java
+// MessageTableTransactionalService.java 第67-95行
+@Transactional
+public String sendTransactionalMessage(Message message, String businessData) {
+    String messageId = UUID.randomUUID().toString();
+    message.setId(messageId);
+
+    try {
+        // 1. 执行业务操作
+        log.info("执行业务操作: messageId={}, businessData={}", messageId, businessData);
+        jdbcTemplate.update(
+                "INSERT INTO business_data (id, data, status, created_at) VALUES (?, ?, ?, ?)",
+                messageId, businessData, "PROCESSING", LocalDateTime.now()
+        );
+
+        // 2. 
记录消息到消息表 + log.info("记录消息到消息表: messageId={}", messageId); + jdbcTemplate.update( + "INSERT INTO message_table (id, content, type, sender, status, retry_count, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + messageId, message.getContent(), message.getType(), message.getSender(), + MessageStatus.PENDING.name(), 0, LocalDateTime.now(), LocalDateTime.now() + ); + + log.info("事务消息记录成功: messageId={}", messageId); + return messageId; + + } catch (Exception e) { + log.error("事务消息记录失败: messageId={}, error={}", messageId, e.getMessage(), e); + throw new RuntimeException("事务消息记录失败", e); + } +} +``` + +##### 方案四:两阶段提交模式 + +```java +// TransactionalMessageProducerService.java 第210-260行 +public String sendMessageWithTwoPhaseCommit(Message message) { + String messageId = UUID.randomUUID().toString(); + message.setId(messageId); + + try { + // 第一阶段:准备阶段 + log.info("两阶段提交 - 准备阶段: messageId={}", messageId); + + // 在Redis中创建准备状态的消息 + Map prepareData = new HashMap<>(); + prepareData.put("id", messageId); + prepareData.put("content", message.getContent()); + prepareData.put("type", message.getType()); + prepareData.put("sender", message.getSender()); + prepareData.put("status", "PREPARED"); + prepareData.put("timestamp", LocalDateTime.now().toString()); + + // 发送到准备队列 + redisTemplate.opsForStream().add("message:prepare", prepareData); + + // 模拟业务处理 + Thread.sleep(100); + + // 第二阶段:提交阶段 + log.info("两阶段提交 - 提交阶段: messageId={}", messageId); + + // 发送到正式队列 + Map commitData = convertMessageToMap(message); + commitData.put("status", "COMMITTED"); + redisTemplate.opsForStream().add(redisStreamProperties.getKey(), commitData); + + log.info("两阶段提交事务消息发送成功: messageId={}", messageId); + return messageId; + + } catch (Exception e) { + log.error("两阶段提交事务消息发送失败: messageId={}, error={}", messageId, e.getMessage(), e); + throw new RuntimeException("两阶段提交事务消息发送失败", e); + } +} +``` + +#### 2.3 原子性保证分析 + +**MULTI/EXEC模式**: +- ✅ 保证Redis操作的原子性 +- ❌ 无法跨数据库事务 +- 适用场景:Redis内部操作需要原子性 + +**Lua脚本模式**: +- ✅ 保证Redis操作的原子性 +- ✅ 支持复杂业务逻辑 +- ✅ 性能高,减少网络往返 +- ❌ 无法跨数据库事务 + +**消息表模式**: +- ✅ 保证业务数据与消息记录的原子性 +- ✅ 支持消息重试和死信队列 +- ✅ 最强的一致性保证 +- ✅ 异步发送,提高性能 + +**两阶段提交模式**: +- ✅ 支持分布式事务 +- ✅ 支持回滚机制 +- ❌ 复杂度高,性能较低 + +#### 2.4 结论 + +- **项目现状**:已实现完整的事务消息机制,支持4种事务模式 +- **Redis MULTI/EXEC**:可以保证Redis操作的原子性,但无法跨数据库事务 +- **Lua脚本**:可以实现Redis内部的原子性操作,支持复杂逻辑 +- **消息表模式**:推荐使用,保证业务数据与消息的强一致性 +- **两阶段提交**:适用于分布式事务场景 + +### 3. 消息堆积上限与Redis内存配置关系 + +#### 3.1 Redis内存配置分析 + +从`application.yml`配置可以看出: + +```yaml +# application.yml 第8-19行 +spring: + redis: + host: localhost + port: 6379 + password: + database: 0 + timeout: 2000ms + lettuce: + pool: + max-active: 8 + max-wait: -1ms + max-idle: 8 + min-idle: 0 +``` + +#### 3.2 内存限制机制 + +Redis Stream的内存使用受以下因素影响: + +1. **maxmemory配置**:Redis最大内存限制 +2. **Stream长度**:消息数量 +3. **消息大小**:单个消息的字节数 +4. **消费者组数量**:每个消费者组维护独立的状态 + +#### 3.3 消息堆积计算 + +```java +// 建议的内存监控实现 +@Service +public class StreamMemoryMonitor { + + @Autowired + private RedisTemplate redisTemplate; + + public Map getStreamMemoryInfo() { + Map info = new HashMap<>(); + + // 1. 获取Stream长度 + Long streamLength = redisTemplate.opsForStream().size("message-stream"); + info.put("streamLength", streamLength); + + // 2. 获取Redis内存使用情况 + Properties memoryInfo = redisTemplate.getConnectionFactory() + .getConnection().info("memory"); + info.put("usedMemory", memoryInfo.getProperty("used_memory")); + info.put("maxMemory", memoryInfo.getProperty("maxmemory")); + + // 3. 
计算消息平均大小 + if (streamLength > 0) { + Long memoryUsage = Long.parseLong(memoryInfo.getProperty("used_memory")); + Double avgMessageSize = (double) memoryUsage / streamLength; + info.put("avgMessageSize", avgMessageSize); + } + + return info; + } +} +``` + +#### 3.4 内存优化策略 + +1. **设置maxmemory**:防止Redis内存溢出 +2. **配置淘汰策略**:LRU、LFU等 +3. **监控Stream长度**:设置告警阈值 +4. **消息TTL**:设置消息过期时间 + +#### 3.5 结论 + +- **消息堆积上限**:受Redis maxmemory配置限制 +- **建议配置**:设置合理的maxmemory和淘汰策略 +- **监控告警**:实时监控Stream长度和内存使用率 + +## 技术架构分析 + +### 1. 消费者模式对比 + +| 特性 | StreamListener模式 | Manual Ack模式 | +|------|-------------------|----------------| +| 实时性 | 高(事件驱动) | 中(轮询) | +| 控制精度 | 低(自动ACK) | 高(手动控制) | +| 性能 | 高 | 中 | +| 复杂度 | 低 | 高 | +| 适用场景 | 高并发实时处理 | 精确控制处理流程 | + +### 2. 关键代码实现 + +#### 2.1 StreamListener模式 + +```java +// StreamListenerConsumerService.java 第225-263行 +@Override +public void onMessage(MapRecord message) { + String recordId = null; + try { + messageCount.incrementAndGet(); + + // 获取消息数据 + Map messageData = message.getValue(); + recordId = message.getId().getValue(); + + // 处理消息 + processMessage(messageData); + + // 确认消息(ACK) + acknowledgeMessage(recordId); + + processedCount.incrementAndGet(); + } catch (Exception e) { + errorCount.incrementAndGet(); + // 处理失败时也要确认消息,避免重复处理 + if (recordId != null) { + acknowledgeMessage(recordId); + } + } +} +``` + +#### 2.2 Manual Ack模式 + +```java +// ManualAckConsumerService.java 第175-226行 +public void pollAndProcessMessages() { + if (isProcessing) { + log.warn("⚠️ Manual Ack 正在处理中,跳过本次拉取"); + return; + } + + isProcessing = true; + try { + // 确保消费者组存在 + ensureConsumerGroupExists(); + + // 从 Stream 中读取消息 + List> messages = redisTemplate.opsForStream() + .read(Consumer.from(redisStreamProperties.getConsumerGroup(), redisStreamProperties.getConsumerName()), + StreamOffset.create(redisStreamProperties.getKey(), ReadOffset.lastConsumed())); + + for (MapRecord message : messages) { + try { + processMessage(message); + processedCount.incrementAndGet(); + } catch (Exception e) { + errorCount.incrementAndGet(); + } + } + } finally { + isProcessing = false; + } +} +``` + +### 3. 配置管理 + +#### 3.1 属性配置 + +```java +// RedisStreamProperties.java 第13-67行 +@Data +@Configuration +@ConfigurationProperties(prefix = "redis.stream") +public class RedisStreamProperties { + private String key = "message-stream"; + private String consumerGroup = "message-consumer-group"; + private String consumerName = "message-consumer"; + private StreamListenerConfig streamListener = new StreamListenerConfig(); + + @Data + public static class StreamListenerConfig { + private boolean autoStart = false; + private boolean processHistoricalMessages = true; + private int pollTimeout = 1; + private int corePoolSize = 2; + private int maxPoolSize = 4; + private int keepAliveTime = 60; + } +} +``` + +#### 3.2 线程池配置 + +```java +// RedisStreamConfig.java 第64-78行 +@Bean +public StreamMessageListenerContainer> streamMessageListenerContainer() { + // 创建线程池 + ThreadPoolExecutor executor = new ThreadPoolExecutor( + 2, 4, 60, TimeUnit.SECONDS, + new LinkedBlockingDeque<>(), + r -> new Thread(r, "redis-stream-consumer-" + System.currentTimeMillis()) + ); + + // 创建监听容器配置 + StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = + StreamMessageListenerContainer.StreamMessageListenerContainerOptions + .builder() + .pollTimeout(Duration.ofSeconds(1)) + .executor(executor) + .build(); + + return StreamMessageListenerContainer.create(redisConnectionFactory, options); +} +``` + +## 测试验证 + +### 1. 
集成测试 + +项目包含完整的集成测试: + +```java +// RedisStreamIntegrationTest.java 第85-120行 +@Test +void testStreamListenerEndToEnd() throws InterruptedException { + // 启动 StreamListener + container.start(); + + // 订阅 Stream + Subscription subscription = container.receive( + Consumer.from(redisStreamProperties.getConsumerGroup(), streamListenerConsumerName), + StreamOffset.create(redisStreamProperties.getKey(), ReadOffset.from(">")), + streamListenerConsumerService + ); + + // 发送 100 条消息 + int sentCount = messageProducerService.sendBatchMessages(100); + assertEquals(100, sentCount); + + // 等待消息处理完成 + Thread.sleep(20000); + + // 验证消息统计 + Map stats = streamListenerConsumerService.getMessageStats(); + assertTrue((Long) stats.get("totalReceived") >= 100); + assertTrue((Long) stats.get("totalProcessed") >= 100); + assertEquals(0L, stats.get("totalErrors")); +} +``` + +### 2. 并发测试 + +```java +// RedisStreamIntegrationTest.java 第164-215行 +@Test +void testConcurrentProductionAndConsumption() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(2); + + // 启动 StreamListener + container.start(); + + // 并发发送消息 + Thread producerThread = new Thread(() -> { + try { + messageProducerService.sendBatchMessages(50); + } finally { + latch.countDown(); + } + }); + + // 并发处理消息 + Thread consumerThread = new Thread(() -> { + try { + manualAckConsumerService.batchProcessMessages(50); + } finally { + latch.countDown(); + } + }); + + producerThread.start(); + consumerThread.start(); + + // 等待完成 + assertTrue(latch.await(30, TimeUnit.SECONDS)); +} +``` + +## 总结与建议 + +### 1. 项目优势 + +1. **双模式支持**:同时提供`StreamListener`与`Manual Ack`两种消费模式 +2. **事务消息完备**:覆盖`MULTI/EXEC`、`Lua脚本`、`消息表`、`两阶段提交`四种方案 +3. **测试充分**:端到端集成测试与并发测试覆盖关键路径 +4. **配置灵活**:可调的线程池、拉取策略与消费者组参数 +5. **可运维性较好**:基础监控指标与异常处理、重试机制已具雏形 + +### 2. 改进建议 + +1. **默认采用消息表模式**:将“消息表”设为默认事务消息落地方案,提供重试、补偿、死信与可观测性;`MULTI/EXEC`与`Lua`作为轻量场景可选项。 +2. **Pending管理与幂等性**:完善`XPENDING`巡检、超时转移与再分配;在业务侧统一幂等键设计,避免重复投递与重复消费副作用。 +3. **死信与重试策略参数化**:落库可追踪,失败按阶梯回退(指数退避+抖动),达到阈值进入DLQ,并提供人工恢复通道。 +4. **内存与长度治理**:结合`XTRIM`(基于长度/时间)与消息字段瘦身;对大消息启用压缩或外部存储(仅存指针)。 +5. **分片与扩展性**:按业务键(如订单ID)进行多`Stream`分区;消费者组按分区扩展并支持水平扩容与重平衡。 +6. **监控可观测性**:补充生产/消费QPS、Lag、Ack时延、P90/P99、PENDING大小、重试/丢弃计数的指标与可视化仪表板。 +7. **批量与流水线优化**:生产端`XADD`合并与管线化,消费端批量读取与批量`XACK`;连接池参数基于压测调优。 + +### 3. 生产环境建议 + +1. **容量与持久化**:设置合理`maxmemory`与淘汰策略;AOF 建议`everysec`并配合`RDB`定期快照,校验恢复时间目标(RTO/RPO)。 +2. **Stream治理策略**:为关键`Stream`启用`XTRIM`策略与保留窗口;设置长度/Lag/内存使用率告警阈值与自愈流程。 +3. **集群与高可用**:优先采用Redis Cluster/主从+哨兵或托管服务;为跨可用区部署设计故障演练与自动故障转移。 +4. **限流与隔离**:对生产/消费端设置限流与熔断;按业务域拆分实例或命名空间,避免相互影响。 +5. **安全与合规**:开启密码、TLS与网络访问控制;对敏感字段加密/脱敏,审计关键操作。 +6. **变更与演练**:灰度发布消费者;定期演练消费者宕机恢复、PENDING迁移、DLQ回放与回溯恢复流程。 + +## 附录 + +### A. 关键配置参数 + +```yaml +# Redis连接配置 +spring: + redis: + host: localhost + port: 6379 + timeout: 2000ms + lettuce: + pool: + max-active: 8 + max-idle: 8 + +# Stream配置 +redis: + stream: + key: "message-stream" + consumer-group: "message-consumer-group" + consumer-name: "message-consumer" +``` + +### B. 监控指标 + +1. **Stream长度**:`XLEN message-stream` +2. **消费者组信息**:`XINFO GROUPS message-stream` +3. **待处理消息**:`XPENDING message-stream message-consumer-group` +4. **Redis内存**:`INFO memory` + +### C. 
常用Redis命令 + +```bash +# 查看Stream信息 +XINFO STREAM message-stream + +# 查看消费者组 +XINFO GROUPS message-stream + +# 查看待处理消息 +XPENDING message-stream message-consumer-group + +# 手动确认消息 +XACK message-stream message-consumer-group message-id +``` + +--- + +**报告生成时间**:2025年 +**项目版本**:Spring Boot Redis Stream Demo +**分析范围**:消费者组恢复、事务消息、内存配置 diff --git a/out/production/spring-boot-starter-data-redis/application-test.yml b/out/production/spring-boot-starter-data-redis/application-test.yml new file mode 100644 index 0000000..105c810 --- /dev/null +++ b/out/production/spring-boot-starter-data-redis/application-test.yml @@ -0,0 +1,42 @@ +server: + port: 8080 + +spring: + application: + name: redis-stream-demo + + redis: + host: localhost + port: 6379 + password: + database: 0 + timeout: 10000ms # 增加超时时间到10秒 + lettuce: + pool: + max-active: 8 + max-wait: 5000ms # 设置合理的等待时间 + max-idle: 8 + min-idle: 0 + +# Redis Stream Configuration +redis: + stream: + key: "message-stream" + consumer-group: "message-consumer-group" + consumer-name: "message-consumer" + + # 消费者模式配置 + consumer: + # 默认模式: stream-listener, manual-ack, both + default-mode: "both" + # 是否启用 StreamListener 模式 + stream-listener-enabled: false + # 是否启用 Manual Ack 模式 + manual-ack-enabled: false + +# Logging Configuration +logging: + level: + com.example: DEBUG + org.springframework.data.redis: DEBUG + io.lettuce: DEBUG diff --git a/out/production/spring-boot-starter-data-redis/application.yml b/out/production/spring-boot-starter-data-redis/application.yml new file mode 100644 index 0000000..88c2d5c --- /dev/null +++ b/out/production/spring-boot-starter-data-redis/application.yml @@ -0,0 +1,126 @@ +server: + port: 8080 + +spring: + application: + name: redis-stream-demo + + # 数据库配置 + datasource: + url: jdbc:h2:mem:testdb + driver-class-name: org.h2.Driver + username: sa + password: + hikari: + maximum-pool-size: 10 + minimum-idle: 5 + connection-timeout: 30000 + idle-timeout: 600000 + max-lifetime: 1800000 + + # H2数据库控制台 + h2: + console: + enabled: true + path: /h2-console + + # JPA配置 + jpa: + hibernate: + ddl-auto: create-drop + show-sql: true + properties: + hibernate: + format_sql: true + + # 异步任务配置 + task: + execution: + pool: + core-size: 5 + max-size: 10 + queue-capacity: 100 + keep-alive: 60s + scheduling: + pool: + size: 5 + + redis: + host: localhost + port: 6379 + password: + database: 0 + timeout: 2000ms + lettuce: + pool: + max-active: 8 + max-wait: -1ms + max-idle: 8 + min-idle: 0 + +# Redis Stream Configuration +redis: + stream: + key: "message-stream" + consumer-group: "message-consumer-group" + consumer-name: "message-consumer" + + # 事务消息配置 + transactional: + # 消息表模式配置 + message-table: + # 是否启用消息表模式事务消息服务(默认关闭) + enabled: false + # 是否启用定时任务处理待发送消息(默认关闭) + scheduled-processing-enabled: false + # 是否启用定时任务重试失败消息(默认关闭) + scheduled-retry-enabled: false + + # 消费者模式配置 + consumer: + # 默认模式: stream-listener, manual-ack, both + default-mode: "both" + # 是否启用 StreamListener 模式 + stream-listener-enabled: true + # 是否启用 Manual Ack 模式 + manual-ack-enabled: true + + # StreamListener 配置 + stream-listener: + # 是否自动启动 + auto-start: false + # 是否处理历史消息(true: 从开头读取所有消息,false: 只读取新消息) + process-historical-messages: true + # 轮询超时时间(秒) + poll-timeout: 1 + # 线程池核心线程数 + core-pool-size: 2 + # 线程池最大线程数 + max-pool-size: 4 + # 线程空闲时间(秒) + keep-alive-time: 60 + + # Manual Ack 配置 + manual-ack: + # 默认批量大小 + default-batch-size: 10 + # 最大批量大小 + max-batch-size: 100 + # 轮询间隔(毫秒) + poll-interval: 1000 + # 是否启用并发处理 + concurrent-processing: false + # 最大并发数 + max-concurrency: 5 
+ +# Application Configuration +app: + # 是否在启动时运行事务消息示例(默认关闭) + run-examples-on-startup: false + +# Logging Configuration +logging: + level: + com.example: DEBUG + org.springframework.data.redis: DEBUG + io.lettuce: DEBUG diff --git a/out/production/spring-boot-starter-data-redis/sql/schema.sql b/out/production/spring-boot-starter-data-redis/sql/schema.sql new file mode 100644 index 0000000..6e111d7 --- /dev/null +++ b/out/production/spring-boot-starter-data-redis/sql/schema.sql @@ -0,0 +1,56 @@ +-- 消息表模式事务消息数据库表结构 + +-- 业务数据表 +CREATE TABLE IF NOT EXISTS business_data ( + id VARCHAR(36) PRIMARY KEY COMMENT '业务数据ID', + data TEXT NOT NULL COMMENT '业务数据内容', + status VARCHAR(20) NOT NULL DEFAULT 'PENDING' COMMENT '业务数据状态', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + INDEX idx_status (status), + INDEX idx_created_at (created_at) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='业务数据表'; + +-- 消息表 +CREATE TABLE IF NOT EXISTS message_table ( + id VARCHAR(36) PRIMARY KEY COMMENT '消息ID', + content TEXT NOT NULL COMMENT '消息内容', + type VARCHAR(50) NOT NULL COMMENT '消息类型', + sender VARCHAR(100) NOT NULL COMMENT '发送者', + status VARCHAR(20) NOT NULL DEFAULT 'PENDING' COMMENT '消息状态', + retry_count INT NOT NULL DEFAULT 0 COMMENT '重试次数', + error_message TEXT COMMENT '错误信息', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + INDEX idx_status (status), + INDEX idx_created_at (created_at), + INDEX idx_updated_at (updated_at), + INDEX idx_retry_count (retry_count) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息表'; + +-- 死信队列表 +CREATE TABLE IF NOT EXISTS dead_letter_queue ( + id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '死信ID', + message_id VARCHAR(36) NOT NULL COMMENT '原始消息ID', + content TEXT NOT NULL COMMENT '消息内容', + type VARCHAR(50) NOT NULL COMMENT '消息类型', + sender VARCHAR(100) NOT NULL COMMENT '发送者', + error_message TEXT COMMENT '错误信息', + retry_count INT NOT NULL DEFAULT 0 COMMENT '重试次数', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + INDEX idx_message_id (message_id), + INDEX idx_created_at (created_at) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='死信队列表'; + +-- 消息发送日志表 +CREATE TABLE IF NOT EXISTS message_send_log ( + id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '日志ID', + message_id VARCHAR(36) NOT NULL COMMENT '消息ID', + action VARCHAR(50) NOT NULL COMMENT '操作类型', + status VARCHAR(20) NOT NULL COMMENT '操作状态', + error_message TEXT COMMENT '错误信息', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + INDEX idx_message_id (message_id), + INDEX idx_created_at (created_at) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息发送日志表'; + diff --git a/pom.xml b/pom.xml index 02d9857..e3e8cde 100644 --- a/pom.xml +++ b/pom.xml @@ -21,9 +21,9 @@ - 8 - 8 - 8 + 17 + 17 + 17 UTF-8 @@ -46,6 +46,25 @@ spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-data-jpa + + + + + com.h2database + h2 + runtime + + + + + org.springframework.boot + spring-boot-starter-actuator + + org.springframework.boot @@ -90,6 +109,15 @@ + + org.apache.maven.plugins + maven-compiler-plugin + 3.11.0 + + 17 + UTF-8 + + org.springframework.boot spring-boot-maven-plugin diff --git a/src/main/java/com/example/RedisStreamApplication.java b/src/main/java/com/example/RedisStreamApplication.java index b2a86f1..75d90c9 100644 --- 
a/src/main/java/com/example/RedisStreamApplication.java +++ b/src/main/java/com/example/RedisStreamApplication.java @@ -1,15 +1,59 @@ package com.example; +import com.example.example.TransactionalMessageUsageExample; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; /** - * Spring Boot Redis Stream 应用启动类 + * Redis Stream 事务消息应用启动类 */ +@Slf4j @SpringBootApplication -public class RedisStreamApplication { +@EnableAsync +@EnableScheduling +@RequiredArgsConstructor +public class RedisStreamApplication implements CommandLineRunner { + + private final TransactionalMessageUsageExample transactionalMessageUsageExample; public static void main(String[] args) { SpringApplication.run(RedisStreamApplication.class, args); } -} + + @Override + public void run(String... args) throws Exception { + log.info("🚀 Redis Stream 事务消息应用启动完成!"); + log.info("📖 访问以下地址查看API文档:"); + log.info(" - H2数据库控制台: http://localhost:8080/h2-console"); + log.info(" - 健康检查: http://localhost:8080/actuator/health"); + log.info(" - 应用信息: http://localhost:8080/actuator/info"); + + log.info("🔧 事务消息API端点:"); + log.info(" - MULTI/EXEC事务消息: POST /api/transactional-message/multiexec"); + log.info(" - Lua脚本事务消息: POST /api/transactional-message/lua"); + log.info(" - 消息表模式事务消息: POST /api/transactional-message/message-table"); + log.info(" - 两阶段提交事务消息: POST /api/transactional-message/two-phase"); + log.info(" - 批量事务消息: POST /api/transactional-message/batch"); + log.info(" - 条件Lua脚本消息: POST /api/transactional-message/lua/conditional"); + log.info(" - 批量Lua脚本消息: POST /api/transactional-message/lua/batch"); + log.info(" - 事务Lua脚本消息: POST /api/transactional-message/lua/transactional"); + log.info(" - 消息表模式批量发送: POST /api/transactional-message/message-table/batch"); + log.info(" - 手动重试消息: POST /api/transactional-message/retry/{messageId}"); + log.info(" - 获取统计信息: GET /api/transactional-message/stats"); + log.info(" - 获取Lua脚本统计: GET /api/transactional-message/lua/stats"); + log.info(" - 获取消息表统计: GET /api/transactional-message/message-table/stats"); + log.info(" - 清理业务数据: DELETE /api/transactional-message/lua/cleanup"); + log.info(" - 清理死信消息: DELETE /api/transactional-message/message-table/cleanup-dead-letter"); + + // 运行事务消息示例 + log.info("🎯 开始运行事务消息示例..."); + transactionalMessageUsageExample.runAllExamples(); + + log.info("✅ 应用启动完成,所有功能已就绪!"); + } +} \ No newline at end of file diff --git a/src/main/java/com/example/config/AsyncConfig.java b/src/main/java/com/example/config/AsyncConfig.java new file mode 100644 index 0000000..a65ba7c --- /dev/null +++ b/src/main/java/com/example/config/AsyncConfig.java @@ -0,0 +1,62 @@ +package com.example.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 异步任务配置类 + */ +@Slf4j +@Configuration +@EnableAsync +@EnableScheduling +public class AsyncConfig { + + /** + * 异步任务执行器 + */ + 
@Bean("taskExecutor") + public Executor taskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(100); + executor.setKeepAliveSeconds(60); + executor.setThreadNamePrefix("AsyncTask-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(60); + executor.initialize(); + + log.info("异步任务执行器已配置: corePoolSize=5, maxPoolSize=10, queueCapacity=100"); + return executor; + } + + /** + * 定时任务执行器 + */ + @Bean("schedulingExecutor") + public Executor schedulingExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(3); + executor.setMaxPoolSize(5); + executor.setQueueCapacity(50); + executor.setKeepAliveSeconds(60); + executor.setThreadNamePrefix("Scheduling-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(60); + executor.initialize(); + + log.info("定时任务执行器已配置: corePoolSize=3, maxPoolSize=5, queueCapacity=50"); + return executor; + } +} + diff --git a/src/main/java/com/example/config/RedisStreamProperties.java b/src/main/java/com/example/config/RedisStreamProperties.java index 86b1d9a..dd8c02c 100644 --- a/src/main/java/com/example/config/RedisStreamProperties.java +++ b/src/main/java/com/example/config/RedisStreamProperties.java @@ -32,6 +32,11 @@ public class RedisStreamProperties { */ private StreamListenerConfig streamListener = new StreamListenerConfig(); + /** + * 事务消息配置 + */ + private TransactionalConfig transactional = new TransactionalConfig(); + @Data public static class StreamListenerConfig { /** @@ -64,4 +69,30 @@ public class RedisStreamProperties { */ private int keepAliveTime = 60; } + + @Data + public static class TransactionalConfig { + /** + * 消息表模式配置 + */ + private MessageTableConfig messageTable = new MessageTableConfig(); + } + + @Data + public static class MessageTableConfig { + /** + * 是否启用消息表模式事务消息服务 + */ + private boolean enabled = false; + + /** + * 是否启用定时任务处理待发送消息 + */ + private boolean scheduledProcessingEnabled = false; + + /** + * 是否启用定时任务重试失败消息 + */ + private boolean scheduledRetryEnabled = false; + } } diff --git a/src/main/java/com/example/controller/TransactionalMessageController.java b/src/main/java/com/example/controller/TransactionalMessageController.java new file mode 100644 index 0000000..51fd70d --- /dev/null +++ b/src/main/java/com/example/controller/TransactionalMessageController.java @@ -0,0 +1,533 @@ +package com.example.controller; + +import com.example.model.Message; +import com.example.service.LuaScriptTransactionalService; +import com.example.service.MessageTableTransactionalService; +import com.example.service.TransactionalMessageProducerService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * 事务消息控制器 + * + * 提供各种事务消息模式的REST API接口 + */ +@Slf4j +@RestController +@RequestMapping("/api/transactional-message") +public class TransactionalMessageController { + + private final 
TransactionalMessageProducerService transactionalMessageProducerService; + private final LuaScriptTransactionalService luaScriptTransactionalService; + + @Autowired(required = false) + private MessageTableTransactionalService messageTableTransactionalService; + + public TransactionalMessageController(TransactionalMessageProducerService transactionalMessageProducerService, + LuaScriptTransactionalService luaScriptTransactionalService) { + this.transactionalMessageProducerService = transactionalMessageProducerService; + this.luaScriptTransactionalService = luaScriptTransactionalService; + } + + /** + * 发送MULTI/EXEC事务消息 + */ + @PostMapping("/multiexec") + public ResponseEntity> sendMultiExecMessage(@RequestBody MessageRequest request) { + try { + Message message = createMessage(request); + String messageId = transactionalMessageProducerService.sendMessageWithMultiExec(message); + + Map response = new HashMap<>(); + response.put("success", true); + response.put("messageId", messageId); + response.put("message", "MULTI/EXEC事务消息发送成功"); + + return ResponseEntity.ok(response); + + } catch (Exception e) { + log.error("MULTI/EXEC事务消息发送失败", e); + return ResponseEntity.internalServerError().body(createErrorResponse(e.getMessage())); + } + } + + /** + * 发送Lua脚本事务消息 + */ + @PostMapping("/lua") + public ResponseEntity> sendLuaScriptMessage(@RequestBody MessageRequest request) { + try { + Message message = createMessage(request); + String messageId = transactionalMessageProducerService.sendMessageWithLuaScript(message); + + Map response = new HashMap<>(); + response.put("success", true); + response.put("messageId", messageId); + response.put("message", "Lua脚本事务消息发送成功"); + + return ResponseEntity.ok(response); + + } catch (Exception e) { + log.error("Lua脚本事务消息发送失败", e); + return ResponseEntity.internalServerError().body(createErrorResponse(e.getMessage())); + } + } + + /** + * 发送消息表模式事务消息 + */ + @PostMapping("/message-table") + public ResponseEntity> sendMessageTableMessage(@RequestBody MessageTableRequest request) { + try { + Message message = createMessage(request); + String messageId = transactionalMessageProducerService.sendMessageWithMessageTable( + message, request.getBusinessData() + ); + + Map response = new HashMap<>(); + response.put("success", true); + response.put("messageId", messageId); + response.put("message", "消息表模式事务消息发送成功"); + + return ResponseEntity.ok(response); + + } catch (Exception e) { + log.error("消息表模式事务消息发送失败", e); + return ResponseEntity.internalServerError().body(createErrorResponse(e.getMessage())); + } + } + + /** + * 发送两阶段提交事务消息 + */ + @PostMapping("/two-phase") + public ResponseEntity> sendTwoPhaseCommitMessage(@RequestBody MessageRequest request) { + try { + Message message = createMessage(request); + String messageId = transactionalMessageProducerService.sendMessageWithTwoPhaseCommit(message); + + Map response = new HashMap<>(); + response.put("success", true); + response.put("messageId", messageId); + response.put("message", "两阶段提交事务消息发送成功"); + + return ResponseEntity.ok(response); + + } catch (Exception e) { + log.error("两阶段提交事务消息发送失败", e); + return ResponseEntity.internalServerError().body(createErrorResponse(e.getMessage())); + } + } + + /** + * 批量发送事务消息 + */ + @PostMapping("/batch") + public ResponseEntity> sendBatchTransactionalMessages( + @RequestBody BatchMessageRequest request) { + try { + int successCount = transactionalMessageProducerService.sendBatchTransactionalMessages( + request.getCount(), request.getTransactionType() + ); + + Map response = new 
HashMap<>(); + response.put("success", true); + response.put("successCount", successCount); + response.put("totalCount", request.getCount()); + response.put("message", "批量事务消息发送完成"); + + return ResponseEntity.ok(response); + + } catch (Exception e) { + log.error("批量事务消息发送失败", e); + return ResponseEntity.internalServerError().body(createErrorResponse(e.getMessage())); + } + } + + /** + * 基础Lua脚本发送消息 + */ + @PostMapping("/lua/basic") + public ResponseEntity> sendBasicLuaScriptMessage(@RequestBody MessageRequest request) { + try { + Message message = createMessage(request); + String messageId = luaScriptTransactionalService.sendMessageWithBasicScript(message); + + Map response = new HashMap<>(); + response.put("success", true); + response.put("messageId", messageId); + response.put("message", "基础Lua脚本消息发送成功"); + + return ResponseEntity.ok(response); + + } catch (Exception e) { + log.error("基础Lua脚本消息发送失败", e); + return ResponseEntity.internalServerError().body(createErrorResponse(e.getMessage())); + } + } + + /** + * 条件Lua脚本发送消息 + */ + @PostMapping("/lua/conditional") + public ResponseEntity> sendConditionalLuaScriptMessage( + @RequestBody ConditionalMessageRequest request) { + try { + Message message = createMessage(request); + String messageId = luaScriptTransactionalService.sendMessageWithCondition( + message, request.getConditionKey(), request.getExpectedValue() + ); + + Map response = new HashMap<>(); + if (messageId != null) { + response.put("success", true); + response.put("messageId", messageId); + response.put("message", "条件Lua脚本消息发送成功"); + } else { + response.put("success", false); + response.put("message", "条件不满足,消息未发送"); + } + + return ResponseEntity.ok(response); + + } catch (Exception e) { + log.error("条件Lua脚本消息发送失败", e); + return ResponseEntity.internalServerError().body(createErrorResponse(e.getMessage())); + } + } + + /** + * 批量Lua脚本发送消息 + */ + @PostMapping("/lua/batch") + public ResponseEntity> sendBatchLuaScriptMessage(@RequestBody BatchLuaMessageRequest request) { + try { + List messages = request.getMessages().stream() + .map(this::createMessage) + .collect(Collectors.toList()); + + List results = luaScriptTransactionalService.sendBatchMessagesWithScript(messages); + + Map response = new HashMap<>(); + response.put("success", true); + response.put("results", results); + response.put("count", results.size()); + response.put("message", "批量Lua脚本消息发送成功"); + + return ResponseEntity.ok(response); + + } catch (Exception e) { + log.error("批量Lua脚本消息发送失败", e); + return ResponseEntity.internalServerError().body(createErrorResponse(e.getMessage())); + } + } + + /** + * 事务Lua脚本发送消息 + */ + @PostMapping("/lua/transactional") + public ResponseEntity> sendTransactionalLuaScriptMessage( + @RequestBody TransactionalLuaMessageRequest request) { + try { + Message message = createMessage(request); + String messageId = luaScriptTransactionalService.sendTransactionalMessage( + message, request.getBusinessData() + ); + + Map response = new HashMap<>(); + response.put("success", true); + response.put("messageId", messageId); + response.put("message", "事务Lua脚本消息发送成功"); + + return ResponseEntity.ok(response); + + } catch (Exception e) { + log.error("事务Lua脚本消息发送失败", e); + return ResponseEntity.internalServerError().body(createErrorResponse(e.getMessage())); + } + } + + /** + * 消息表模式批量发送 + */ + @PostMapping("/message-table/batch") + public ResponseEntity> sendBatchMessageTableMessage( + @RequestBody BatchMessageTableRequest request) { + try { + if (messageTableTransactionalService == null) { + Map 
response = new HashMap<>(); + response.put("success", false); + response.put("message", "消息表模式事务消息服务未启用"); + return ResponseEntity.badRequest().body(response); + } + + List messages = request.getMessages().stream() + .map(this::createMessage) + .collect(Collectors.toList()); + + List messageIds = messageTableTransactionalService.sendBatchTransactionalMessages( + messages, request.getBusinessDataList() + ); + + Map response = new HashMap<>(); + response.put("success", true); + response.put("messageIds", messageIds); + response.put("count", messageIds.size()); + response.put("message", "消息表模式批量发送成功"); + + return ResponseEntity.ok(response); + + } catch (Exception e) { + log.error("消息表模式批量发送失败", e); + return ResponseEntity.internalServerError().body(createErrorResponse(e.getMessage())); + } + } + + /** + * 手动重试消息 + */ + @PostMapping("/retry/{messageId}") + public ResponseEntity> retryMessage(@PathVariable String messageId) { + try { + if (messageTableTransactionalService == null) { + Map response = new HashMap<>(); + response.put("success", false); + response.put("message", "消息表模式事务消息服务未启用"); + return ResponseEntity.badRequest().body(response); + } + + boolean success = messageTableTransactionalService.retryMessage(messageId); + + Map response = new HashMap<>(); + response.put("success", success); + response.put("messageId", messageId); + response.put("message", success ? "消息重试成功" : "消息重试失败"); + + return ResponseEntity.ok(response); + + } catch (Exception e) { + log.error("消息重试失败", e); + return ResponseEntity.internalServerError().body(createErrorResponse(e.getMessage())); + } + } + + /** + * 获取事务消息统计信息 + */ + @GetMapping("/stats") + public ResponseEntity> getTransactionalMessageStats() { + try { + Map stats = transactionalMessageProducerService.getTransactionalMessageStats(); + return ResponseEntity.ok(stats); + + } catch (Exception e) { + log.error("获取事务消息统计信息失败", e); + return ResponseEntity.internalServerError().body(createErrorResponse(e.getMessage())); + } + } + + /** + * 获取Lua脚本统计信息 + */ + @GetMapping("/lua/stats") + public ResponseEntity> getLuaScriptStats() { + try { + Map stats = luaScriptTransactionalService.getLuaScriptStats(); + return ResponseEntity.ok(stats); + + } catch (Exception e) { + log.error("获取Lua脚本统计信息失败", e); + return ResponseEntity.internalServerError().body(createErrorResponse(e.getMessage())); + } + } + + /** + * 获取消息表统计信息 + */ + @GetMapping("/message-table/stats") + public ResponseEntity> getMessageTableStats() { + try { + if (messageTableTransactionalService == null) { + Map response = new HashMap<>(); + response.put("success", false); + response.put("message", "消息表模式事务消息服务未启用"); + return ResponseEntity.badRequest().body(response); + } + + Map stats = messageTableTransactionalService.getMessageTableStats(); + return ResponseEntity.ok(stats); + + } catch (Exception e) { + log.error("获取消息表统计信息失败", e); + return ResponseEntity.internalServerError().body(createErrorResponse(e.getMessage())); + } + } + + /** + * 清理业务数据 + */ + @DeleteMapping("/lua/cleanup") + public ResponseEntity> cleanupBusinessData() { + try { + luaScriptTransactionalService.cleanupBusinessData(); + + Map response = new HashMap<>(); + response.put("success", true); + response.put("message", "业务数据清理完成"); + + return ResponseEntity.ok(response); + + } catch (Exception e) { + log.error("清理业务数据失败", e); + return ResponseEntity.internalServerError().body(createErrorResponse(e.getMessage())); + } + } + + /** + * 清理死信消息 + */ + @DeleteMapping("/message-table/cleanup-dead-letter") + public ResponseEntity> 
cleanupDeadLetterMessages() { + try { + if (messageTableTransactionalService == null) { + Map response = new HashMap<>(); + response.put("success", false); + response.put("message", "消息表模式事务消息服务未启用"); + return ResponseEntity.badRequest().body(response); + } + + int deletedCount = messageTableTransactionalService.cleanupDeadLetterMessages(); + + Map response = new HashMap<>(); + response.put("success", true); + response.put("deletedCount", deletedCount); + response.put("message", "死信消息清理完成"); + + return ResponseEntity.ok(response); + + } catch (Exception e) { + log.error("清理死信消息失败", e); + return ResponseEntity.internalServerError().body(createErrorResponse(e.getMessage())); + } + } + + /** + * 创建消息对象 + */ + private Message createMessage(MessageRequest request) { + Message message = new Message(); + message.setId(UUID.randomUUID().toString()); + message.setContent(request.getContent()); + message.setType(request.getType() != null ? request.getType() : "API"); + message.setSender(request.getSender() != null ? request.getSender() : "api-sender"); + message.setTimestamp(LocalDateTime.now()); + return message; + } + + /** + * 创建错误响应 + */ + private Map createErrorResponse(String errorMessage) { + Map response = new HashMap<>(); + response.put("success", false); + response.put("error", errorMessage); + return response; + } + + /** + * 消息请求对象 + */ + public static class MessageRequest { + private String content; + private String type; + private String sender; + + // Getters and Setters + public String getContent() { return content; } + public void setContent(String content) { this.content = content; } + public String getType() { return type; } + public void setType(String type) { this.type = type; } + public String getSender() { return sender; } + public void setSender(String sender) { this.sender = sender; } + } + + /** + * 消息表请求对象 + */ + public static class MessageTableRequest extends MessageRequest { + private String businessData; + + public String getBusinessData() { return businessData; } + public void setBusinessData(String businessData) { this.businessData = businessData; } + } + + /** + * 批量消息请求对象 + */ + public static class BatchMessageRequest { + private int count; + private String transactionType; + + public int getCount() { return count; } + public void setCount(int count) { this.count = count; } + public String getTransactionType() { return transactionType; } + public void setTransactionType(String transactionType) { this.transactionType = transactionType; } + } + + /** + * 条件消息请求对象 + */ + public static class ConditionalMessageRequest extends MessageRequest { + private String conditionKey; + private String expectedValue; + + public String getConditionKey() { return conditionKey; } + public void setConditionKey(String conditionKey) { this.conditionKey = conditionKey; } + public String getExpectedValue() { return expectedValue; } + public void setExpectedValue(String expectedValue) { this.expectedValue = expectedValue; } + } + + /** + * 批量Lua消息请求对象 + */ + public static class BatchLuaMessageRequest { + private List messages; + + public List getMessages() { return messages; } + public void setMessages(List messages) { this.messages = messages; } + } + + /** + * 事务Lua消息请求对象 + */ + public static class TransactionalLuaMessageRequest extends MessageRequest { + private String businessData; + + public String getBusinessData() { return businessData; } + public void setBusinessData(String businessData) { this.businessData = businessData; } + } + + /** + * 批量消息表请求对象 + */ + public static class 
BatchMessageTableRequest { + private List messages; + private List businessDataList; + + public List getMessages() { return messages; } + public void setMessages(List messages) { this.messages = messages; } + public List getBusinessDataList() { return businessDataList; } + public void setBusinessDataList(List businessDataList) { this.businessDataList = businessDataList; } + } +} + diff --git a/src/main/java/com/example/example/TransactionalMessageUsageExample.java b/src/main/java/com/example/example/TransactionalMessageUsageExample.java new file mode 100644 index 0000000..1ede852 --- /dev/null +++ b/src/main/java/com/example/example/TransactionalMessageUsageExample.java @@ -0,0 +1,296 @@ +package com.example.example; + +import com.example.model.Message; +import com.example.service.LuaScriptTransactionalService; +import com.example.service.MessageTableTransactionalService; +import com.example.service.TransactionalMessageProducerService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +/** + * 事务消息使用示例 + * + * 展示各种事务消息模式的使用方法 + */ +@Slf4j +@Component +public class TransactionalMessageUsageExample { + + private final TransactionalMessageProducerService transactionalMessageProducerService; + private final LuaScriptTransactionalService luaScriptTransactionalService; + + @Autowired(required = false) + private MessageTableTransactionalService messageTableTransactionalService; + + public TransactionalMessageUsageExample(TransactionalMessageProducerService transactionalMessageProducerService, + LuaScriptTransactionalService luaScriptTransactionalService) { + this.transactionalMessageProducerService = transactionalMessageProducerService; + this.luaScriptTransactionalService = luaScriptTransactionalService; + } + + /** + * 示例1:MULTI/EXEC事务消息 + */ + public void demonstrateMultiExecTransactionalMessage() { + log.info("=== 示例1:MULTI/EXEC事务消息 ==="); + + try { + Message message = createSampleMessage("MULTI/EXEC事务消息示例"); + String messageId = transactionalMessageProducerService.sendMessageWithMultiExec(message); + + log.info("MULTI/EXEC事务消息发送成功: messageId={}", messageId); + + } catch (Exception e) { + log.error("MULTI/EXEC事务消息发送失败", e); + } + } + + /** + * 示例2:Lua脚本事务消息 + */ + public void demonstrateLuaScriptTransactionalMessage() { + log.info("=== 示例2:Lua脚本事务消息 ==="); + + try { + Message message = createSampleMessage("Lua脚本事务消息示例"); + String messageId = transactionalMessageProducerService.sendMessageWithLuaScript(message); + + log.info("Lua脚本事务消息发送成功: messageId={}", messageId); + + } catch (Exception e) { + log.error("Lua脚本事务消息发送失败", e); + } + } + + /** + * 示例3:消息表模式事务消息 + */ + public void demonstrateMessageTableTransactionalMessage() { + log.info("=== 示例3:消息表模式事务消息 ==="); + + try { + Message message = createSampleMessage("消息表模式事务消息示例"); + String businessData = "业务数据-" + UUID.randomUUID().toString(); + String messageId = transactionalMessageProducerService.sendMessageWithMessageTable(message, businessData); + + log.info("消息表模式事务消息发送成功: messageId={}, businessData={}", messageId, businessData); + + } catch (Exception e) { + log.error("消息表模式事务消息发送失败", e); + } + } + + /** + * 示例4:两阶段提交事务消息 + */ + public void demonstrateTwoPhaseCommitTransactionalMessage() { + log.info("=== 示例4:两阶段提交事务消息 ==="); + + try { + Message message = createSampleMessage("两阶段提交事务消息示例"); + String messageId = 
transactionalMessageProducerService.sendMessageWithTwoPhaseCommit(message); + + log.info("两阶段提交事务消息发送成功: messageId={}", messageId); + + } catch (Exception e) { + log.error("两阶段提交事务消息发送失败", e); + } + } + + /** + * 示例5:基础Lua脚本发送 + */ + public void demonstrateBasicLuaScriptMessage() { + log.info("=== 示例5:基础Lua脚本发送 ==="); + + try { + Message message = createSampleMessage("基础Lua脚本消息示例"); + String messageId = luaScriptTransactionalService.sendMessageWithBasicScript(message); + + log.info("基础Lua脚本消息发送成功: messageId={}", messageId); + + } catch (Exception e) { + log.error("基础Lua脚本消息发送失败", e); + } + } + + /** + * 示例6:条件Lua脚本发送 + */ + public void demonstrateConditionalLuaScriptMessage() { + log.info("=== 示例6:条件Lua脚本发送 ==="); + + try { + Message message = createSampleMessage("条件Lua脚本消息示例"); + String conditionKey = "test:condition"; + String expectedValue = "test_value"; + + // 设置条件 + luaScriptTransactionalService.sendMessageWithCondition(message, conditionKey, expectedValue); + + log.info("条件Lua脚本消息发送成功: conditionKey={}, expectedValue={}", conditionKey, expectedValue); + + } catch (Exception e) { + log.error("条件Lua脚本消息发送失败", e); + } + } + + /** + * 示例7:批量Lua脚本发送 + */ + public void demonstrateBatchLuaScriptMessage() { + log.info("=== 示例7:批量Lua脚本发送 ==="); + + try { + List messages = Arrays.asList( + createSampleMessage("批量消息1"), + createSampleMessage("批量消息2"), + createSampleMessage("批量消息3") + ); + + List results = luaScriptTransactionalService.sendBatchMessagesWithScript(messages); + + log.info("批量Lua脚本消息发送成功: count={}, results={}", results.size(), results); + + } catch (Exception e) { + log.error("批量Lua脚本消息发送失败", e); + } + } + + /** + * 示例8:事务Lua脚本发送 + */ + public void demonstrateTransactionalLuaScriptMessage() { + log.info("=== 示例8:事务Lua脚本发送 ==="); + + try { + Message message = createSampleMessage("事务Lua脚本消息示例"); + String businessData = "业务数据-" + UUID.randomUUID().toString(); + String messageId = luaScriptTransactionalService.sendTransactionalMessage(message, businessData); + + log.info("事务Lua脚本消息发送成功: messageId={}, businessData={}", messageId, businessData); + + } catch (Exception e) { + log.error("事务Lua脚本消息发送失败", e); + } + } + + /** + * 示例9:消息表模式批量发送 + */ + public void demonstrateBatchMessageTableMessage() { + log.info("=== 示例9:消息表模式批量发送 ==="); + + if (messageTableTransactionalService == null) { + log.warn("消息表模式事务消息服务未启用,跳过此示例"); + return; + } + + try { + List messages = Arrays.asList( + createSampleMessage("批量消息表消息1"), + createSampleMessage("批量消息表消息2"), + createSampleMessage("批量消息表消息3") + ); + + List businessDataList = Arrays.asList( + "业务数据1", + "业务数据2", + "业务数据3" + ); + + List messageIds = messageTableTransactionalService.sendBatchTransactionalMessages( + messages, businessDataList + ); + + log.info("消息表模式批量发送成功: count={}, messageIds={}", messageIds.size(), messageIds); + + } catch (Exception e) { + log.error("消息表模式批量发送失败", e); + } + } + + /** + * 示例10:批量事务消息发送 + */ + public void demonstrateBatchTransactionalMessages() { + log.info("=== 示例10:批量事务消息发送 ==="); + + try { + int count = 10; + String transactionType = "multiexec"; + + int successCount = transactionalMessageProducerService.sendBatchTransactionalMessages( + count, transactionType + ); + + log.info("批量事务消息发送成功: successCount={}, totalCount={}", successCount, count); + + } catch (Exception e) { + log.error("批量事务消息发送失败", e); + } + } + + /** + * 运行所有示例 + */ + public void runAllExamples() { + log.info("🚀 开始运行事务消息示例..."); + + try { + demonstrateMultiExecTransactionalMessage(); + Thread.sleep(1000); + + 
demonstrateLuaScriptTransactionalMessage(); + Thread.sleep(1000); + + demonstrateMessageTableTransactionalMessage(); + Thread.sleep(1000); + + demonstrateTwoPhaseCommitTransactionalMessage(); + Thread.sleep(1000); + + demonstrateBasicLuaScriptMessage(); + Thread.sleep(1000); + + demonstrateConditionalLuaScriptMessage(); + Thread.sleep(1000); + + demonstrateBatchLuaScriptMessage(); + Thread.sleep(1000); + + demonstrateTransactionalLuaScriptMessage(); + Thread.sleep(1000); + + demonstrateBatchMessageTableMessage(); + Thread.sleep(1000); + + demonstrateBatchTransactionalMessages(); + + log.info("🎉 所有事务消息示例运行完成!"); + + } catch (Exception e) { + log.error("运行事务消息示例失败", e); + } + } + + /** + * 创建示例消息 + */ + private Message createSampleMessage(String content) { + Message message = new Message(); + message.setId(UUID.randomUUID().toString()); + message.setContent(content); + message.setType("EXAMPLE"); + message.setSender("example-sender"); + message.setTimestamp(LocalDateTime.now()); + return message; + } +} + diff --git a/src/main/java/com/example/service/MessageTableTransactionalService.java b/src/main/java/com/example/service/MessageTableTransactionalService.java new file mode 100644 index 0000000..95c608e --- /dev/null +++ b/src/main/java/com/example/service/MessageTableTransactionalService.java @@ -0,0 +1,419 @@ +package com.example.service; + +import com.example.config.RedisStreamProperties; +import com.example.model.Message; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * 消息表模式事务消息服务 + * + * 实现原理: + * 1. 业务操作和消息记录在同一个数据库事务中 + * 2. 异步任务定期扫描消息表,发送未发送的消息 + * 3. 支持消息重试和死信队列机制 + * + * 优势: + * 1. 强一致性:业务数据和消息记录在同一事务中 + * 2. 可靠性:消息不会丢失 + * 3. 可恢复:支持消息重试机制 + */ +@Slf4j +@Service +@RequiredArgsConstructor +@ConditionalOnProperty(name = "redis.transactional.message-table.enabled", havingValue = "true", matchIfMissing = false) +public class MessageTableTransactionalService { + + private final JdbcTemplate jdbcTemplate; + private final RedisTemplate redisTemplate; + private final RedisStreamProperties redisStreamProperties; + + /** + * 消息状态枚举 + */ + public enum MessageStatus { + PENDING("待发送"), + SENT("已发送"), + FAILED("发送失败"), + RETRYING("重试中"), + DEAD_LETTER("死信"); + + private final String description; + + MessageStatus(String description) { + this.description = description; + } + + public String getDescription() { + return description; + } + } + + /** + * 发送事务消息(消息表模式) + * 业务操作和消息记录在同一事务中 + */ + @Transactional + public String sendTransactionalMessage(Message message, String businessData) { + String messageId = UUID.randomUUID().toString(); + message.setId(messageId); + + try { + // 1. 执行业务操作 + log.info("执行业务操作: messageId={}, businessData={}", messageId, businessData); + jdbcTemplate.update( + "INSERT INTO business_data (id, data, status, created_at) VALUES (?, ?, ?, ?)", + messageId, businessData, "PROCESSING", LocalDateTime.now() + ); + + // 2. 
记录消息到消息表 + log.info("记录消息到消息表: messageId={}", messageId); + jdbcTemplate.update( + "INSERT INTO message_table (id, content, type, sender, status, retry_count, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + messageId, message.getContent(), message.getType(), message.getSender(), + MessageStatus.PENDING.name(), 0, LocalDateTime.now(), LocalDateTime.now() + ); + + log.info("事务消息记录成功: messageId={}", messageId); + return messageId; + + } catch (Exception e) { + log.error("事务消息记录失败: messageId={}, error={}", messageId, e.getMessage(), e); + throw new RuntimeException("事务消息记录失败", e); + } + } + + /** + * 批量发送事务消息 + */ + @Transactional + public List<String> sendBatchTransactionalMessages(List<Message> messages, List<String> businessDataList) { + if (messages.size() != businessDataList.size()) { + throw new IllegalArgumentException("消息数量与业务数据数量不匹配"); + } + + List<String> messageIds = new java.util.ArrayList<>(); + + try { + for (int i = 0; i < messages.size(); i++) { + Message message = messages.get(i); + String businessData = businessDataList.get(i); + String messageId = sendTransactionalMessage(message, businessData); + messageIds.add(messageId); + } + + log.info("批量事务消息记录成功: count={}", messages.size()); + return messageIds; + + } catch (Exception e) { + log.error("批量事务消息记录失败: {}", e.getMessage(), e); + throw new RuntimeException("批量事务消息记录失败", e); + } + } + + /** + * 异步发送消息到Redis Stream + * 定期扫描消息表,发送未发送的消息 + */ + @Async + @Scheduled(fixedDelay = 5000) // 每5秒执行一次 + @ConditionalOnProperty(name = "redis.transactional.message-table.scheduled-processing-enabled", havingValue = "true", matchIfMissing = false) + public void processPendingMessages() { + try { + log.debug("开始扫描待发送消息"); + + // 查询待发送的消息 + List<Map<String, Object>> pendingMessages = jdbcTemplate.queryForList( + "SELECT id, content, type, sender, retry_count FROM message_table WHERE status = ? ORDER BY created_at ASC LIMIT 100", + MessageStatus.PENDING.name() + ); + + if (pendingMessages.isEmpty()) { + log.debug("没有待发送的消息"); + return; + } + + log.info("发现 {} 条待发送消息", pendingMessages.size()); + + for (Map<String, Object> messageRecord : pendingMessages) { + try { + String messageId = (String) messageRecord.get("id"); + String content = (String) messageRecord.get("content"); + String type = (String) messageRecord.get("type"); + String sender = (String) messageRecord.get("sender"); + + // 构建消息对象 + Message message = new Message(); + message.setId(messageId); + message.setContent(content); + message.setType(type); + message.setSender(sender); + message.setTimestamp(LocalDateTime.now()); + + // 发送到Redis Stream + sendMessageToRedisStream(message); + + // 更新消息状态为已发送 + updateMessageStatus(messageId, MessageStatus.SENT, null); + + log.info("消息发送成功: messageId={}", messageId); + + } catch (Exception e) { + String messageId = (String) messageRecord.get("id"); + log.error("发送消息失败: messageId={}, error={}", messageId, e.getMessage(), e); + + // 更新消息状态为失败 + updateMessageStatus(messageId, MessageStatus.FAILED, e.getMessage()); + } + } + + } catch (Exception e) { + log.error("处理待发送消息失败: {}", e.getMessage(), e); + } + } + + /** + * 重试失败的消息 + */ + @Async + @Scheduled(fixedDelay = 30000) // 每30秒执行一次 + @ConditionalOnProperty(name = "redis.transactional.message-table.scheduled-retry-enabled", havingValue = "true", matchIfMissing = false) + public void retryFailedMessages() { + try { + log.debug("开始重试失败的消息"); + + // 查询失败的消息(重试次数小于3次) + List<Map<String, Object>> failedMessages = jdbcTemplate.queryForList( + "SELECT id, content, type, sender, retry_count FROM message_table WHERE status = ? AND retry_count < 3 ORDER BY updated_at ASC LIMIT 50", + MessageStatus.FAILED.name() + ); + + if (failedMessages.isEmpty()) { + log.debug("没有需要重试的消息"); + return; + } + + log.info("发现 {} 条需要重试的消息", failedMessages.size()); + + for (Map<String, Object> messageRecord : failedMessages) { + try { + String messageId = (String) messageRecord.get("id"); + String content = (String) messageRecord.get("content"); + String type = (String) messageRecord.get("type"); + String sender = (String) messageRecord.get("sender"); + Integer retryCount = (Integer) messageRecord.get("retry_count"); + + // 更新状态为重试中 + updateMessageStatus(messageId, MessageStatus.RETRYING, null); + + // 构建消息对象 + Message message = new Message(); + message.setId(messageId); + message.setContent(content); + message.setType(type); + message.setSender(sender); + message.setTimestamp(LocalDateTime.now()); + + // 发送到Redis Stream + sendMessageToRedisStream(message); + + // 更新消息状态为已发送 + updateMessageStatus(messageId, MessageStatus.SENT, null); + + log.info("重试消息发送成功: messageId={}, retryCount={}", messageId, retryCount + 1); + + } catch (Exception e) { + String messageId = (String) messageRecord.get("id"); + Integer retryCount = (Integer) messageRecord.get("retry_count"); + + log.error("重试消息发送失败: messageId={}, retryCount={}, error={}", + messageId, retryCount, e.getMessage(), e); + + // 增加重试次数 + incrementRetryCount(messageId); + + // 如果重试次数达到上限,标记为死信 + if (retryCount + 1 >= 3) { + updateMessageStatus(messageId, MessageStatus.DEAD_LETTER, "重试次数超限"); + log.warn("消息重试次数超限,标记为死信: messageId={}", messageId); + } else { + // 未达到重试上限,恢复为失败状态,等待下一轮重试(否则消息会停留在RETRYING状态不再被扫描) + updateMessageStatus(messageId, MessageStatus.FAILED, e.getMessage()); + } + } + } + + } catch (Exception e) { + log.error("重试失败消息失败: {}", e.getMessage(), e); + } + } + + /** + * 发送消息到Redis Stream + */ + private void sendMessageToRedisStream(Message message) { + try { + Map<String, Object> messageMap = new HashMap<>(); + messageMap.put("id", message.getId()); + messageMap.put("content", message.getContent()); + messageMap.put("type", message.getType()); + messageMap.put("sender", message.getSender()); + messageMap.put("timestamp", message.getTimestamp().toString()); + messageMap.put("source", "message_table"); + + redisTemplate.opsForStream().add(redisStreamProperties.getKey(), messageMap); + + log.debug("消息已发送到Redis Stream: messageId={}", message.getId()); + + } catch (Exception e) { + log.error("发送消息到Redis Stream失败: messageId={}, error={}", message.getId(), e.getMessage(), e); + throw new RuntimeException("发送消息到Redis Stream失败", e); + } + } + + /** + * 更新消息状态 + */ + private void updateMessageStatus(String messageId, MessageStatus status, String errorMessage) { + try { + jdbcTemplate.update( + "UPDATE message_table SET status = ?, error_message = ?, updated_at = ? WHERE id = ?", + status.name(), errorMessage, LocalDateTime.now(), messageId + ); + + log.debug("消息状态更新成功: messageId={}, status={}", messageId, status); + + } catch (Exception e) { + log.error("更新消息状态失败: messageId={}, status={}, error={}", + messageId, status, e.getMessage(), e); + } + } + + /** + * 增加重试次数 + */ + private void incrementRetryCount(String messageId) { + try { + jdbcTemplate.update( + "UPDATE message_table SET retry_count = retry_count + 1, updated_at = ?
WHERE id = ?", + LocalDateTime.now(), messageId + ); + + log.debug("重试次数增加成功: messageId={}", messageId); + + } catch (Exception e) { + log.error("增加重试次数失败: messageId={}, error={}", messageId, e.getMessage(), e); + } + } + + /** + * 获取消息统计信息 + */ + public Map getMessageTableStats() { + Map stats = new HashMap<>(); + + try { + // 统计各状态的消息数量 + Map statusCounts = new HashMap<>(); + for (MessageStatus status : MessageStatus.values()) { + Integer count = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM message_table WHERE status = ?", + Integer.class, status.name() + ); + statusCounts.put(status.name(), count); + } + stats.put("statusCounts", statusCounts); + + // 统计总消息数 + Integer totalMessages = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM message_table", + Integer.class + ); + stats.put("totalMessages", totalMessages); + + // 统计业务数据数量 + Integer businessDataCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM business_data", + Integer.class + ); + stats.put("businessDataCount", businessDataCount); + + // 统计今日消息数 + Integer todayMessages = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM message_table WHERE DATE(created_at) = CURDATE()", + Integer.class + ); + stats.put("todayMessages", todayMessages); + + stats.put("success", true); + + } catch (Exception e) { + stats.put("success", false); + stats.put("error", e.getMessage()); + log.error("获取消息表统计信息失败", e); + } + + return stats; + } + + /** + * 清理死信消息 + */ + public int cleanupDeadLetterMessages() { + try { + int deletedCount = jdbcTemplate.update( + "DELETE FROM message_table WHERE status = ? AND updated_at < ?", + MessageStatus.DEAD_LETTER.name(), LocalDateTime.now().minusDays(7) + ); + + log.info("清理死信消息完成: 删除 {} 条", deletedCount); + return deletedCount; + + } catch (Exception e) { + log.error("清理死信消息失败: {}", e.getMessage(), e); + return 0; + } + } + + /** + * 手动重试消息 + */ + public boolean retryMessage(String messageId) { + try { + // 检查消息是否存在且状态为失败 + Integer count = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM message_table WHERE id = ? 
AND status = ?", + Integer.class, messageId, MessageStatus.FAILED.name() + ); + + if (count == 0) { + log.warn("消息不存在或状态不是失败: messageId={}", messageId); + return false; + } + + // 更新状态为重试中 + updateMessageStatus(messageId, MessageStatus.RETRYING, null); + + // 异步发送消息 + processPendingMessages(); + + log.info("手动重试消息: messageId={}", messageId); + return true; + + } catch (Exception e) { + log.error("手动重试消息失败: messageId={}, error={}", messageId, e.getMessage(), e); + return false; + } + } +} + diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index f7044a7..5a09cc0 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,10 +1,50 @@ server: - port: 8080 + port: 8081 spring: application: name: redis-stream-demo + # 数据库配置 + datasource: + url: jdbc:h2:mem:testdb + driver-class-name: org.h2.Driver + username: sa + password: + hikari: + maximum-pool-size: 10 + minimum-idle: 5 + connection-timeout: 30000 + idle-timeout: 600000 + max-lifetime: 1800000 + + # H2数据库控制台 + h2: + console: + enabled: true + path: /h2-console + + # JPA配置 + jpa: + hibernate: + ddl-auto: create-drop + show-sql: true + properties: + hibernate: + format_sql: true + + # 异步任务配置 + task: + execution: + pool: + core-size: 5 + max-size: 10 + queue-capacity: 100 + keep-alive: 60s + scheduling: + pool: + size: 5 + redis: host: localhost port: 6379 @@ -25,6 +65,17 @@ redis: consumer-group: "message-consumer-group" consumer-name: "message-consumer" + # 事务消息配置 + transactional: + # 消息表模式配置 + message-table: + # 是否启用消息表模式事务消息服务(默认关闭) + enabled: false + # 是否启用定时任务处理待发送消息(默认关闭) + scheduled-processing-enabled: false + # 是否启用定时任务重试失败消息(默认关闭) + scheduled-retry-enabled: false + # 消费者模式配置 consumer: # 默认模式: stream-listener, manual-ack, both @@ -62,9 +113,17 @@ redis: # 最大并发数 max-concurrency: 5 +# Application Configuration +app: + # 是否在启动时运行事务消息示例(默认关闭) + run-examples-on-startup: false + # Logging Configuration logging: level: com.example: DEBUG org.springframework.data.redis: DEBUG io.lettuce: DEBUG + charset: + console: UTF-8 + file: UTF-8 diff --git a/src/main/resources/sql/schema.sql b/src/main/resources/sql/schema.sql new file mode 100644 index 0000000..6e111d7 --- /dev/null +++ b/src/main/resources/sql/schema.sql @@ -0,0 +1,56 @@ +-- 消息表模式事务消息数据库表结构 + +-- 业务数据表 +CREATE TABLE IF NOT EXISTS business_data ( + id VARCHAR(36) PRIMARY KEY COMMENT '业务数据ID', + data TEXT NOT NULL COMMENT '业务数据内容', + status VARCHAR(20) NOT NULL DEFAULT 'PENDING' COMMENT '业务数据状态', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + INDEX idx_status (status), + INDEX idx_created_at (created_at) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='业务数据表'; + +-- 消息表 +CREATE TABLE IF NOT EXISTS message_table ( + id VARCHAR(36) PRIMARY KEY COMMENT '消息ID', + content TEXT NOT NULL COMMENT '消息内容', + type VARCHAR(50) NOT NULL COMMENT '消息类型', + sender VARCHAR(100) NOT NULL COMMENT '发送者', + status VARCHAR(20) NOT NULL DEFAULT 'PENDING' COMMENT '消息状态', + retry_count INT NOT NULL DEFAULT 0 COMMENT '重试次数', + error_message TEXT COMMENT '错误信息', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + INDEX idx_status (status), + INDEX idx_created_at (created_at), + INDEX idx_updated_at (updated_at), + INDEX idx_retry_count (retry_count) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息表'; + +-- 死信队列表 +CREATE 
TABLE IF NOT EXISTS dead_letter_queue ( + id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '死信ID', + message_id VARCHAR(36) NOT NULL COMMENT '原始消息ID', + content TEXT NOT NULL COMMENT '消息内容', + type VARCHAR(50) NOT NULL COMMENT '消息类型', + sender VARCHAR(100) NOT NULL COMMENT '发送者', + error_message TEXT COMMENT '错误信息', + retry_count INT NOT NULL DEFAULT 0 COMMENT '重试次数', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + INDEX idx_message_id (message_id), + INDEX idx_created_at (created_at) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='死信队列表'; + +-- 消息发送日志表 +CREATE TABLE IF NOT EXISTS message_send_log ( + id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '日志ID', + message_id VARCHAR(36) NOT NULL COMMENT '消息ID', + action VARCHAR(50) NOT NULL COMMENT '操作类型', + status VARCHAR(20) NOT NULL COMMENT '操作状态', + error_message TEXT COMMENT '错误信息', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + INDEX idx_message_id (message_id), + INDEX idx_created_at (created_at) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息发送日志表'; + diff --git a/src/test/java/com/example/service/TransactionalMessageTest.java b/src/test/java/com/example/service/TransactionalMessageTest.java new file mode 100644 index 0000000..8fc9254 --- /dev/null +++ b/src/test/java/com/example/service/TransactionalMessageTest.java @@ -0,0 +1,408 @@ +package com.example.service; + +import com.example.config.RedisStreamProperties; +import com.example.model.Message; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * 事务消息测试类 + */ +@SpringBootTest +@ActiveProfiles("test") +class TransactionalMessageTest { + + @Autowired + private TransactionalMessageProducerService transactionalMessageProducerService; + + @Autowired + private LuaScriptTransactionalService luaScriptTransactionalService; + + @Autowired + private MessageTableTransactionalService messageTableTransactionalService; + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private JdbcTemplate jdbcTemplate; + + @Autowired + private RedisStreamProperties redisStreamProperties; + + @BeforeEach + void setUp() { + // 清理Redis数据 + redisTemplate.delete(redisStreamProperties.getKey()); + redisTemplate.delete("message:prepare"); + redisTemplate.delete(redisTemplate.keys("business:*")); + redisTemplate.delete(redisTemplate.keys("message:last:*")); + redisTemplate.delete("message:count"); + + // 清理数据库数据 + jdbcTemplate.update("DELETE FROM message_table"); + jdbcTemplate.update("DELETE FROM business_data"); + jdbcTemplate.update("DELETE FROM dead_letter_queue"); + jdbcTemplate.update("DELETE FROM message_send_log"); + } + + @Test + void testMultiExecTransactionalMessage() { + // 测试MULTI/EXEC事务消息 + Message message = createTestMessage("MULTI/EXEC测试消息"); + + String messageId = transactionalMessageProducerService.sendMessageWithMultiExec(message); + + assertNotNull(messageId); + assertFalse(messageId.isEmpty()); + + // 验证消息已发送到Redis Stream + Long streamLength = 
redisTemplate.opsForStream().size(redisStreamProperties.getKey()); + assertEquals(1L, streamLength); + + // 验证统计信息已更新 + Object messageCount = redisTemplate.opsForValue().get("message:count"); + assertEquals(1L, messageCount); + + System.out.println("✅ MULTI/EXEC事务消息测试通过"); + } + + @Test + void testLuaScriptTransactionalMessage() { + // 测试Lua脚本事务消息 + Message message = createTestMessage("Lua脚本测试消息"); + + String messageId = transactionalMessageProducerService.sendMessageWithLuaScript(message); + + assertNotNull(messageId); + assertFalse(messageId.isEmpty()); + + // 验证消息已发送到Redis Stream + Long streamLength = redisTemplate.opsForStream().size(redisStreamProperties.getKey()); + assertEquals(1L, streamLength); + + System.out.println("✅ Lua脚本事务消息测试通过"); + } + + @Test + void testMessageTableTransactionalMessage() { + // 测试消息表模式事务消息 + Message message = createTestMessage("消息表测试消息"); + String businessData = "业务数据-" + UUID.randomUUID().toString(); + + String messageId = transactionalMessageProducerService.sendMessageWithMessageTable(message, businessData); + + assertNotNull(messageId); + assertFalse(messageId.isEmpty()); + + // 验证业务数据已插入 + Integer businessDataCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM business_data WHERE id = ?", + Integer.class, messageId + ); + assertEquals(1, businessDataCount); + + // 验证消息已记录到消息表 + Integer messageCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM message_table WHERE id = ?", + Integer.class, messageId + ); + assertEquals(1, messageCount); + + System.out.println("✅ 消息表模式事务消息测试通过"); + } + + @Test + void testTwoPhaseCommitTransactionalMessage() { + // 测试两阶段提交事务消息 + Message message = createTestMessage("两阶段提交测试消息"); + + String messageId = transactionalMessageProducerService.sendMessageWithTwoPhaseCommit(message); + + assertNotNull(messageId); + assertFalse(messageId.isEmpty()); + + // 验证消息已发送到Redis Stream + Long streamLength = redisTemplate.opsForStream().size(redisStreamProperties.getKey()); + assertEquals(1L, streamLength); + + System.out.println("✅ 两阶段提交事务消息测试通过"); + } + + @Test + void testLuaScriptBasicSend() { + // 测试基础Lua脚本发送 + Message message = createTestMessage("基础Lua脚本测试消息"); + + String messageId = luaScriptTransactionalService.sendMessageWithBasicScript(message); + + assertNotNull(messageId); + assertFalse(messageId.isEmpty()); + + // 验证消息已发送到Redis Stream + Long streamLength = redisTemplate.opsForStream().size(redisStreamProperties.getKey()); + assertEquals(1L, streamLength); + + System.out.println("✅ 基础Lua脚本发送测试通过"); + } + + @Test + void testLuaScriptConditionalSend() { + // 测试条件Lua脚本发送 + Message message = createTestMessage("条件Lua脚本测试消息"); + String conditionKey = "test:condition"; + String expectedValue = "test_value"; + + // 设置条件 + redisTemplate.opsForValue().set(conditionKey, expectedValue); + + String messageId = luaScriptTransactionalService.sendMessageWithCondition( + message, conditionKey, expectedValue + ); + + assertNotNull(messageId); + assertFalse(messageId.isEmpty()); + + // 验证消息已发送到Redis Stream + Long streamLength = redisTemplate.opsForStream().size(redisStreamProperties.getKey()); + assertEquals(1L, streamLength); + + System.out.println("✅ 条件Lua脚本发送测试通过"); + } + + @Test + void testLuaScriptConditionalSendWithWrongCondition() { + // 测试条件不满足的情况 + Message message = createTestMessage("条件不满足测试消息"); + String conditionKey = "test:condition"; + String expectedValue = "test_value"; + String wrongValue = "wrong_value"; + + // 设置错误的条件 + redisTemplate.opsForValue().set(conditionKey, wrongValue); + + String messageId = 
luaScriptTransactionalService.sendMessageWithCondition( + message, conditionKey, expectedValue + ); + + // 条件不满足,应该返回null + assertNull(messageId); + + // 验证消息未发送到Redis Stream + Long streamLength = redisTemplate.opsForStream().size(redisStreamProperties.getKey()); + assertEquals(0L, streamLength); + + System.out.println("✅ 条件不满足Lua脚本发送测试通过"); + } + + @Test + void testLuaScriptBatchSend() { + // 测试批量Lua脚本发送 + List messages = Arrays.asList( + createTestMessage("批量消息1"), + createTestMessage("批量消息2"), + createTestMessage("批量消息3") + ); + + List results = luaScriptTransactionalService.sendBatchMessagesWithScript(messages); + + assertNotNull(results); + assertEquals(3, results.size()); + + // 验证消息已发送到Redis Stream + Long streamLength = redisTemplate.opsForStream().size(redisStreamProperties.getKey()); + assertEquals(3L, streamLength); + + System.out.println("✅ 批量Lua脚本发送测试通过"); + } + + @Test + void testLuaScriptTransactionalSend() { + // 测试事务Lua脚本发送 + Message message = createTestMessage("事务Lua脚本测试消息"); + String businessData = "业务数据-" + UUID.randomUUID().toString(); + + // 创建业务数据 + String businessKey = "business:" + message.getId(); + redisTemplate.opsForHash().putAll(businessKey, Map.of( + "data", businessData, + "status", "PENDING" + )); + + String messageId = luaScriptTransactionalService.sendTransactionalMessage(message, businessData); + + assertNotNull(messageId); + assertFalse(messageId.isEmpty()); + + // 验证消息已发送到Redis Stream + Long streamLength = redisTemplate.opsForStream().size(redisStreamProperties.getKey()); + assertEquals(1L, streamLength); + + System.out.println("✅ 事务Lua脚本发送测试通过"); + } + + @Test + @Transactional + void testMessageTableTransactionalSend() { + // 测试消息表模式事务发送 + Message message = createTestMessage("消息表事务测试消息"); + String businessData = "业务数据-" + UUID.randomUUID().toString(); + + String messageId = messageTableTransactionalService.sendTransactionalMessage(message, businessData); + + assertNotNull(messageId); + assertFalse(messageId.isEmpty()); + + // 验证业务数据已插入 + Integer businessDataCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM business_data WHERE id = ?", + Integer.class, messageId + ); + assertEquals(1, businessDataCount); + + // 验证消息已记录到消息表 + Integer messageCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM message_table WHERE id = ?", + Integer.class, messageId + ); + assertEquals(1, messageCount); + + System.out.println("✅ 消息表模式事务发送测试通过"); + } + + @Test + @Transactional + void testMessageTableBatchSend() { + // 测试消息表模式批量发送 + List messages = Arrays.asList( + createTestMessage("批量消息1"), + createTestMessage("批量消息2"), + createTestMessage("批量消息3") + ); + + List businessDataList = Arrays.asList( + "业务数据1", + "业务数据2", + "业务数据3" + ); + + List messageIds = messageTableTransactionalService.sendBatchTransactionalMessages( + messages, businessDataList + ); + + assertNotNull(messageIds); + assertEquals(3, messageIds.size()); + + // 验证所有消息都已记录 + Integer messageCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM message_table", + Integer.class + ); + assertEquals(3, messageCount); + + System.out.println("✅ 消息表模式批量发送测试通过"); + } + + @Test + void testBatchTransactionalMessages() { + // 测试批量事务消息发送 + int count = 10; + String transactionType = "multiexec"; + + int successCount = transactionalMessageProducerService.sendBatchTransactionalMessages( + count, transactionType + ); + + assertEquals(count, successCount); + + // 验证消息已发送到Redis Stream + Long streamLength = redisTemplate.opsForStream().size(redisStreamProperties.getKey()); + assertEquals((long) count, 
streamLength); + + System.out.println("✅ 批量事务消息发送测试通过"); + } + + @Test + void testGetTransactionalMessageStats() { + // 测试获取事务消息统计信息 + Map stats = transactionalMessageProducerService.getTransactionalMessageStats(); + + assertNotNull(stats); + assertTrue((Boolean) stats.get("success")); + + System.out.println("✅ 事务消息统计信息测试通过"); + } + + @Test + void testGetLuaScriptStats() { + // 测试获取Lua脚本统计信息 + Map stats = luaScriptTransactionalService.getLuaScriptStats(); + + assertNotNull(stats); + assertTrue((Boolean) stats.get("success")); + + System.out.println("✅ Lua脚本统计信息测试通过"); + } + + @Test + void testGetMessageTableStats() { + // 测试获取消息表统计信息 + Map stats = messageTableTransactionalService.getMessageTableStats(); + + assertNotNull(stats); + assertTrue((Boolean) stats.get("success")); + + System.out.println("✅ 消息表统计信息测试通过"); + } + + @Test + void testCleanupBusinessData() { + // 测试清理业务数据 + luaScriptTransactionalService.cleanupBusinessData(); + + // 验证业务数据已清理 + long businessDataCount = redisTemplate.keys("business:*").size(); + assertEquals(0L, businessDataCount); + + System.out.println("✅ 业务数据清理测试通过"); + } + + @Test + void testCleanupDeadLetterMessages() { + // 测试清理死信消息 + int deletedCount = messageTableTransactionalService.cleanupDeadLetterMessages(); + + // 由于没有死信消息,应该返回0 + assertEquals(0, deletedCount); + + System.out.println("✅ 死信消息清理测试通过"); + } + + /** + * 创建测试消息 + */ + private Message createTestMessage(String content) { + Message message = new Message(); + message.setId(UUID.randomUUID().toString()); + message.setContent(content); + message.setType("TEST"); + message.setSender("test-sender"); + message.setTimestamp(LocalDateTime.now()); + return message; + } +} +
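The added `src/main/resources/sql/schema.sql` is written in MySQL dialect (`ENGINE=InnoDB`, column `COMMENT`s, inline `INDEX` definitions, `ON UPDATE CURRENT_TIMESTAMP`), while `application.yml` configures an in-memory H2 datasource, and nothing in this diff points Spring Boot at the script: by default only a root-level `classpath:schema.sql` is executed, and `jpa.hibernate.ddl-auto` only creates tables for JPA entities, none of which are defined here. As a result, the `business_data` and `message_table` tables used by `MessageTableTransactionalService` and `TransactionalMessageTest` would not exist at runtime. Below is a minimal sketch of the missing wiring, assuming Spring Boot 2.5+ script-based initialization (`spring.sql.init.*`); the MySQL-specific clauses would still need an H2-compatible variant of the script, or a real MySQL datasource.

```yaml
# Sketch only: run sql/schema.sql against the configured datasource at startup.
# Assumes spring.sql.init.* (Spring Boot 2.5+); drop this if the schema is managed
# by a migration tool or provisioned separately.
spring:
  sql:
    init:
      mode: always                               # 'embedded' (the default) is enough for H2; 'always' also covers MySQL
      schema-locations: classpath:sql/schema.sql # the script added in this diff
```

An alternative is to keep the MySQL DDL as-is and start H2 in MySQL compatibility mode (`jdbc:h2:mem:testdb;MODE=MySQL`), but clauses such as `ENGINE=InnoDB` and per-column `COMMENT`s may still be rejected depending on the H2 version, so a dedicated H2 script is the safer option.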