diff --git a/lab-service/lab-user/pom.xml b/lab-service/lab-user/pom.xml
index d180648..8c0a058 100644
--- a/lab-service/lab-user/pom.xml
+++ b/lab-service/lab-user/pom.xml
@@ -52,6 +52,13 @@
compile
+
+
+ org.springframework.boot
+ spring-boot-starter-amqp
+
+
+
diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/UserApplication.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/UserApplication.java
index 834be4d..198aefd 100644
--- a/lab-service/lab-user/src/main/java/org/springblade/system/user/UserApplication.java
+++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/UserApplication.java
@@ -4,7 +4,10 @@ package org.springblade.system.user;
import org.springblade.core.cloud.feign.EnableBladeFeign;
import org.springblade.core.launch.BladeApplication;
import org.springblade.core.launch.constant.AppConstant;
+import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
+import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.cloud.client.SpringCloudApplication;
+import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
/**
@@ -20,5 +23,10 @@ public class UserApplication {
public static void main(String[] args) {
BladeApplication.run(AppConstant.APPLICATION_USER_NAME, UserApplication.class, args);
}
+// @Bean
+// public MessageConverter jsonMessageConverter(){
+// return new Jackson2JsonMessageConverter();
+// }
+
}
diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/test/DeadMessageListener.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/DeadMessageListener.java
new file mode 100644
index 0000000..77e5592
--- /dev/null
+++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/DeadMessageListener.java
@@ -0,0 +1,44 @@
+package org.springblade.system.user.test;
+
+import com.rabbitmq.client.Channel;
+import org.springblade.resource.entity.Message;
+import org.springframework.amqp.core.ExchangeTypes;
+import org.springframework.amqp.rabbit.annotation.*;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * 4.死信队列的消息监听器
+ * @author ytl
+ * @since 2022-10-11 14:45
+ */
+@Component
+public class DeadMessageListener {
+// @RabbitListener(bindings = @QueueBinding(
+// //参数 durable exclusive autoDelete 这些默认都是false,参数可以省略
+// value = @Queue(name = "bbb.Queue.dead", durable = "false", exclusive = "false", autoDelete = "false"),
+// exchange = @Exchange(name = "dead.dlx.Exchange", type = ExchangeTypes.TOPIC),
+// key = "dead"))
+// public void handle(Map msg, Message message,Channel channel ) throws Exception{
+// System.out.println(msg);
+// System.out.println(message.getContent());
+// System.out.println();
+// }
+
+ @RabbitListener(queues = "bbb.Queue.dead")
+ public void getMyMessage(String msg) throws IOException {
+ //mes为body中的数据
+ System.out.println(msg);
+ }
+
+ @RabbitListener(queues = "queue_delay")
+ public void getdely(String msg){
+ System.out.println(msg);
+ }
+}
diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/test/PluginDelayRabbitConfig.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/PluginDelayRabbitConfig.java
new file mode 100644
index 0000000..b7b4f92
--- /dev/null
+++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/PluginDelayRabbitConfig.java
@@ -0,0 +1,35 @@
+package org.springblade.system.user.test;
+
+import org.springframework.amqp.core.*;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 延时队列路由配置
+ * @author ytl
+ * @since 2022-10-11 17:36
+ */
+@Configuration
+public class PluginDelayRabbitConfig {
+ @Bean("pluginDelayExchange")
+ public Exchange pluginDelayExchange() {
+ Map argMap = new HashMap<>();
+ argMap.put("x-delayed-type", "direct");//必须要配置这个类型,可以是direct,topic和fanout
+ //第二个参数必须为x-delayed-message
+ return new CustomExchange("delay_exchange","x-delayed-message",false, false, argMap);
+ }
+
+ @Bean("pluginDelayQueue")
+ public Queue pluginDelayQueue(){
+ return new Queue("queue_delay");
+ }
+
+ @Bean
+ public Binding binding() {
+ return BindingBuilder.bind(pluginDelayQueue()).to(pluginDelayExchange()).with("delayed").noargs();
+ }
+}
diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/test/TTLConerter.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/TTLConerter.java
new file mode 100644
index 0000000..a552f1a
--- /dev/null
+++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/TTLConerter.java
@@ -0,0 +1,39 @@
+package org.springblade.system.user.test;
+
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.DirectExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.HashMap;
+
+/**
+ * 2.正常的交换机 队列配置,消息先进入到这里。
+ * 这里希望为每个消息单独设计过期时间,因此args.put("x-message-ttl",5000)被注释掉了
+ * 如果2者都设置了,以值小的为准
+ * @author ytl
+ * @since 2022-10-11 14:17
+ */
+@Configuration
+public class TTLConerter {
+ @Bean
+ public DirectExchange directExchange() {
+ return new DirectExchange("ttl_direct_exchange", true, false);
+ }
+
+ @Bean
+ public Queue abcQueue() {
+ HashMap args = new HashMap<>();
+ //args.put("x-message-ttl",5000); //设置整个队列的过期时间
+ args.put("x-dead-letter-exchange","dead.dlx.Exchange");//设置该队列要关联的死信交换机
+ args.put("x-dead-letter-routing-key","dead");//绑定你的死信交换机的key
+
+ return new Queue("ttl.message.direct.queue",true,false,false, args);
+ }
+ @Bean
+ public Binding abcBinding(){
+ return BindingBuilder.bind(abcQueue()).to(directExchange()).with("ttlmessage");
+ }
+}
diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/test/TestUserController.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/TestUserController.java
new file mode 100644
index 0000000..f9e0c33
--- /dev/null
+++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/TestUserController.java
@@ -0,0 +1,63 @@
+package org.springblade.system.user.test;
+
+import org.springblade.core.tool.api.R;
+import org.springframework.amqp.AmqpException;
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Handler;
+
+/**
+ * 测试rabbitmq 死信队列
+ * @author ytl
+ * @since 2022-10-11 13:55
+ */
+@RestController
+@RequestMapping("/userTest")
+public class TestUserController {
+ @Autowired
+ private RabbitTemplate rabbitTemplate;
+
+ //1.生产者,通过路由器ttl_direct_exchange和路由键ttlmessage向一个队列中发送带有过期时间的消息
+ @GetMapping("/sendTtlMessage")
+ public R sendTtlMessage(String ms,long time) throws Exception{
+ //路由器
+ String exchangeName ="ttl_direct_exchange";
+ //路由key
+ String luYouKey = "ttlmessage";
+
+
+ MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
+ @Override
+ public Message postProcessMessage(Message message) throws AmqpException {
+ message.getMessageProperties().setExpiration(String.valueOf(time));
+ message.getMessageProperties().setContentEncoding("UTF-8");
+ return message;
+ }
+ };
+
+// Map map = new HashMap<>();
+// map.put("time",String.valueOf(time));
+// map.put("msg",ms);
+
+ rabbitTemplate.convertAndSend(exchangeName,luYouKey,ms,messagePostProcessor);
+
+ return R.data("1");
+ }
+
+ //测试延迟队列模式
+ @GetMapping("/sendTtlMessage02")
+ public R sendTtlMessage02(String ms,long time) throws Exception{
+ MessageProperties messageProperties = new MessageProperties();
+ messageProperties.setHeader("x-delay",time * 1000);
+ Message message = new Message(ms.getBytes(), messageProperties);
+ rabbitTemplate.convertAndSend("delay_exchange","delayed",message);
+ return R.data("2");
+ }
+}
diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/test/dlxBase.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/dlxBase.java
new file mode 100644
index 0000000..14b054b
--- /dev/null
+++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/dlxBase.java
@@ -0,0 +1,29 @@
+package org.springblade.system.user.test;
+
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.DirectExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * 3.死信交换机 路由的配置
+ * @author ytl
+ * @since 2022-10-11 14:33
+ */
+@Configuration
+public class dlxBase {
+ @Bean
+ public DirectExchange DlxExchange(){
+ return new DirectExchange("dead.dlx.Exchange",true,false);
+ }
+ @Bean
+ public Queue bbbQueue(){
+ return new Queue("bbb.Queue.dead");
+ }
+ @Bean
+ public Binding bbbBinding(){
+ return BindingBuilder.bind(bbbQueue()).to(DlxExchange()).with("dead");
+ }
+}
diff --git a/lab-service/lab-user/src/main/resources/application-dev.yml b/lab-service/lab-user/src/main/resources/application-dev.yml
index 1fb2d03..3b4d61b 100644
--- a/lab-service/lab-user/src/main/resources/application-dev.yml
+++ b/lab-service/lab-user/src/main/resources/application-dev.yml
@@ -1,3 +1,12 @@
#服务器端口
server:
port: 8004
+
+spring:
+ rabbitmq:
+ host: 127.0.0.1
+ port: 5672
+ username: guest
+ password: guest
+ virtual-host: /
+