rocketmq架构分析(跟RocketMQ学习消息系统设计)
消息系统我们在项目中经常使用,但是如何自己设计一个消息系统呢?一个消息系统需要有消息的生产者、消费者,还需要具备消息的存储功能,设计时要考虑如下问题:
- 消息发送:生产者如何获取消息服务器地址,消息如何发送,如何区分不同的消息,怎样保证消息不丢失,如果消息服务器增减机器,生产者如何感知。
- 消息存储:消息如何存储,如何写文件,怎样具备高可用,低延迟。
- 消息消费:消费者怎样找到消息,怎么保证消息被消费者消费,消息是否可以消费多次。
我们从学习RocketMQ入手,来学习消息系统的设计。
二、RocketMQ架构RocketMQ的逻辑部署图:
RocketMQ逻辑部署图
RocketMQ的角色包含Broker、NameServer、Producer和Consumer。
- Broker:用于接收Producer发送的消息,向Consumer转发消息,进行消息的存储,Broker启动时会向NameServer注册自己的信息。
- NameServer:是RocketMQ的注册中心,用于服务注册、服务发现和路由管理。多个NameServer之间信息不共享,分别记录Broker和队列等信息。
- Producer:消息的生产者。生产者发送某一主题的消息到Broker,开发者通过引入RocketMQ Client的jar包,使用send方法实现消息的发送功能。
- Consumer:消息的消费者,连接Broker获取topic下的消息。开发者通过引入RocketMQ Client的jar包,进行消息的监听并进行消费。
RocketMQ角色
三、RocketMQ功能3.1 路由注册路由注册是指将Broker信息注册到NameServer中,以便生产者、消费者可以从NameServer中获取到Broker的信息,进行消息的发送或接收。这样做的好处是Broker在扩容、缩容时,开发者对Broker的变化无感知。
Broker注册流程
Broker端- Broker启动:从配置文件中读取NameServer地址,遍历NameServer,向每个NameServer发送心跳包进行注册。Broker端使用是Netty框架与NameServer进行网络通信,上报Broker及集群信息,Topic队列等信息。
- 定时任务:Broker在启动之后,会每隔30s向NameServer发送心跳包,保持长连接。
- NameServer收到上报信息后,将Broker等信息存储在路由表中,更新路由表中的上报时间。
特殊说明
- NameServer可以部署多个,Broker依次向每个NameServer机器上报信息,以保证高可用。
- NameServer集群之间互不通信,也不进行信息共享,也就是说NameServer服务器之间在某一时刻的数据并不会完全相同,但这对消息生产者来说并不会影响消息发送。这极大的降低了NameServer实现的复杂度,性能也得到了提升,这与使用ZK作为注册中心是不同的。
- NameServer是内存式存储路由信息,本身是无状态的,也就是说NameServer中的Broker、Topic等状态信息不会持久存储(NameServer支持配置参数的持久化,一般用不到)。
如果Broker宕机,NameServer无法收到心跳包,此时NameServer如何来剔除这些失效的Broker呢?
RocktMQ有两个入口来触发路由删除。
1)定时任务:NameServer每隔10s扫描路由表,检测上次心跳包时间戳与当前系统时间的时间差,如果时间差大于120s,会从路由表中移除该Broker相关的信息并关闭Socket连接。
2)Broker在正常被关闭的情况下,会执行unregisterBroker指令,NameServer收到后会移除该Broker相关的信息并关闭Socket连接。
为什么路由变化不会马上通知消息生产者,而要等120s呢?这是为了降低NameServer实现的复杂性,路由变化由发送端提供容错机制来保证消息发送的高可用性。
3.3 路由发现路由发现是指让生产者、消费者找到消息服务Broker的过程。RocketMQ路由发现是非实时的,当Topic路由出现变化后,NameServer不主动推送给客户端,而是由客户端定时拉取Topic最新的路由。
路由发现的流程
1)定时任务
生产者启动时会启动定时任务,每30s执行一次路由表的动态更新,流程如下:
- 首先生产者或消费者根据topic查找路由信息,从Client端内存中查找路由信息,路由信息包括Broker信息和队列信息。
- 如果内存中未找到路由信息,则访问NameServer查询路由信息,并与本地内存中的路由信息进行对比,如果路由信息发生变化,则更新路由表,返回路由信息。
- 如果内存结构中包含路由信息,则返回路由信息。
2)消息发送时
- 在消息发送前,会先从内存中查找路由信息。
- 如果内存结构中包含路由信息,则返回路由信息(因为定时任务会更新路由表信息)。
- 如果内存中未找到路由信息,则访问NameServer查询路由信息,并与本地内存中的路由信息进行对比,如果路由信息发生变化,则更新路由表,返回路由信息。
生产者和消费者获取了broker地址和队列信息后,如何发送消息呢?
3.4 消息发送消息队列如何进行负载均衡Producer的代码在RocketMQ Client jar中,Producer启动后,会创建MQClientInstance实例,同时启动生产者和消费者。如果配置的NameServer地址相同,同一个JVM中的不同消费者和不同生产者在启动时获取到的MQClientInstane实例都是同一个。
一个Topic下可以配置多个消息队列,以提高服务吞吐量,生产者拿到路由信息后,需要确定发送的队列,通过过滤数据得到Master角色且具有写权限的队列,使用轮询的方式,用自增1的值对队列大小取模,确认要发送的队列,让消息平均落在不同的消息队列上。返回的消息队列按照broker、序号排序,格式如下:
[{"broker-Name":"broker-a","queueId":0},{"brokerName":"broker-a","queueId":1},{"brokerName":"broker-a","queueId":2},{"brokerName":"broker-a","queueId":3},{"brokerName":"broker-b","queueId":0},{"brokerName":"broker-b","queueId":1},{"brokerName":"broker-b","queueId":2},{"brokerName":"broker-b","queueId":3}]
选择消息队列算法:Math.abs(index ) % 消息队列大小。
消息发送如何实现高可用消息发送高可用主要通过两个手段:消息重试与Broker规避。
规避有故障的Broker
如果上一次根据路由算法选择的是宕机的Broker的第一个队列,那么随后的下次选择的是宕机Broker的第二个队列,消息发送很有可能会失败,再次引发重试,带来不必要的性能损耗。此时可以将该Broker进行规避,不再选择该Broker,提高发送消息的成功率。
批量消息发送如何实现一致性批量消息发送是将同一主题的多条消息封装成MessageBatch对象,一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。服务端按照同样的结构进行解析即可。
3.5 消息存储RocketMQ主要存储的文件包括CommitLog文件、ConsumeQueue文件、IndexFile文件。
CommitLog文件- 消息存储文件,所有主题的消息都存储在同一个CommitLog文件中。
- 存储到CommitLog文件中是串行的,顺序写文件,采用尾部追加的方式写入,可以尽最大的能力确保消息发送的高性能与高吞吐量。
- 单个CommitLog文件的大小为1G,一个文件写满后会再创建另外一个。以文件中第一个消息的物理偏移量为文件名,文件名长度20位,小于20位用0补齐。文件名示例如下:
CommitLog文件
- CommitLog文件每条消息中存储了消体内容、topic名称内容、queueId、消息大小、消息的存储时间、消息被某个订阅组重新消费了多少次、消息产生端的地址、消息的存储时间等内容
- 由于同一主题的消息不连续地存储在CommitLog文件中,如果消费者直接从CommitLog文件中查找某一主题下的消息,效率将极其低下。RocketMQ设计了消息消费队列文件(Consumequeue)和IndexFile索引文件。当消息到达CommitLog文件后,由专门的线程异步转发,从而构建ConsumeQueue文件与IndexFile文件。
- 消费队列文件,是CommitLog中每个消息的索引文件,定位了当前这条消息在CommitLog中的位置。该文件由在CommitLog中的起始物理偏移量offset、消息大小size和消息Tag的HashCode值组成,文件采取定长设计,每一个条目共20个字节,格式如下:
ConsumeQueue文件
- 查找消息时,必须先从ConsumeQueue中去获取消息存储的物理地址,然后再从CommitLog中将数据取出。
- 每个Topic包含多个消息队列,每一个消息队列有一个ConsumeQueue文件。单个文件由30W个条目组成,目的是为了可以像数组一样随机访问每一个条目。
- 过滤tag也是通过遍历ConsumeQueue来实现的(先比较hash(tag)符合条件的再到consumer比较tag原文)
消息索引文件,主要存储消息Key与Offset的对应关系。生产者发送的消息包含key值,会用IndexFile存储消息索引,主要用于使用key或时间戳来查询消息。
存储文件
3.6 消息消费消息消费有两种模式:广播模式与集群模式。
- 集群模式:每条消息被消费者消费一次,服务端保存消费进度。
- 广播模式:同一消费者组内的每个消费者,都消费到Topic的所有消息。RocketMQ并不会对消费失败的消息进行失败重投,由消费者保存消息消费进度。
消费者消费消息,是主动从服务端获取消息,通过“长轮询”方式达到Push效果的方法。消费者从Broker查询消息,当Broker服务端接到请求后,如果队列里没有新消息,并不立刻返回,而是通过循环HOLD住客户端一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给Consumer,如果没有消息则返回空结果。
消息确认如果消息监听器返回的消费结果为RECONSUME_LATER,则需要将这些消息发送给Broker延迟消息。如果发送ACK消息失败,将延迟5s后提交线程池进行消费。
消息过滤消费者使用tag对消息进行过滤。如果不需要消费某个Topic下的所有消息,可以通过指定消息的Tag进行消息过滤,比如:Consumer.subscribe("TopicTest", "tag1 || tag2 || tag3"),表示这个Consumer要消费“TopicTest”下带有tag1或tag2或tag3的消息(Tag是在发送消息时设置的标签)。在填写Tag参数的位置,用null或者“*”表示要消费这个Topic的所有消息。
参考书籍《RocketMQ技术内幕:RocketMQ架构设计与实现原理》
《RocketMQ实战与原理解析》
,免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com