删除掉测试的rabitmq相关代码

dev
a15234804788@163.com 3 years ago
parent e02f33380f
commit fcf9d06ed2
  1. 6
      lab-service/lab-user/pom.xml
  2. 3
      lab-service/lab-user/src/main/java/org/springblade/system/user/UserApplication.java
  3. 2
      lab-service/lab-user/src/main/java/org/springblade/system/user/service/impl/TrainServiceImpl.java
  4. 44
      lab-service/lab-user/src/main/java/org/springblade/system/user/test/DeadMessageListener.java
  5. 35
      lab-service/lab-user/src/main/java/org/springblade/system/user/test/PluginDelayRabbitConfig.java
  6. 39
      lab-service/lab-user/src/main/java/org/springblade/system/user/test/TTLConerter.java
  7. 75
      lab-service/lab-user/src/main/java/org/springblade/system/user/test/TestUserController.java
  8. 29
      lab-service/lab-user/src/main/java/org/springblade/system/user/test/dlxBase.java
  9. 8
      lab-service/lab-user/src/main/resources/application-dev.yml

@ -52,12 +52,6 @@
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<!-- springcloud stream -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies> </dependencies>

@ -4,10 +4,7 @@ package org.springblade.system.user;
import org.springblade.core.cloud.feign.EnableBladeFeign; import org.springblade.core.cloud.feign.EnableBladeFeign;
import org.springblade.core.launch.BladeApplication; import org.springblade.core.launch.BladeApplication;
import org.springblade.core.launch.constant.AppConstant; import org.springblade.core.launch.constant.AppConstant;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.cloud.client.SpringCloudApplication; import org.springframework.cloud.client.SpringCloudApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableAsync;
/** /**

@ -153,6 +153,8 @@ public class TrainServiceImpl extends BaseServiceImpl<TrainMapper, Train> implem
// },time1 - now, TimeUnit.MILLISECONDS); // },time1 - now, TimeUnit.MILLISECONDS);
//提前通知参会人员和讲师的替代方案 //提前通知参会人员和讲师的替代方案
redisDelayQueueUtil.addDelayQueue(train.getId(),time1 - now, TimeUnit.MILLISECONDS, QueueEnum.METTING_REMAIND.getCode()); redisDelayQueueUtil.addDelayQueue(train.getId(),time1 - now, TimeUnit.MILLISECONDS, QueueEnum.METTING_REMAIND.getCode());
//可以通过如下方法自动将到期会议状态设置为2,需要的话放开该注释即可
// redisDelayQueueUtil.addDelayQueue(train.getId(),time - now, TimeUnit.MILLISECONDS, QueueEnum.METTING_END.getCode());
//异步执行提前通知讲师的任务 //异步执行提前通知讲师的任务
// ThreadPoolFactory.sheduleThreadPool.schedule(new Runnable() { // ThreadPoolFactory.sheduleThreadPool.schedule(new Runnable() {

@ -1,44 +0,0 @@
package org.springblade.system.user.test;
import com.rabbitmq.client.Channel;
import org.springblade.resource.entity.Message;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
/**
* 4.死信队列的消息监听器
* @author ytl
* @since 2022-10-11 14:45
*/
@Component
public class DeadMessageListener {
// @RabbitListener(bindings = @QueueBinding(
// //参数 durable exclusive autoDelete 这些默认都是false,参数可以省略
// value = @Queue(name = "bbb.Queue.dead", durable = "false", exclusive = "false", autoDelete = "false"),
// exchange = @Exchange(name = "dead.dlx.Exchange", type = ExchangeTypes.TOPIC),
// key = "dead"))
// public void handle(Map<String, String> msg, Message message,Channel channel ) throws Exception{
// System.out.println(msg);
// System.out.println(message.getContent());
// System.out.println();
// }
@RabbitListener(queues = "bbb.Queue.dead")
public void getMyMessage(String msg) throws IOException {
//mes为body中的数据
System.out.println(msg);
}
@RabbitListener(queues = "queue_delay")
public void getdely(String msg){
System.out.println(msg);
}
}

@ -1,35 +0,0 @@
package org.springblade.system.user.test;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 延时队列路由配置
* @author ytl
* @since 2022-10-11 17:36
*/
@Configuration
public class PluginDelayRabbitConfig {
@Bean("pluginDelayExchange")
public Exchange pluginDelayExchange() {
Map<String, Object> argMap = new HashMap<>();
argMap.put("x-delayed-type", "direct");//必须要配置这个类型,可以是direct,topic和fanout
//第二个参数必须为x-delayed-message
return new CustomExchange("delay_exchange","x-delayed-message",false, false, argMap);
}
@Bean("pluginDelayQueue")
public Queue pluginDelayQueue(){
return new Queue("queue_delay");
}
@Bean
public Binding binding() {
return BindingBuilder.bind(pluginDelayQueue()).to(pluginDelayExchange()).with("delayed").noargs();
}
}

@ -1,39 +0,0 @@
package org.springblade.system.user.test;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* 2.正常的交换机 队列配置消息先进入到这里
* 这里希望为每个消息单独设计过期时间因此args.put("x-message-ttl",5000)被注释掉了
* 如果2者都设置了以值小的为准
* @author ytl
* @since 2022-10-11 14:17
*/
@Configuration
public class TTLConerter {
@Bean
public DirectExchange directExchange() {
return new DirectExchange("ttl_direct_exchange", true, false);
}
@Bean
public Queue abcQueue() {
HashMap<String,Object> args = new HashMap<>();
//args.put("x-message-ttl",5000); //设置整个队列的过期时间
args.put("x-dead-letter-exchange","dead.dlx.Exchange");//设置该队列要关联的死信交换机
args.put("x-dead-letter-routing-key","dead");//绑定你的死信交换机的key
return new Queue("ttl.message.direct.queue",true,false,false, args);
}
@Bean
public Binding abcBinding(){
return BindingBuilder.bind(abcQueue()).to(directExchange()).with("ttlmessage");
}
}

@ -1,75 +0,0 @@
package org.springblade.system.user.test;
import org.springblade.core.tool.api.R;
import org.springblade.system.user.util.QueueEnum;
import org.springblade.system.user.util.RedisDelayQueueUtil;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Handler;
/**
* 测试rabbitmq 死信队列
* @author ytl
* @since 2022-10-11 13:55
*/
@RestController
@RequestMapping("/userTest")
public class TestUserController {
@Autowired
private RabbitTemplate rabbitTemplate;
//1.生产者,通过路由器ttl_direct_exchange和路由键ttlmessage向一个队列中发送带有过期时间的消息
@GetMapping("/sendTtlMessage")
public R sendTtlMessage(String ms,long time) throws Exception{
//路由器
String exchangeName ="ttl_direct_exchange";
//路由key
String luYouKey = "ttlmessage";
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(String.valueOf(time));
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
// Map<String,String> map = new HashMap<>();
// map.put("time",String.valueOf(time));
// map.put("msg",ms);
rabbitTemplate.convertAndSend(exchangeName,luYouKey,ms,messagePostProcessor);
return R.data("1");
}
//测试延迟队列模式
@GetMapping("/sendTtlMessage02")
public R sendTtlMessage02(String ms,long time) throws Exception{
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("x-delay",time * 1000);
Message message = new Message(ms.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("delay_exchange","delayed",message);
return R.data("2");
}
//测试redisson延迟队列添加信息
@Autowired
private RedisDelayQueueUtil redisDelayQueueUtil;
@GetMapping("/addQueue")
public void addQueue( String id){
redisDelayQueueUtil.addDelayQueue(id,5, TimeUnit.SECONDS, QueueEnum.METTING_REMAIND.getCode());
redisDelayQueueUtil.addDelayQueue(id,10, TimeUnit.SECONDS, QueueEnum.METTING_END.getCode());
}
}

@ -1,29 +0,0 @@
package org.springblade.system.user.test;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 3.死信交换机 路由的配置
* @author ytl
* @since 2022-10-11 14:33
*/
@Configuration
public class dlxBase {
@Bean
public DirectExchange DlxExchange(){
return new DirectExchange("dead.dlx.Exchange",true,false);
}
@Bean
public Queue bbbQueue(){
return new Queue("bbb.Queue.dead");
}
@Bean
public Binding bbbBinding(){
return BindingBuilder.bind(bbbQueue()).to(DlxExchange()).with("dead");
}
}

@ -2,11 +2,3 @@
server: server:
port: 8004 port: 8004
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /

Loading…
Cancel
Save