From 7a1da5b9ab81fd43342c77bc849b94082639c43d Mon Sep 17 00:00:00 2001 From: "a15234804788@163.com" Date: Thu, 13 Oct 2022 18:13:21 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A2=E7=B4=A2=E9=80=9A=E8=BF=87redisson?= =?UTF-8?q?=E7=9A=84=E5=BB=B6=E8=BF=9F=E9=98=9F=E5=88=97=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=BB=B6=E8=BF=9F=E9=80=9A=E7=9F=A5=E5=BC=80=E4=BC=9A=E7=9A=84?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../{ => user}/config/ScopeConfiguration.java | 2 +- .../system/user/config/RedissonConfig.java | 45 +++++++++ .../mettinghandler/MettingEndHandler.java | 27 ++++++ .../mettinghandler/MettingRemindHandler.java | 48 ++++++++++ .../RedissonDelayQueuHandlle.java | 6 ++ .../user/service/impl/TrainServiceImpl.java | 42 +++++---- .../system/user/test/TestUserController.java | 19 +++- .../system/user/util/QueueEnum.java | 27 ++++++ .../user/util/RedisDelayQueueRunner.java | 60 ++++++++++++ .../system/user/util/RedisDelayQueueUtil.java | 91 +++++++++++++++++++ 10 files changed, 346 insertions(+), 21 deletions(-) rename lab-service-api/lab-scope-api/src/main/java/org/springblade/system/{ => user}/config/ScopeConfiguration.java (95%) create mode 100644 lab-service/lab-user/src/main/java/org/springblade/system/user/config/RedissonConfig.java create mode 100644 lab-service/lab-user/src/main/java/org/springblade/system/user/mettinghandler/MettingEndHandler.java create mode 100644 lab-service/lab-user/src/main/java/org/springblade/system/user/mettinghandler/MettingRemindHandler.java create mode 100644 lab-service/lab-user/src/main/java/org/springblade/system/user/mettinghandler/RedissonDelayQueuHandlle.java create mode 100644 lab-service/lab-user/src/main/java/org/springblade/system/user/util/QueueEnum.java create mode 100644 lab-service/lab-user/src/main/java/org/springblade/system/user/util/RedisDelayQueueRunner.java create mode 100644 lab-service/lab-user/src/main/java/org/springblade/system/user/util/RedisDelayQueueUtil.java diff --git a/lab-service-api/lab-scope-api/src/main/java/org/springblade/system/config/ScopeConfiguration.java b/lab-service-api/lab-scope-api/src/main/java/org/springblade/system/user/config/ScopeConfiguration.java similarity index 95% rename from lab-service-api/lab-scope-api/src/main/java/org/springblade/system/config/ScopeConfiguration.java rename to lab-service-api/lab-scope-api/src/main/java/org/springblade/system/user/config/ScopeConfiguration.java index 2191599..e7f7254 100644 --- a/lab-service-api/lab-scope-api/src/main/java/org/springblade/system/config/ScopeConfiguration.java +++ b/lab-service-api/lab-scope-api/src/main/java/org/springblade/system/user/config/ScopeConfiguration.java @@ -1,5 +1,5 @@ -package org.springblade.system.config; +package org.springblade.system.user.config; import lombok.AllArgsConstructor; diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/config/RedissonConfig.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/config/RedissonConfig.java new file mode 100644 index 0000000..01ea3ea --- /dev/null +++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/config/RedissonConfig.java @@ -0,0 +1,45 @@ +package org.springblade.system.user.config; + +import lombok.extern.slf4j.Slf4j; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * redisson配置类 + * @author ytl + * @since 2022-10-13 15:11 + */ +@Slf4j +@Configuration +public class RedissonConfig { + @Value("${spring.redis.host}") + private String host; + + @Value("${spring.redis.port}") + private String port; + + @Value("${spring.redis.password}") + private String password; + + @Bean + public RedissonClient redissonClient(){ + Config config = new Config(); + + config.useSingleServer() + .setAddress("redis://" + host + ":" + port) + .setDatabase(0) + .setPingConnectionInterval(2000); + config.setLockWatchdogTimeout(10000L); + try{ + return Redisson.create(config); + }catch (Exception e){ + log.error("创建redisson连接错误"); + return null; + } + } + +} diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/mettinghandler/MettingEndHandler.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/mettinghandler/MettingEndHandler.java new file mode 100644 index 0000000..7255063 --- /dev/null +++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/mettinghandler/MettingEndHandler.java @@ -0,0 +1,27 @@ +package org.springblade.system.user.mettinghandler; + +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import lombok.extern.slf4j.Slf4j; +import org.springblade.system.user.entity.Train; +import org.springblade.system.user.service.ITrainService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * @author ytl + * @since 2022-10-13 16:11 + */ +@Slf4j +@Component("mettingEndHandler") +public class MettingEndHandler implements RedissonDelayQueuHandlle { + @Autowired + private ITrainService trainService; + + @Override + public void execute(String id) { + LambdaUpdateWrapper wrapper = new LambdaUpdateWrapper(); + wrapper.eq(Train::getId, id).set(Train::getStatus, 2); + trainService.update(wrapper); + log.info("有会议结束了,会议id:{}", id); + } +} diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/mettinghandler/MettingRemindHandler.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/mettinghandler/MettingRemindHandler.java new file mode 100644 index 0000000..856d414 --- /dev/null +++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/mettinghandler/MettingRemindHandler.java @@ -0,0 +1,48 @@ +package org.springblade.system.user.mettinghandler; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import lombok.extern.slf4j.Slf4j; +import org.springblade.resource.enums.SysTypeEnum; +import org.springblade.resource.feign.IMessageClient; +import org.springblade.system.user.entity.Train; +import org.springblade.system.user.entity.TrainPerson; +import org.springblade.system.user.service.ITrainPersonService; +import org.springblade.system.user.service.ITrainService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author ytl + * @since 2022-10-13 16:09 + */ +@Slf4j +@Component("mettingRemindHandler") +public class MettingRemindHandler implements RedissonDelayQueuHandlle{ + @Autowired + private ITrainService trainService; + + @Autowired + private ITrainPersonService trainPersonService; + + @Autowired + private IMessageClient messageClient; + + @Override + public void execute(String id) { + LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); + wrapper.eq(TrainPerson::getTrainId, id); + List list = trainPersonService.list(wrapper); + Train train = trainService.getById(id); +// if (list.size() > 0) { +// list.forEach(person -> { +// messageClient.event(SysTypeEnum.INFORM.getValue(), "会议提醒", +// "您有新的会议将在" + train.getDuration() + "分钟后开始,请准时参加!", 1, 5, person.getPersonName(), "/train/project"); +// }); +// } +// messageClient.event(SysTypeEnum.INFORM.getValue(), "会议提醒", +// "您有新的会议将在" + train.getDuration() + "分钟后开始,请准时参加!", 1, 5, train.getTeacherName(), "/train/project"); + log.info("有会议即将开始,会议id:{}", id); + } +} diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/mettinghandler/RedissonDelayQueuHandlle.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/mettinghandler/RedissonDelayQueuHandlle.java new file mode 100644 index 0000000..c8be842 --- /dev/null +++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/mettinghandler/RedissonDelayQueuHandlle.java @@ -0,0 +1,6 @@ +package org.springblade.system.user.mettinghandler; + +public interface RedissonDelayQueuHandlle { + void execute(T t); + +} diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/service/impl/TrainServiceImpl.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/service/impl/TrainServiceImpl.java index 0ec563d..fd308f5 100644 --- a/lab-service/lab-user/src/main/java/org/springblade/system/user/service/impl/TrainServiceImpl.java +++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/service/impl/TrainServiceImpl.java @@ -17,6 +17,8 @@ import org.springblade.system.user.service.ITeacherService; import org.springblade.system.user.service.ITrainPersonService; import org.springblade.system.user.service.ITrainService; import org.springblade.system.user.service.ITrainSpeakService; +import org.springblade.system.user.util.QueueEnum; +import org.springblade.system.user.util.RedisDelayQueueUtil; import org.springblade.system.user.util.ThreadPoolFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; @@ -55,6 +57,9 @@ public class TrainServiceImpl extends BaseServiceImpl implem private final ISysClient sysClient; + @Autowired + private RedisDelayQueueUtil redisDelayQueueUtil; + @Autowired private StringRedisTemplate redisTemplate; @@ -137,27 +142,30 @@ public class TrainServiceImpl extends BaseServiceImpl implem } //异步执行提前通知参会人员的任务 - ThreadPoolFactory.sheduleThreadPool.schedule(new Runnable() { - @Override - public void run() { - for (String s : split) { - messageClient.event(SysTypeEnum.INFORM.getValue(), "会议提醒", - "您有新的会议将在" + train.getDuration() + "分钟后开始,请准时参加!", 1, 5, s, "/train/project"); - } - } - },time1 - now, TimeUnit.MILLISECONDS); +// ThreadPoolFactory.sheduleThreadPool.schedule(new Runnable() { +// @Override +// public void run() { +// for (String s : split) { +// messageClient.event(SysTypeEnum.INFORM.getValue(), "会议提醒", +// "您有新的会议将在" + train.getDuration() + "分钟后开始,请准时参加!", 1, 5, s, "/train/project"); +// } +// } +// },time1 - now, TimeUnit.MILLISECONDS); + //提前通知参会人员和讲师的替代方案 + redisDelayQueueUtil.addDelayQueue(train.getId(),time1 - now, TimeUnit.MILLISECONDS, QueueEnum.METTING_REMAIND.getCode()); //异步执行提前通知讲师的任务 - ThreadPoolFactory.sheduleThreadPool.schedule(new Runnable() { - @Override - public void run() { - messageClient.event(SysTypeEnum.INFORM.getValue(), "会议提醒", - "您有新的会议将在" + train.getDuration() + "分钟后开始,请准时参加!", 1, 5, train.getTeacher().toString(), "/train/project"); - } - },time1 - now, TimeUnit.MILLISECONDS); +// ThreadPoolFactory.sheduleThreadPool.schedule(new Runnable() { +// @Override +// public void run() { +// messageClient.event(SysTypeEnum.INFORM.getValue(), "会议提醒", +// "您有新的会议将在" + train.getDuration() + "分钟后开始,请准时参加!", 1, 5, train.getTeacher().toString(), "/train/project"); +// } +// },time1 - now, TimeUnit.MILLISECONDS); + //向redis中加入过期key --培训的id - redisTemplate.opsForValue().set(train.getId().toString(), train.getId().toString(),time1 - now , TimeUnit.MILLISECONDS); + //redisTemplate.opsForValue().set(train.getId().toString(), train.getId().toString(),time1 - now , TimeUnit.MILLISECONDS); return teacherService.updateById(teacher); } diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/test/TestUserController.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/TestUserController.java index c52e53f..06d81ec 100644 --- a/lab-service/lab-user/src/main/java/org/springblade/system/user/test/TestUserController.java +++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/TestUserController.java @@ -1,16 +1,17 @@ package org.springblade.system.user.test; import org.springblade.core.tool.api.R; +import org.springblade.system.user.util.QueueEnum; +import org.springblade.system.user.util.RedisDelayQueueUtil; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.logging.Handler; /** @@ -59,4 +60,16 @@ public class TestUserController { rabbitTemplate.convertAndSend("delay_exchange","delayed",message); return R.data("2"); } + + //测试redisson延迟队列添加信息 + @Autowired + private RedisDelayQueueUtil redisDelayQueueUtil; + + @GetMapping("/addQueue") + public void addQueue( String id){ + + redisDelayQueueUtil.addDelayQueue(id,5, TimeUnit.SECONDS, QueueEnum.METTING_REMAIND.getCode()); + + redisDelayQueueUtil.addDelayQueue(id,10, TimeUnit.SECONDS, QueueEnum.METTING_END.getCode()); + } } diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/util/QueueEnum.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/util/QueueEnum.java new file mode 100644 index 0000000..c35b045 --- /dev/null +++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/util/QueueEnum.java @@ -0,0 +1,27 @@ +package org.springblade.system.user.util; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; + +/** + * @author ytl + * @since 2022-10-13 15:59 + */ + +@Getter +@NoArgsConstructor +@AllArgsConstructor +public enum QueueEnum { + METTING_REMAIND("METTING_REMAIND" ,"会议即将开始", "mettingRemindHandler"), + METTING_END("METTING_END", "会议结束了" ,"mettingEndHandler"); + + //延迟队列key + private String code; + + //中文描述 + private String name; + + //具体业务的bean + private String beanId; +} diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/util/RedisDelayQueueRunner.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/util/RedisDelayQueueRunner.java new file mode 100644 index 0000000..72a8355 --- /dev/null +++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/util/RedisDelayQueueRunner.java @@ -0,0 +1,60 @@ +package org.springblade.system.user.util; + + +import lombok.extern.slf4j.Slf4j; +import org.springblade.system.user.mettinghandler.RedissonDelayQueuHandlle; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.context.ApplicationContext; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +/** + * 系统启动后自动启动,监听延迟队列 + * @author ytl + * @since 2022-10-13 15:55 + */ +@Slf4j +@Component +public class RedisDelayQueueRunner implements CommandLineRunner { + @Autowired + private RedisDelayQueueUtil redisDelayQueueUtil; + + @Autowired + private ApplicationContext context; + + //spring封装的 + @Autowired + private ThreadPoolTaskExecutor threadPoolTaskExecutor; + + @Override + public void run(String... args) throws Exception { + + threadPoolTaskExecutor.execute(() -> { + while (true) { + QueueEnum[] queueEnums = QueueEnum.values(); + for (QueueEnum en : queueEnums) { + try { + String value = (String)redisDelayQueueUtil.getDelayQueue(en.getCode()); + if (value != null){ + RedissonDelayQueuHandlle bean = (RedissonDelayQueuHandlle)context.getBean(en.getBeanId()); + bean.execute(value); + } + + } catch (Exception e) { + log.error("延迟队列监控失败"); + } + try { + TimeUnit.MILLISECONDS.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + }); + + log.info("延迟队列监听成功"); + } +} diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/util/RedisDelayQueueUtil.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/util/RedisDelayQueueUtil.java new file mode 100644 index 0000000..b1a5e0d --- /dev/null +++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/util/RedisDelayQueueUtil.java @@ -0,0 +1,91 @@ +package org.springblade.system.user.util; + +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; + +import org.apache.commons.lang.StringUtils; +import org.redisson.api.RBlockingDeque; +import org.redisson.api.RDelayedQueue; +import org.redisson.api.RedissonClient; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +/** + * redisson延迟队列工具类 + * @author ytl + * @since 2022-10-13 15:24 + */ +@Slf4j +@Component +@ConditionalOnBean(RedissonClient.class) +public class RedisDelayQueueUtil { + @Resource + private RedissonClient redissonClient; + + /** + * 添加到延迟队列 + * @param value 值 + * @param delayTime 延迟时间 + * @param timeUnit 时间单位 + * @param queueCode 队列键 + * @param + * @return + */ + public boolean addDelayQueue(@NonNull T value, @NonNull long delayTime, + @NonNull TimeUnit timeUnit, @NonNull String queueCode){ + if (StringUtils.isBlank(queueCode) || Objects.isNull(value)) { + return false; + } + + try{ + RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(queueCode); + RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingDeque); + delayedQueue.offer(value, delayTime, timeUnit); + log.info("添加延时队列成功,队列键:{},值:{},延迟时间:{}", queueCode, value, timeUnit.toSeconds(delayTime)); + } catch (Exception e) { + log.error("添加延时队列失败,{}", e.getMessage()); + } + + return true; + } + + /** + * 获取延迟队列值 + * @param queueCode + * @param + * @return + * @throws Exception + */ + public T getDelayQueue(@NonNull String queueCode) throws Exception{ + if (StringUtils.isBlank(queueCode)) { + return null; + } + + RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(queueCode); + RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingDeque); + + T value = (T)blockingDeque.poll(); + return value; + } + + /** + * 删除指定队列中的消息 + * @param o + * @param queueCode + * @return + */ + public boolean removeDelayQueue(@NonNull Object o, @NonNull String queueCode){ + if (StringUtils.isBlank(queueCode) || Objects.isNull(o)) { + return false; + } + + RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(queueCode); + RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingDeque); + boolean flag = delayedQueue.remove(o); + return flag; + } +}