RocketMQ
阅读提示
建议先看题目目录,再按“概念 -> 原理 -> 场景 -> 优化”顺序复习。
每题先讲结论,再补关键机制和项目实践,回答会更稳。
1、MQ的作用是什么呢?
- 因为项目比较大,做了分布式系统,所有远程服务调用请求都是同步执行经常出问题,所以引入了mq
2、RocketMQ由哪些角色组成,每个角色作用和特点是什么?
3、RocketMQ Broker中的消息被消费后会立即删除吗?(了解)
- 不会,每条消息都会持久化到CommitLog中,每个Consumer连接到Broker后会维持消费进度信息,当有消息消费后只是当前Consumer的消费进度(CommitLog的offset)更新了。
4、RocketMQ消费模式有几种?
- 集群消费:
- 一条消息只会被同Group中的一个Consumer消费
- 多个Group同时消费一个Topic时,每个Group都会有一个Consumer消费到数据
- 广播消费:
- 消息将对一 个Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer
- 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一 次。
5、如何保证消息不丢失(了解)
消息可能在哪些阶段丢失呢?可能会在这三个阶段发生丢失:生产阶段、存储阶段、消费阶段。所以要从这三个阶段考虑:
生产
在生产阶段,主要通过请求确认机制,来保证消息的可靠传递。
同步发送的时候,要注意处理响应结果和异常。如果返回响应OK,表示消息成功发送到了Broker,如果响应失败,或者发生其它异常,都应该重试。
异步发送的时候,应该在回调方法里检查,如果发送失败或者异常,都应该进行重试。
如果发生超时的情况,也可以通过查询日志的API,来检查是否在Broker存储成功。
存储
存储阶段,可以通过配置可靠性优先的 Broker 参数来避免因为宕机丢消息,简单说就是可靠性优先的场景都应该使用同步。
消息只要持久化到CommitLog(日志文件)中,即使Broker宕机,未消费的消息也能重新恢复再消费。
Broker的刷盘机制:同步刷盘和异步刷盘,不管哪种刷盘都可以保证消息一定存储在pagecache中(内存中),但是同步刷盘更可靠,它是Producer发送消息后等数据持久化到磁盘之后再返回响应给Producer。
Broker通过主从模式来保证高可用,Broker支持Master和Slave同步复制、Master和Slave异步复制模式,生产者的消息都是发送给Master,但是消费既可以从Master消费,也可以从Slave消费。同步复制模式可以保证即使Master宕机,消息肯定在Slave中有备份,保证了消息不会丢失。
消费
从Consumer角度分析,如何保证消息被成功消费?
Consumer保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。因为消息队列维护了消费的位置,逻辑执行失败了,没有确认,再去队列拉取消息,就还是之前的一条。
6、如何防止消息重复消费(了解)
- 对分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,就是所谓的“有且仅有一次” 。RocketMQ择了确保一定投递,保证消息不丢失,但有可能造成消息重复。处理消息重复问题,主要有业务端自己保证,主要的方式有两种:业务幂等和消息去重。
- 业务幂等:第一种是保证消费逻辑的幂等性,也就是多次调用和一次调用的效果是一样的。这样一来,不管消息消费多少次,对业务都没有影响。
- 消息去重:第二种是业务端,对重复的消息就不再消费了。这种方法,需要保证每条消息都有一个唯一的编号,通常是业务相关的,比如订单号,消费的记录需要落库,而且需要保证和消息确认这一步的原子性。
- 具体做法是可以建立一个消费记录表,拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键(primary key)或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突,那么就不再处理这条消息。
7、如何让RocketMQ保证消息的顺序消费?
- 首先多个queue只能保证单个queue里的顺序,queue是典型的FIFO,天然顺序。多个queue同时消费是无法绝对保证消息的有序性的。所以总结如下:
- 同一topic,同一个QUEUE,发消息的时候一个线程去发送消息,消费的时候 一个线程去消费一个queue里的消息。
8、Broker把自己的信息注册到哪个NameServer上?
- 这么问明显在坑你,因为Broker会向所有的NameServer上注册自己的信息,而不是某一个,是每一个,全部
9、RocketMq的工作流程是怎样的?(了解)
- 首先启动NameServer。NameServer启动后监听端口,等待Broker、Producer以及Consumer连上来
- 启动Broker。启动之后,会跟所有的NameServer建立并保持一个长连接,定时发送心跳包。心跳包中包含当前Broker信息(ip、port等)、Topic信息以及Borker与Topic的映射关系
- 创建Topic。创建时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic
- Producer发送消息。启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic所在的Broker;然后从队列列表中轮询选择一个队列,与队列所在的Broker建立长连接,进行消息的发送
- Consumer消费消息。跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,进行消息的消费
10、RocketMQ是如何实现定时消息的? (了解)
- 定时消息是指消息发到Broker后,不能立刻被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。 其实定时消息实现原理比较简单,如果一个topic对应的消息在发送端被设置为定时消息,那么会将该消息先存放在topic为SCHEDULE_TOPIC_XXXX的消息队列中,并将原始消息的信息存放在commitLog文件中,由于topic为SCHEDULE_TOPIC_XXXX,所以该消息不会被立即消息,然后通过定时扫描的方式,将到达延迟时间的消息,转换为正确的消息,发送到相应的队列进行消费。
11、说说RocketMQ怎么对文件进行读写的?(了解)
RocketMQ对文件的读写巧妙地利用了操作系统的一些高效文件读写方式——PageCache、顺序读写、零拷贝。
PageCache、顺序读取
在RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Deadline”(此时块存储采用SSD的话),随机读的性能也会有所提升。
页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。
零拷贝
另外,RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO,将磁盘文件数据在操作系统内核地址空间的缓冲区,和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。
说说什么是零拷贝?
在操作系统中,使用传统的方式,数据需要经历几次拷贝,还要经历用户态/内核态切换。
从磁盘复制数据到内核态内存;
从内核态内存复制到用户态内存;
然后从用户态内存复制到网络驱动的内核态内存;
最后是从网络驱动的内核态内存复制到网卡中进行传输。
所以,可以通过零拷贝的方式,减少用户态与内核态的上下文切换和内存拷贝的次数,用来提升I/O的性能。零拷贝比较常见的实现方式是mmap,这种机制在Java中是通过MappedByteBuffer实现的。
12、RocketMQ消息长轮询了解吗?(了解)
所谓的长轮询,就是Consumer 拉取消息,如果对应的 Queue 如果没有数据,Broker 不会立即返回,而是把 PullReuqest hold起来,等待 queue 有了消息后,或者长轮询阻塞时间到了,再重新处理该 queue 上的所有 PullRequest。
PullMessageProcessor#processRequest
暂时无法在飞书文档外展示此内容
挂起的请求,有一个服务线程会不停地检查,看queue中是否有数据,或者超时。
PullRequestHoldService#run()
暂时无法在飞书文档外展示此内容
