测试通过rabbitmq实现延迟消息

dev
a15234804788@163.com 3 years ago
parent 3bb83f46eb
commit 9cce3e31bd
  1. 7
      lab-service/lab-user/pom.xml
  2. 8
      lab-service/lab-user/src/main/java/org/springblade/system/user/UserApplication.java
  3. 44
      lab-service/lab-user/src/main/java/org/springblade/system/user/test/DeadMessageListener.java
  4. 35
      lab-service/lab-user/src/main/java/org/springblade/system/user/test/PluginDelayRabbitConfig.java
  5. 39
      lab-service/lab-user/src/main/java/org/springblade/system/user/test/TTLConerter.java
  6. 63
      lab-service/lab-user/src/main/java/org/springblade/system/user/test/TestUserController.java
  7. 29
      lab-service/lab-user/src/main/java/org/springblade/system/user/test/dlxBase.java
  8. 9
      lab-service/lab-user/src/main/resources/application-dev.yml

@ -52,6 +52,13 @@
<scope>compile</scope>
</dependency>
<!-- springcloud stream -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>

@ -4,7 +4,10 @@ package org.springblade.system.user;
import org.springblade.core.cloud.feign.EnableBladeFeign;
import org.springblade.core.launch.BladeApplication;
import org.springblade.core.launch.constant.AppConstant;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.cloud.client.SpringCloudApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
/**
@ -20,5 +23,10 @@ public class UserApplication {
public static void main(String[] args) {
BladeApplication.run(AppConstant.APPLICATION_USER_NAME, UserApplication.class, args);
}
// @Bean
// public MessageConverter jsonMessageConverter(){
// return new Jackson2JsonMessageConverter();
// }
}

@ -0,0 +1,44 @@
package org.springblade.system.user.test;
import com.rabbitmq.client.Channel;
import org.springblade.resource.entity.Message;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
/**
* 4.死信队列的消息监听器
* @author ytl
* @since 2022-10-11 14:45
*/
@Component
public class DeadMessageListener {
// @RabbitListener(bindings = @QueueBinding(
// //参数 durable exclusive autoDelete 这些默认都是false,参数可以省略
// value = @Queue(name = "bbb.Queue.dead", durable = "false", exclusive = "false", autoDelete = "false"),
// exchange = @Exchange(name = "dead.dlx.Exchange", type = ExchangeTypes.TOPIC),
// key = "dead"))
// public void handle(Map<String, String> msg, Message message,Channel channel ) throws Exception{
// System.out.println(msg);
// System.out.println(message.getContent());
// System.out.println();
// }
@RabbitListener(queues = "bbb.Queue.dead")
public void getMyMessage(String msg) throws IOException {
//mes为body中的数据
System.out.println(msg);
}
@RabbitListener(queues = "queue_delay")
public void getdely(String msg){
System.out.println(msg);
}
}

@ -0,0 +1,35 @@
package org.springblade.system.user.test;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 延时队列路由配置
* @author ytl
* @since 2022-10-11 17:36
*/
@Configuration
public class PluginDelayRabbitConfig {
@Bean("pluginDelayExchange")
public Exchange pluginDelayExchange() {
Map<String, Object> argMap = new HashMap<>();
argMap.put("x-delayed-type", "direct");//必须要配置这个类型,可以是direct,topic和fanout
//第二个参数必须为x-delayed-message
return new CustomExchange("delay_exchange","x-delayed-message",false, false, argMap);
}
@Bean("pluginDelayQueue")
public Queue pluginDelayQueue(){
return new Queue("queue_delay");
}
@Bean
public Binding binding() {
return BindingBuilder.bind(pluginDelayQueue()).to(pluginDelayExchange()).with("delayed").noargs();
}
}

@ -0,0 +1,39 @@
package org.springblade.system.user.test;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* 2.正常的交换机 队列配置消息先进入到这里
* 这里希望为每个消息单独设计过期时间因此args.put("x-message-ttl",5000)被注释掉了
* 如果2者都设置了以值小的为准
* @author ytl
* @since 2022-10-11 14:17
*/
@Configuration
public class TTLConerter {
@Bean
public DirectExchange directExchange() {
return new DirectExchange("ttl_direct_exchange", true, false);
}
@Bean
public Queue abcQueue() {
HashMap<String,Object> args = new HashMap<>();
//args.put("x-message-ttl",5000); //设置整个队列的过期时间
args.put("x-dead-letter-exchange","dead.dlx.Exchange");//设置该队列要关联的死信交换机
args.put("x-dead-letter-routing-key","dead");//绑定你的死信交换机的key
return new Queue("ttl.message.direct.queue",true,false,false, args);
}
@Bean
public Binding abcBinding(){
return BindingBuilder.bind(abcQueue()).to(directExchange()).with("ttlmessage");
}
}

@ -0,0 +1,63 @@
package org.springblade.system.user.test;
import org.springblade.core.tool.api.R;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Handler;
/**
* 测试rabbitmq 死信队列
* @author ytl
* @since 2022-10-11 13:55
*/
@RestController
@RequestMapping("/userTest")
public class TestUserController {
@Autowired
private RabbitTemplate rabbitTemplate;
//1.生产者,通过路由器ttl_direct_exchange和路由键ttlmessage向一个队列中发送带有过期时间的消息
@GetMapping("/sendTtlMessage")
public R sendTtlMessage(String ms,long time) throws Exception{
//路由器
String exchangeName ="ttl_direct_exchange";
//路由key
String luYouKey = "ttlmessage";
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(String.valueOf(time));
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
// Map<String,String> map = new HashMap<>();
// map.put("time",String.valueOf(time));
// map.put("msg",ms);
rabbitTemplate.convertAndSend(exchangeName,luYouKey,ms,messagePostProcessor);
return R.data("1");
}
//测试延迟队列模式
@GetMapping("/sendTtlMessage02")
public R sendTtlMessage02(String ms,long time) throws Exception{
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("x-delay",time * 1000);
Message message = new Message(ms.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("delay_exchange","delayed",message);
return R.data("2");
}
}

@ -0,0 +1,29 @@
package org.springblade.system.user.test;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 3.死信交换机 路由的配置
* @author ytl
* @since 2022-10-11 14:33
*/
@Configuration
public class dlxBase {
@Bean
public DirectExchange DlxExchange(){
return new DirectExchange("dead.dlx.Exchange",true,false);
}
@Bean
public Queue bbbQueue(){
return new Queue("bbb.Queue.dead");
}
@Bean
public Binding bbbBinding(){
return BindingBuilder.bind(bbbQueue()).to(DlxExchange()).with("dead");
}
}

@ -1,3 +1,12 @@
#服务器端口
server:
port: 8004
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /

Loading…
Cancel
Save