From 9cce3e31bd2bdd9204a32a264d3d3d34582a7de3 Mon Sep 17 00:00:00 2001 From: "a15234804788@163.com" Date: Tue, 11 Oct 2022 18:01:07 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E9=80=9A=E8=BF=87rabbitmq?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=BB=B6=E8=BF=9F=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lab-service/lab-user/pom.xml | 7 +++ .../system/user/UserApplication.java | 8 +++ .../system/user/test/DeadMessageListener.java | 44 +++++++++++++ .../user/test/PluginDelayRabbitConfig.java | 35 +++++++++++ .../system/user/test/TTLConerter.java | 39 ++++++++++++ .../system/user/test/TestUserController.java | 63 +++++++++++++++++++ .../springblade/system/user/test/dlxBase.java | 29 +++++++++ .../src/main/resources/application-dev.yml | 9 +++ 8 files changed, 234 insertions(+) create mode 100644 lab-service/lab-user/src/main/java/org/springblade/system/user/test/DeadMessageListener.java create mode 100644 lab-service/lab-user/src/main/java/org/springblade/system/user/test/PluginDelayRabbitConfig.java create mode 100644 lab-service/lab-user/src/main/java/org/springblade/system/user/test/TTLConerter.java create mode 100644 lab-service/lab-user/src/main/java/org/springblade/system/user/test/TestUserController.java create mode 100644 lab-service/lab-user/src/main/java/org/springblade/system/user/test/dlxBase.java diff --git a/lab-service/lab-user/pom.xml b/lab-service/lab-user/pom.xml index d180648..8c0a058 100644 --- a/lab-service/lab-user/pom.xml +++ b/lab-service/lab-user/pom.xml @@ -52,6 +52,13 @@ compile + + + org.springframework.boot + spring-boot-starter-amqp + + + diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/UserApplication.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/UserApplication.java index 834be4d..198aefd 100644 --- a/lab-service/lab-user/src/main/java/org/springblade/system/user/UserApplication.java +++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/UserApplication.java @@ -4,7 +4,10 @@ package org.springblade.system.user; import org.springblade.core.cloud.feign.EnableBladeFeign; import org.springblade.core.launch.BladeApplication; import org.springblade.core.launch.constant.AppConstant; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.cloud.client.SpringCloudApplication; +import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.EnableAsync; /** @@ -20,5 +23,10 @@ public class UserApplication { public static void main(String[] args) { BladeApplication.run(AppConstant.APPLICATION_USER_NAME, UserApplication.class, args); } +// @Bean +// public MessageConverter jsonMessageConverter(){ +// return new Jackson2JsonMessageConverter(); +// } + } diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/test/DeadMessageListener.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/DeadMessageListener.java new file mode 100644 index 0000000..77e5592 --- /dev/null +++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/DeadMessageListener.java @@ -0,0 +1,44 @@ +package org.springblade.system.user.test; + +import com.rabbitmq.client.Channel; +import org.springblade.resource.entity.Message; +import org.springframework.amqp.core.ExchangeTypes; +import org.springframework.amqp.rabbit.annotation.*; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +/** + * 4.死信队列的消息监听器 + * @author ytl + * @since 2022-10-11 14:45 + */ +@Component +public class DeadMessageListener { +// @RabbitListener(bindings = @QueueBinding( +// //参数 durable exclusive autoDelete 这些默认都是false,参数可以省略 +// value = @Queue(name = "bbb.Queue.dead", durable = "false", exclusive = "false", autoDelete = "false"), +// exchange = @Exchange(name = "dead.dlx.Exchange", type = ExchangeTypes.TOPIC), +// key = "dead")) +// public void handle(Map msg, Message message,Channel channel ) throws Exception{ +// System.out.println(msg); +// System.out.println(message.getContent()); +// System.out.println(); +// } + + @RabbitListener(queues = "bbb.Queue.dead") + public void getMyMessage(String msg) throws IOException { + //mes为body中的数据 + System.out.println(msg); + } + + @RabbitListener(queues = "queue_delay") + public void getdely(String msg){ + System.out.println(msg); + } +} diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/test/PluginDelayRabbitConfig.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/PluginDelayRabbitConfig.java new file mode 100644 index 0000000..b7b4f92 --- /dev/null +++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/PluginDelayRabbitConfig.java @@ -0,0 +1,35 @@ +package org.springblade.system.user.test; + +import org.springframework.amqp.core.*; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.HashMap; +import java.util.Map; + +/** + * 延时队列路由配置 + * @author ytl + * @since 2022-10-11 17:36 + */ +@Configuration +public class PluginDelayRabbitConfig { + @Bean("pluginDelayExchange") + public Exchange pluginDelayExchange() { + Map argMap = new HashMap<>(); + argMap.put("x-delayed-type", "direct");//必须要配置这个类型,可以是direct,topic和fanout + //第二个参数必须为x-delayed-message + return new CustomExchange("delay_exchange","x-delayed-message",false, false, argMap); + } + + @Bean("pluginDelayQueue") + public Queue pluginDelayQueue(){ + return new Queue("queue_delay"); + } + + @Bean + public Binding binding() { + return BindingBuilder.bind(pluginDelayQueue()).to(pluginDelayExchange()).with("delayed").noargs(); + } +} diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/test/TTLConerter.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/TTLConerter.java new file mode 100644 index 0000000..a552f1a --- /dev/null +++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/TTLConerter.java @@ -0,0 +1,39 @@ +package org.springblade.system.user.test; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.HashMap; + +/** + * 2.正常的交换机 队列配置,消息先进入到这里。 + * 这里希望为每个消息单独设计过期时间,因此args.put("x-message-ttl",5000)被注释掉了 + * 如果2者都设置了,以值小的为准 + * @author ytl + * @since 2022-10-11 14:17 + */ +@Configuration +public class TTLConerter { + @Bean + public DirectExchange directExchange() { + return new DirectExchange("ttl_direct_exchange", true, false); + } + + @Bean + public Queue abcQueue() { + HashMap args = new HashMap<>(); + //args.put("x-message-ttl",5000); //设置整个队列的过期时间 + args.put("x-dead-letter-exchange","dead.dlx.Exchange");//设置该队列要关联的死信交换机 + args.put("x-dead-letter-routing-key","dead");//绑定你的死信交换机的key + + return new Queue("ttl.message.direct.queue",true,false,false, args); + } + @Bean + public Binding abcBinding(){ + return BindingBuilder.bind(abcQueue()).to(directExchange()).with("ttlmessage"); + } +} diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/test/TestUserController.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/TestUserController.java new file mode 100644 index 0000000..f9e0c33 --- /dev/null +++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/TestUserController.java @@ -0,0 +1,63 @@ +package org.springblade.system.user.test; + +import org.springblade.core.tool.api.R; +import org.springframework.amqp.AmqpException; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Handler; + +/** + * 测试rabbitmq 死信队列 + * @author ytl + * @since 2022-10-11 13:55 + */ +@RestController +@RequestMapping("/userTest") +public class TestUserController { + @Autowired + private RabbitTemplate rabbitTemplate; + + //1.生产者,通过路由器ttl_direct_exchange和路由键ttlmessage向一个队列中发送带有过期时间的消息 + @GetMapping("/sendTtlMessage") + public R sendTtlMessage(String ms,long time) throws Exception{ + //路由器 + String exchangeName ="ttl_direct_exchange"; + //路由key + String luYouKey = "ttlmessage"; + + + MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { + @Override + public Message postProcessMessage(Message message) throws AmqpException { + message.getMessageProperties().setExpiration(String.valueOf(time)); + message.getMessageProperties().setContentEncoding("UTF-8"); + return message; + } + }; + +// Map map = new HashMap<>(); +// map.put("time",String.valueOf(time)); +// map.put("msg",ms); + + rabbitTemplate.convertAndSend(exchangeName,luYouKey,ms,messagePostProcessor); + + return R.data("1"); + } + + //测试延迟队列模式 + @GetMapping("/sendTtlMessage02") + public R sendTtlMessage02(String ms,long time) throws Exception{ + MessageProperties messageProperties = new MessageProperties(); + messageProperties.setHeader("x-delay",time * 1000); + Message message = new Message(ms.getBytes(), messageProperties); + rabbitTemplate.convertAndSend("delay_exchange","delayed",message); + return R.data("2"); + } +} diff --git a/lab-service/lab-user/src/main/java/org/springblade/system/user/test/dlxBase.java b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/dlxBase.java new file mode 100644 index 0000000..14b054b --- /dev/null +++ b/lab-service/lab-user/src/main/java/org/springblade/system/user/test/dlxBase.java @@ -0,0 +1,29 @@ +package org.springblade.system.user.test; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 3.死信交换机 路由的配置 + * @author ytl + * @since 2022-10-11 14:33 + */ +@Configuration +public class dlxBase { + @Bean + public DirectExchange DlxExchange(){ + return new DirectExchange("dead.dlx.Exchange",true,false); + } + @Bean + public Queue bbbQueue(){ + return new Queue("bbb.Queue.dead"); + } + @Bean + public Binding bbbBinding(){ + return BindingBuilder.bind(bbbQueue()).to(DlxExchange()).with("dead"); + } +} diff --git a/lab-service/lab-user/src/main/resources/application-dev.yml b/lab-service/lab-user/src/main/resources/application-dev.yml index 1fb2d03..3b4d61b 100644 --- a/lab-service/lab-user/src/main/resources/application-dev.yml +++ b/lab-service/lab-user/src/main/resources/application-dev.yml @@ -1,3 +1,12 @@ #服务器端口 server: port: 8004 + +spring: + rabbitmq: + host: 127.0.0.1 + port: 5672 + username: guest + password: guest + virtual-host: / +