Springboot 集成rabbitmq

摘要:摘要: 对于RabbitMQ来说,生产者和消息队列之间存在隔离,生产者将消息发送给交换机,而交换机则根据调度策略把相应的消息转发给对应的消息队列。消费者通过读取消息队列从而实现消息的发送和接收。

RabbitMQ是一款基于AMQP(消息队列协议),由Erlang开发的开源消息队列组件。是一款优秀的消息队列组件,他由两部分组成:服务端和客户端,客户端支持多种语言的驱动,如:.Net、JAVA、Erlang等。
对于RabbitMQ来说,生产者和消息队列之间存在隔离,生产者将消息发送给交换机,而交换机则根据调度策略把相应的消息转发给对应的消息队列。消费者通过读取消息队列从而实现消息的发送和接收。

1 (36).jpg


交换机的主要作用是接收相应的消息并且绑定到指定的队列.交换机有四种类型,分别为Direct,topic,headers,Fanout.

  Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.

  topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.

  headers也是根据一个规则进行匹配,在消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.

  Fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.

消息队列两个用处:服务间解耦,缓解压力(削峰平谷);
RabbitMQ实现了AQMP协议,AQMP协议定义了消息路由规则和方式。生产端通过路由规则发送消息到不同queue,消费端根据queue名称消费消息。
RabbitMQ既支持内存队列也支持持久化队列,消费端为推模型,消费状态和订阅关系由服务端负责维护,消息消费完后立即删除,不保留历史消息。
(1)点对点
生产端发送一条消息通过路由投递到Queue,只有一个消费者能消费到。

(2)多订阅
当RabbitMQ需要支持多订阅时,发布者发送的消息通过路由同时写到多个Queue,不同订阅组消费不同的Queue。所以支持多订阅时,消息会多个拷贝。

2、安装RabbitMQ服务端
(1)下载Erlang安装包:https://pan.baidu.com/s/1bEbYnc#list/path=%2F(百度网盘地址)
(2)安装和配置RabbitMQ服务端,3.6.0版本:https://pan.baidu.com/s/1bEbYnc#list/path=%2F(百度网盘地址)
(官方:安装Erland,通过官方下载页面http://www.erlang.org/downloads获取exe安装包,直接打开并完成安装。
安装RabbitMQ,通过官方下载页面https://www.rabbitmq.com/download.html获取exe安装包。)
(3)启用web管理插件:rabbitmq-plugins enable rabbitmq_management
(4)启动RabbitMQ:chkconfig rabbitmq-server on /sbin/service rabbitmq-server start
(5)防火墙开通端口
# firewall-cmd --permanent --zone=public --add-port=5672/tcp
# firewall-cmd --permanent --zone=public --add-port=15672/tcp
# firewall-cmd --reload
(6)rabbitmq默认会创建guest账号,只能用于localhost登录页面管理员,本机访问地址:http://localhost:15672/

三.SpringBoot整合RabbitMQ(Topic转发模式)

  首先我们看发送端,我们需要配置队列Queue,再配置交换机(Exchange),再把队列按照相应的规则绑定到交换机上:


首先 配置pom.xml
<!-- 添加springboot对amqp的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

接着在application.properties中,去编辑和RabbitMQ相关的配置信息,配置信息的代表什么内容根据键就能很直观的看出了.这里端口是5672,不是15672...15672是管理端的端口!

server.port=17080
#mq
spring.application.name=spirng-boot-rabbitmq-sender
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=ncs
spring.rabbitmq.password=12345678
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/

我们看发送端,我们需要配置队列Queue,再配置交换机(Exchange),再把队列按照相应的规则绑定到交换机上:

package com.example.demo.conf; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Created by ningcs on 2017/10/30. */ @Configuration public class SenderConf { @Bean(name="message") public Queue queueMessage() { return new Queue("topic.message"); } @Bean(name="messages") public Queue queueMessages() { return new Queue("topic.messages"); } @Bean public TopicExchange exchange() { return new TopicExchange("exchange"); } @Bean Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词 } }

 

在SpringBoot中,我们使用AmqpTemplate去发送消息!代码如下:

package com.example.demo.sender; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * Created by ningcs on 2017/10/30. */ @Component public class HelloSender { @Autowired private AmqpTemplate template; public void send(String msg) { System.out.println(msg); template.convertAndSend("exchange","topic.message","hello,rabbit~"); } }

测试

package com.example.demo.controller; import com.example.demo.sender.HelloSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; /** * Created by ningcs on 2017/10/30. */ @RestController @RequestMapping("rabbit") public class RabbitController { @Autowired private HelloSender helloSender; @RequestMapping(value = "/hello",method = {RequestMethod.GET, RequestMethod.POST}) public String helloSender(){ helloSender.send("hello,rabbit~"); return "发送成功"; } }

接收端(可以放在两个项目,配置文件一样):
package com.example.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* Created by ningcs on 2017/10/30.
*/
@Component
public class HelloReceive {

// @RabbitListener(queues="queue") //监听器监听指定的Queue
// public void processC(String str) {
// System.out.println("Receive:"+str);
// }

@RabbitListener(queues="topic.message") //监听器监听指定的Queue
public void process1(String str) {
System.out.println("message:"+str);
}
@RabbitListener(queues="topic.messages") //监听器监听指定的Queue
public void process2(String str) {
System.out.println("messages:"+str);
}

}

访问地址:http://localhost:17080/rabbit/hello
控制台输出以下日志说明成功:
2017-11-02 11:30:11.982 INFO 4476 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 17080 (http)
2017-11-02 11:30:11.994 INFO 4476 --- [ main] d.SpringbootRabbitTopicSenderApplication : Started SpringbootRabbitTopicSenderApplication in 12.292 seconds (JVM running for 12.829)
2017-11-02 11:31:06.712 INFO 4476 --- [io-17080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring FrameworkServlet \'dispatcherServlet\'
2017-11-02 11:31:06.713 INFO 4476 --- [io-17080-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet \'dispatcherServlet\': initialization started
2017-11-02 11:31:06.729 INFO 4476 --- [io-17080-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet \'dispatcherServlet\': initialization completed in 16 ms
hello,rabbit~
hello,rabbit~

2017-11-02 11:30:13.165 INFO 11064 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 17081 (http)
2017-11-02 11:30:13.170 INFO 11064 --- [ main] SpringbootRabbitTopicReceiverApplication : Started SpringbootRabbitTopicReceiverApplication in 13.48 seconds (JVM running for 17.121)
messages:hello,rabbit~
message:hello,rabbit~
messages:hello,rabbit~
message:hello,rabbit~

Fanout 和Direct模式详见最下面github地址。

术语解释

Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
RabbitMQ消息队列详细介绍(主要涉及术语)
http://blog.csdn.net/leyangjun/article/details/52529047

通信协议AMQP(Advanced Message Queuing Protocol)
AMQP模型中,消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式发到相应的Queue上,Queue又将消息发送给consumer,消息从queue到consumer有push和pull两种方式。 消息队列的使用过程大概如下:
客户端连接到消息队列服务器,打开一个channel。
客户端声明一个exchange,并设置相关属性。
客户端声明一个queue,并设置相关属性。
客户端使用routing key,在exchange和queue之间建立好绑定关系。
客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

RabbitMQ消息队列-RabbitMQ的优劣势及产生背景