探索通过redisson的延迟队列实现延迟通知开会的功能

dev
a15234804788@163.com 3 years ago
parent a1eb17e4fd
commit 7a1da5b9ab
  1. 2
      lab-service-api/lab-scope-api/src/main/java/org/springblade/system/user/config/ScopeConfiguration.java
  2. 45
      lab-service/lab-user/src/main/java/org/springblade/system/user/config/RedissonConfig.java
  3. 27
      lab-service/lab-user/src/main/java/org/springblade/system/user/mettinghandler/MettingEndHandler.java
  4. 48
      lab-service/lab-user/src/main/java/org/springblade/system/user/mettinghandler/MettingRemindHandler.java
  5. 6
      lab-service/lab-user/src/main/java/org/springblade/system/user/mettinghandler/RedissonDelayQueuHandlle.java
  6. 42
      lab-service/lab-user/src/main/java/org/springblade/system/user/service/impl/TrainServiceImpl.java
  7. 19
      lab-service/lab-user/src/main/java/org/springblade/system/user/test/TestUserController.java
  8. 27
      lab-service/lab-user/src/main/java/org/springblade/system/user/util/QueueEnum.java
  9. 60
      lab-service/lab-user/src/main/java/org/springblade/system/user/util/RedisDelayQueueRunner.java
  10. 91
      lab-service/lab-user/src/main/java/org/springblade/system/user/util/RedisDelayQueueUtil.java

@ -0,0 +1,45 @@
package org.springblade.system.user.config;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* redisson配置类
* @author ytl
* @since 2022-10-13 15:11
*/
@Slf4j
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer()
.setAddress("redis://" + host + ":" + port)
.setDatabase(0)
.setPingConnectionInterval(2000);
config.setLockWatchdogTimeout(10000L);
try{
return Redisson.create(config);
}catch (Exception e){
log.error("创建redisson连接错误");
return null;
}
}
}

@ -0,0 +1,27 @@
package org.springblade.system.user.mettinghandler;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springblade.system.user.entity.Train;
import org.springblade.system.user.service.ITrainService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author ytl
* @since 2022-10-13 16:11
*/
@Slf4j
@Component("mettingEndHandler")
public class MettingEndHandler implements RedissonDelayQueuHandlle<String> {
@Autowired
private ITrainService trainService;
@Override
public void execute(String id) {
LambdaUpdateWrapper<Train> wrapper = new LambdaUpdateWrapper<Train>();
wrapper.eq(Train::getId, id).set(Train::getStatus, 2);
trainService.update(wrapper);
log.info("有会议结束了,会议id:{}", id);
}
}

@ -0,0 +1,48 @@
package org.springblade.system.user.mettinghandler;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springblade.resource.enums.SysTypeEnum;
import org.springblade.resource.feign.IMessageClient;
import org.springblade.system.user.entity.Train;
import org.springblade.system.user.entity.TrainPerson;
import org.springblade.system.user.service.ITrainPersonService;
import org.springblade.system.user.service.ITrainService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author ytl
* @since 2022-10-13 16:09
*/
@Slf4j
@Component("mettingRemindHandler")
public class MettingRemindHandler implements RedissonDelayQueuHandlle<String>{
@Autowired
private ITrainService trainService;
@Autowired
private ITrainPersonService trainPersonService;
@Autowired
private IMessageClient messageClient;
@Override
public void execute(String id) {
LambdaQueryWrapper<TrainPerson> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(TrainPerson::getTrainId, id);
List<TrainPerson> list = trainPersonService.list(wrapper);
Train train = trainService.getById(id);
// if (list.size() > 0) {
// list.forEach(person -> {
// messageClient.event(SysTypeEnum.INFORM.getValue(), "会议提醒",
// "您有新的会议将在" + train.getDuration() + "分钟后开始,请准时参加!", 1, 5, person.getPersonName(), "/train/project");
// });
// }
// messageClient.event(SysTypeEnum.INFORM.getValue(), "会议提醒",
// "您有新的会议将在" + train.getDuration() + "分钟后开始,请准时参加!", 1, 5, train.getTeacherName(), "/train/project");
log.info("有会议即将开始,会议id:{}", id);
}
}

@ -0,0 +1,6 @@
package org.springblade.system.user.mettinghandler;
public interface RedissonDelayQueuHandlle<T> {
void execute(T t);
}

@ -17,6 +17,8 @@ import org.springblade.system.user.service.ITeacherService;
import org.springblade.system.user.service.ITrainPersonService;
import org.springblade.system.user.service.ITrainService;
import org.springblade.system.user.service.ITrainSpeakService;
import org.springblade.system.user.util.QueueEnum;
import org.springblade.system.user.util.RedisDelayQueueUtil;
import org.springblade.system.user.util.ThreadPoolFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
@ -55,6 +57,9 @@ public class TrainServiceImpl extends BaseServiceImpl<TrainMapper, Train> implem
private final ISysClient sysClient;
@Autowired
private RedisDelayQueueUtil redisDelayQueueUtil;
@Autowired
private StringRedisTemplate redisTemplate;
@ -137,27 +142,30 @@ public class TrainServiceImpl extends BaseServiceImpl<TrainMapper, Train> implem
}
//异步执行提前通知参会人员的任务
ThreadPoolFactory.sheduleThreadPool.schedule(new Runnable() {
@Override
public void run() {
for (String s : split) {
messageClient.event(SysTypeEnum.INFORM.getValue(), "会议提醒",
"您有新的会议将在" + train.getDuration() + "分钟后开始,请准时参加!", 1, 5, s, "/train/project");
}
}
},time1 - now, TimeUnit.MILLISECONDS);
// ThreadPoolFactory.sheduleThreadPool.schedule(new Runnable() {
// @Override
// public void run() {
// for (String s : split) {
// messageClient.event(SysTypeEnum.INFORM.getValue(), "会议提醒",
// "您有新的会议将在" + train.getDuration() + "分钟后开始,请准时参加!", 1, 5, s, "/train/project");
// }
// }
// },time1 - now, TimeUnit.MILLISECONDS);
//提前通知参会人员和讲师的替代方案
redisDelayQueueUtil.addDelayQueue(train.getId(),time1 - now, TimeUnit.MILLISECONDS, QueueEnum.METTING_REMAIND.getCode());
//异步执行提前通知讲师的任务
ThreadPoolFactory.sheduleThreadPool.schedule(new Runnable() {
@Override
public void run() {
messageClient.event(SysTypeEnum.INFORM.getValue(), "会议提醒",
"您有新的会议将在" + train.getDuration() + "分钟后开始,请准时参加!", 1, 5, train.getTeacher().toString(), "/train/project");
}
},time1 - now, TimeUnit.MILLISECONDS);
// ThreadPoolFactory.sheduleThreadPool.schedule(new Runnable() {
// @Override
// public void run() {
// messageClient.event(SysTypeEnum.INFORM.getValue(), "会议提醒",
// "您有新的会议将在" + train.getDuration() + "分钟后开始,请准时参加!", 1, 5, train.getTeacher().toString(), "/train/project");
// }
// },time1 - now, TimeUnit.MILLISECONDS);
//向redis中加入过期key --培训的id
redisTemplate.opsForValue().set(train.getId().toString(), train.getId().toString(),time1 - now , TimeUnit.MILLISECONDS);
//redisTemplate.opsForValue().set(train.getId().toString(), train.getId().toString(),time1 - now , TimeUnit.MILLISECONDS);
return teacherService.updateById(teacher);
}

@ -1,16 +1,17 @@
package org.springblade.system.user.test;
import org.springblade.core.tool.api.R;
import org.springblade.system.user.util.QueueEnum;
import org.springblade.system.user.util.RedisDelayQueueUtil;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Handler;
/**
@ -59,4 +60,16 @@ public class TestUserController {
rabbitTemplate.convertAndSend("delay_exchange","delayed",message);
return R.data("2");
}
//测试redisson延迟队列添加信息
@Autowired
private RedisDelayQueueUtil redisDelayQueueUtil;
@GetMapping("/addQueue")
public void addQueue( String id){
redisDelayQueueUtil.addDelayQueue(id,5, TimeUnit.SECONDS, QueueEnum.METTING_REMAIND.getCode());
redisDelayQueueUtil.addDelayQueue(id,10, TimeUnit.SECONDS, QueueEnum.METTING_END.getCode());
}
}

@ -0,0 +1,27 @@
package org.springblade.system.user.util;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
/**
* @author ytl
* @since 2022-10-13 15:59
*/
@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum QueueEnum {
METTING_REMAIND("METTING_REMAIND" ,"会议即将开始", "mettingRemindHandler"),
METTING_END("METTING_END", "会议结束了" ,"mettingEndHandler");
//延迟队列key
private String code;
//中文描述
private String name;
//具体业务的bean
private String beanId;
}

@ -0,0 +1,60 @@
package org.springblade.system.user.util;
import lombok.extern.slf4j.Slf4j;
import org.springblade.system.user.mettinghandler.RedissonDelayQueuHandlle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 系统启动后自动启动监听延迟队列
* @author ytl
* @since 2022-10-13 15:55
*/
@Slf4j
@Component
public class RedisDelayQueueRunner implements CommandLineRunner {
@Autowired
private RedisDelayQueueUtil redisDelayQueueUtil;
@Autowired
private ApplicationContext context;
//spring封装的
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Override
public void run(String... args) throws Exception {
threadPoolTaskExecutor.execute(() -> {
while (true) {
QueueEnum[] queueEnums = QueueEnum.values();
for (QueueEnum en : queueEnums) {
try {
String value = (String)redisDelayQueueUtil.getDelayQueue(en.getCode());
if (value != null){
RedissonDelayQueuHandlle<Object> bean = (RedissonDelayQueuHandlle<Object>)context.getBean(en.getBeanId());
bean.execute(value);
}
} catch (Exception e) {
log.error("延迟队列监控失败");
}
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
log.info("延迟队列监听成功");
}
}

@ -0,0 +1,91 @@
package org.springblade.system.user.util;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* redisson延迟队列工具类
* @author ytl
* @since 2022-10-13 15:24
*/
@Slf4j
@Component
@ConditionalOnBean(RedissonClient.class)
public class RedisDelayQueueUtil {
@Resource
private RedissonClient redissonClient;
/**
* 添加到延迟队列
* @param value
* @param delayTime 延迟时间
* @param timeUnit 时间单位
* @param queueCode 队列键
* @param <T>
* @return
*/
public <T> boolean addDelayQueue(@NonNull T value, @NonNull long delayTime,
@NonNull TimeUnit timeUnit, @NonNull String queueCode){
if (StringUtils.isBlank(queueCode) || Objects.isNull(value)) {
return false;
}
try{
RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
delayedQueue.offer(value, delayTime, timeUnit);
log.info("添加延时队列成功,队列键:{},值:{},延迟时间:{}", queueCode, value, timeUnit.toSeconds(delayTime));
} catch (Exception e) {
log.error("添加延时队列失败,{}", e.getMessage());
}
return true;
}
/**
* 获取延迟队列值
* @param queueCode
* @param <T>
* @return
* @throws Exception
*/
public <T> T getDelayQueue(@NonNull String queueCode) throws Exception{
if (StringUtils.isBlank(queueCode)) {
return null;
}
RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
T value = (T)blockingDeque.poll();
return value;
}
/**
* 删除指定队列中的消息
* @param o
* @param queueCode
* @return
*/
public boolean removeDelayQueue(@NonNull Object o, @NonNull String queueCode){
if (StringUtils.isBlank(queueCode) || Objects.isNull(o)) {
return false;
}
RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
boolean flag = delayedQueue.remove(o);
return flag;
}
}
Loading…
Cancel
Save