摘要:摘要: 大型系统的演变必然的发展方向是分布式,而在分布式系统中应用与应用之间互相连接越来越紧密,在应用之间的消息传递就很普遍了。使用Java消息中间件处理异步消息成为了分布式系统中的必修课,本博客就如何在Java中使用消息中间件进行详细说明。
一、消息中间件
1.消息中间件概述
中间件:非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。 消息中间件:关注与数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统。 JMS:Java消息服务(Java Message Service)即JMS,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。 AMQP:AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。 JMS和AMQP对比2.消息中间件图示
3.常见消息中间件对比
3.1.ActiveMQ
(1)概述:ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS 1.1和J2EE 1.4规范的,JMS Provider实现,尽管JMS规范出台已经很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
(2)特性:
多种语言和协议编写客户端。语言:Java、C、C++、C#、Ruby、Perl、Python、PHP。应用协议:OpenWire、Stomp REST、WS、Notification、XMPP、AMQP 完全支持JMS 1.1和J2EE规范(持久化,XA消息,事务) 虚拟主题,组合目的,镜像队列3.2.RabbitMQ
(1)概述:RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写。用于分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
(2)特性:
支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等 AMQP的完整实现(vhost虚拟主机、Exchange交换器、Binding绑定、Routing Key路由器等) 事务支持/发布确认 消息持久化3.3.Kafka
(1)概述:Kafka是一种高吞吐量的分布式发布订阅消息系统,是一个分布式的、分区的、可靠的分布式日志存储服务。它通过一种独一无二的设计提供了一个消息系统的功能。
(2)特性:
通过O(1)的算法复杂度的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒钟数百万的消息 Partition、Consumer Group3.4.综合评价
二、JMS
1.JMS规范
(1)JMS相关概念
提供者:实现JMS规范的消息中间件服务器 客户端:发送或接收消息的应用程序 生产者/发布者:创建并发送消息的客户端 消费者/订阅者:接收并处理消息的客户端 消息:应用程序之间传递的数据内容 消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式(2)JMS消息模式
A.队列模型
客户端包括生产者和消费者 队列中的消息只能被一个消费者消费 消费者可以随时消费队列中的消息队列模型消息示意图:
B.主题模型
客户端包括发布者和订阅者 主题中的消息被所有订阅者消费 消费者不能消费订阅之前就发送到主题中的消息,即只能先订阅才能消费主题模型消息示意图:
(3)JMS编码接口
ConnectionFactory 用于创建连接到消息中间件的链接工厂 Connection 代表了应用程序和消息服务器之间的通信链路 Destination 指消息发布和接收的地点,包括队列或主题 Session 表示一个单线程的上下文,用于发送和接收消息 MessageConsumer 由会话创建,用于接收发送到目标的消息 MessageProducer 由会话创建,用于发送消息到目标 Message 是在消费者和生产者之间传送的对象,消息头,一组消息属性,一个消息体5.JMS编程接口之间的关系
2.ActiveMQ的安装与启动
(1)在Windows平台安装ActiveMQ
下载安装包 http://activemq.apache.org/ 直接启动,以管理员身份运行启动你电脑对应版本的activemq.bat,即可成功启动,比如,我的是64位的,即选择如下图中的win64文件夹下的activemq.bat,或者安装成Windows服务,以服务方式启动,记住使用以管理员身份运行。
无论你是如何启动,启动成功后,使用浏览器访问127.0.0.1:8161就可以访问ActiveMQ的管理页面,点击Manage ActiveMQ broker,出现弹框输入用户名密码,都默认为admin,然后就可以进入管理主页了!
(2)在Linux平台安装ActiveMQ
下载安装包 wget http://mirror.bit.edu.cn/apache//activemq/5.15.2/apache-activemq-5.15.2-bin.tar.gz 解压安装包 tar -zxvf apache-activemq-5.15.2-bin.tar.gz 启动 解压完成后,进入解压后产生的目录,使用 ./activemq start 启动,可以访问linux服务器的ip地址加端口8161,能进入管理页面即成功安装并启动,使用./activemq stop 关闭。3.JMS的代码实现
3.1.使用JMS接口规范连接ActiveMQ
创建生产者 创建发布者 创建消费者 创建订阅者具体代码实现可参考本人码云项目:https://gitee.com/kevinshaw/jms-demo.git
3.2.使用Spring集成JMS连接ActiveMQ
(1)ConnectionFactory 用于管理连接的连接工厂
一个Spring为我们提供的连接池 JmsTemplate每次发消息都会重新创建连接,会话和productor Spring中提供SingleConnectionFactory和CachingConnectionFactory,SingleConnectionFactory对于建立连接请求只会返回同一个Connection,并且使用同一个close方法;CachingConnectionFactory继承自SingleConnectionFactory,所有拥有SingleConnectionFactory的所有功能,而且新增缓存功能,可以缓存会话,Producer,Consumer(2)JmsTemplate 用于发送和接收消息的模板类
是Spring提供的,只需要向Spring容器内注册这个类就可以使用JmsTemplate方便的操作jms JmsTemplate类是线程安全的,可以在整个应用范围使用(3)MessageListerner 消息监听器
实现一个onMessage方法,该方法只接收一个Message参数具体代码实现可参考本人码云项目:https://gitee.com/kevinshaw/jms-demo.git
三、ActiveMQ集群配置
1.对消息中间件群集的原因:
实现高可用,以排除单点故障引起的服务中断 实现负载均衡,以提升效率为更多客户提供服务2.ActiveMQ集群基础知识
(1)集群方式
客户端集群:让多个消费者消费同一队列,在队列模式下已经支持这种情况了,但是在主题模式下,多个消费者是消费了完整的消息,造成消息重复的可能; Broker clusters:多个Broker之间同步消息; Master Slave:实现高可用的一种方式,当主消息服务器宕机时,备服务器可以立即补充,以保证服务的继续。(2)客户端集群配置
ActiveMQ失效转移(failover)
介绍:允许当其中一台服务器宕机时,客户端在传输层上重新连接到其他消息服务器。 语法:failover:(uri1,...,uriN)?transportOptionstransportOptions参数说明:
randomize 默认为true,表示在URI列表中选择URI连接时是否采用随机策略 initialReconnectDelay 默认10,单位毫秒,表示第一次尝试重连之间的等待的时间(3)Broker Cluster集群配置
原理:当有两个节点A,B,节点A可以把消息同步到节点B,节点B也可以把消息同步到节点A,通过消息同步之后,节点A接收到的消息可以给节点B消费掉,同理,节点B接收到的消息可以给节点A消费掉。它的实现方式是采用的网络连接器的方式。
NetwrokConncetor(网络连接器):网络连接器主要用于配置ActiveMQ服务器与服务器之间的网络通讯方式,用于服务器透传消息,网络连接器分为静态连接器和动态连接器。
静态连接器:即在服务器ip地址上具体指定ip地址,通过以下配置,当服务器比较多的时候,使用静态连接就比较麻烦,这个时候就需要用到动态连接了
<networkConnectors>
<networkConnectors uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/>
</networkConnectors>
动态连接器:使用多播的方式通知其他服务器,配置如下,我们首先需要定义网络连接器和传输连接器,传输连接器会告知我们一个发现的uri地址,这个就是我们的主播地址,这样我们就可以达到动态扩展服务器的效果了。
<networkConnectors>
<networkConnectors uri="multicast://default"/>
</networkConnectors>
<transportConnectors>
<transportConnectors uri="tcp://localhost:0" discoveryUri="multicast://default"/>
</transportConnectors>
(4)Master/Slave集群配置
ActiveMQ Master Slave 集群方案:
Share nothing storage master/slave (已过时,5.8+后移除) Shared storage master/slave 共享存储 Replicated LevelDB Store 基于复制的LevelDB Store共享存储集群的原理:我们把节点A,B的持久化配置到同一个地方,先启动节点A,此时节点A获取到排它锁,节点A独占资源成为Master,而节点B无法获得锁资源,节点B就成了Slave;节点A就获得了对外开放服务的能力,可以通过外部的客户端提交信息到节点A,但是不能发送消息到节点B;如果此时节点A挂了,那么节点B立即获得了持久化资源的排它锁,成为新的Master,接收到外部客户端,而客户端使用失效转移之后,将消息发送到节点B,这个时候完成整个请求的不间断性,完成了高可用
基于复制的LevelDB Store的原理:因为LevelDB是基于ZK的,所以它的服务器至少需要三台,假设我们有三个服务器节点A,B,C,每个节点都有自己的储存方式,它们都配置同一个ZooKeeper节点,通过ZK来选举一台服务器作为Master,比如选举节点A作为Master,这时节点A就具有对外部提供服务的能力,而节点B,C是不具备的,节点A获取了服务器的外部消息资源后,它首先在本地储存,然后通过ZK将消息同步给B,C,然后B,C分别在自己的服务器里储存,这就是基于复制LevelDB的方式,如果节点A出现故障,那么ZK会立即选举一个新的Master出来。
(5)两种集群方式对比
Master/Slave:它可以做到高可用,当一台服务器挂了,其他服务器可以立即补充上去,并且保证了消息不会丢失,但做不了负载均衡,因为Slave不具备外部服务器提供服务的能力;
Broker Cluster:它不具备高可用的能力,因为它自己的消息并没有在一个地方储存,也就是说当一台服务器挂了的时候,它正在处理的消息可能会同步丢失,但是它可以做到负载均衡,即节点A,B上的消息可以互相消费。
(6)三台服务器的完美集群方案(既实现高可用,又实现负载均衡)
因为基于复制LevelDB的方式至少需要三台服务器,所以这里我们使用共享持久化资源方式;我们首先将节点A,B组成消息同步,然后将节点A,C也组成消息同步;节点B,C组成Master/Slave,然后按顺序启动A,B,C。这个方案能够实现节点A,B,C任意一台服务器宕机时,对整个集群不受影响,仍然可以正常工作,但是需要尽快恢复宕机服务器的问题,如果A,B同时宕机,整个集群就会崩溃了。所以如果要提高稳定性,可以增加服务器数量。
四、使用其他消息中间件
前面主要介绍了ActiveMQ,其最大优点是支持JMS,JMS让Java的开发变得简单,但其在各方面表现都比较中庸,也存在自己的问题,比如吞吐量没有Kafka高,稳定性没有RabbitMQ强。
1.企业开发需要解决的问题
不同业务系统分别处理同一消息,同一业务系统负载处理同类消息 解决消息发送时的一致性问题 解决消息处理时的幂等性问题 基于消息机制建立事件总线2.使用其他消息中间件时,分析需要做的事
解决各业务系统集群处理同一条消息 实现自己的消息提供者3.RabbitMQ:使用交换器绑定到队列
RabbitMQ消息提供者源码解析:
3.集成Kafka
Kafka使用group.id分组消费者
配置消费者参数group.id相同时对消息进行负载处理 配置服务器partitions参数,控制同一个group.id下的consumer数量小于partitions Kafka只保证同一个partition下的消息是有序的Kafka消息提供者源码解析:
相关文章推荐
虚拟主机的专业参数,分别都是什么意思?2022-09-09
中非域名注册规则是怎样的?注册域名有什么用处? 2022-01-10
HostEase新年活动促销 美国/香港主机全场低至五折2021-12-28
HostGator下载完整备份教程分享2021-12-28
Flink中有界数据与无界数据的示例分析2021-12-28