Message-Queue

消息队列

Posted by Elvis on June 1, 2021

消息队列的作用

其实MQ 的场景有很多,但是比较核心的有 3 个:异步处理、削峰填谷、应用解耦。

异步处理

用户注册后,需要发送注册邮件和注册短信。假设注册信息加入数据库需要30ms,发送注册邮件需要40ms。发送注册短信需要50ms。总共需要的时间就是30+40+50=120ms。可能用户会感觉太慢了

但是一旦加入 MQ 之后,系统只需要将客户信息放入数据库就可以直接返回给用户注册成功的信息。然后用MQ异步处理短信和邮箱的验证。而短信和邮箱的验证因为网络问题。用户是可以接受一定时间的延迟的。那么算下来用户感知到这个接口的耗时仅仅是30ms,节约了2/3的时间。用户体验那是倍儿爽。

削峰填谷

秒杀在我们的日常中相当常见。在秒杀的过程中,系统都发生了什么呢?假设我们的数据库每秒能处理最大的数据量是100条。但是在活动秒杀的时候,数据量激增到每秒一万条。这样一来服务器不堪重负就会gg掉。所以就要考虑优化我们的架构,而MQ正是解决办法之一!具体办法就是将秒杀的信心和数据放入消息队列。系统按照之前的处理速度来从容的处理这些数据。而不是直接将全部数据涌入系统,避免宕机问题的发生。

降低系统之间的耦合度

系统A下面有两个子系统,一个 B 一个 C。这两个子系统都高度依赖于系统A,如果现在要增加一个服务D依赖于A,那么B、C又要做回归测试,常见的用户场景有公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。

引入MQ后要考虑的问题

  • 系统可用性降低
  • 系统复杂性增加
  • 数据一致性问题

前两个是引入任何一个外部中间件都会遇到的问题,那么最后一个是怎么解决的呢?

怎么保证 MQ 一致性

消息生产失败

一般来说,从生产者到MQ中间件是通过网络调用的,是网络调用就有可能存在失败。消息队列通常使用确认机制,来保证消息可靠传递:当你代码调用发送消息的方法,消息队列的客户端会把消息发送到Broker,Broker接受到消息会返回客户端一个确认。只要Producer收到了Broker的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送的确认响应后,会自动重试,如果重试再失败,就会一返回值或者异常方式返回给客户端。所以在编写发送消息的代码,需要正确处理消息发送返回值或者异常,保证这个阶段消息不丢失。

举个kafka的例子来说明:

image-20210601203007951

  1. 根据key和topic获取leader的分区(partition)
  2. producer将massage写入leader对应的partition
  3. leader将信息写到缓冲区中,异步写入磁盘
  4. follower批量从leaderpull数据同步
  5. follower将消息写入本地log之后向leader发送ACK
  6. leader收到所有的副本的ACK向producer发送ACK

当然这里的ACK的可靠性要做到上面的第五还是第六部可以通过request.required.acks参数来设置数据的可靠性。

image-20210601204236436

MQ处理存储失败

消息到达消息中间件之后,通常是会被存储起来的,只有被写入到磁盘中,消息才是真正地被存储,不会丢失。但是,大部分MQ中间件并不是收到消息就立马写入磁盘的,只是由于磁盘的写入速度相对于内存,现得慢得多得多,所以,像Kafka这样的消息系统,是会把消息写到缓冲区中,异步写入磁盘,如果机器在中途突然断电,是有可能会丢失消息的。

为了解决这个问题,大部分的MQ都是采用分布式部署,消息会在多台机器上写入缓存中成功才会返回给业务方成功,由于多台机器同时断电的可能性较低,我们可以认为这是比较低成本又可靠的方案。

消费者处理失败

一般的MQ都有MQ重试机制,如果处理失败,就会尝试重复消费这个MQ。这个带来的问题就是,MQ可能已经成功消费了,但是在通知MQ中间件的时候失败了,这个时候带来的结果就是消息重复消费。同理,在生产者重试的时候,也会遇到消息重复消费的问题。这个时候,就要求我们尽量把接口设计得有幂等性,这个时候即便是重复消费,也不用担心什么问题了。

例如在做异步创建账号的时候先对于数据库的account做有效查询来保证等幂创建的有效性

怎样保证MQ的高可用性

RabbitMQ 是比较有代表性的,因为是基于主从做高可用性的,我们就以他为例子讲解第一种 MQ 的高可用性怎么实现。RabbitMQ有三种模式:单机模式、普通集群模式、镜像集群模式。

1.单机模式就是 demo 级别的,就是说只有一台机器部署了一个 RabbitMQ 程序。这个会存在单点问题,宕机就玩完了,没什么高可用性可言。一般就是你本地启动了玩玩儿的,没人生产用单机模式。

2.普通集群模式,意思是多台机器启动多个RabbitMQ实例,每个机器启动一个。你创建的queue,只会放在一个RabbitMQ实例上。但是每个实例会同步queue的元数据(可以理解queue的配置信息)。即使你消费的时候连接到了另一个实例,那么那个实例会从queue所在实例上拉取数据过来

其实并没有做到所谓的分布式,还是普通集群。因为会导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个queue所在实例消费数据。前者有数据拉取的开销,后者会导致单实例性能瓶颈。

如果放queue的实例宕机了,则会导致其他实例都没有办法进行拉取。如果你开启了消息持久化,让RabbitMQ落地存储消息的话,消息不一定会丢。得等这个实例恢复了,然后才可以继续从这个queue拉取数据。

3.镜像集群模式才是所谓的rabbitmq的高可用模式,跟普通集群模式不一样的是,你创建的queue,无论元数据还是queue里的消息都会存在于多个实例上,然后每次你写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同步。

这样的话,好处在于你任何一个机器宕机了,没事儿,别的机器都可以用。

坏处在于这个性能开销也太大了吧,消息同步所有机器,导致网络带宽压力和消耗很重!

Kafka

消息队列通信的模式

(1)点对点模式

image-20210601204236436 如上图所示,点对点模式通常是基于拉取或者轮询的消息传送模型,这个模型的特点是发送到队列的消息被一个且只有一个消费者进行处理。生产者将消息放入消息队列后,由消费者主动的去拉取消息进行消费。点对点模型的的优点是消费者拉取消息的频率可以由自己控制。但是消息队列是否有消息需要消费,在消费者端无法感知,所以在消费者端需要额外的线程去监控。

(2)发布订阅模式

在这里插入图片描述 如上图所示,发布订阅模式是一个基于消息送的消息传送模型,改模型可以有多种不同的订阅者。生产者将消息放入消息队列后,队列会将消息推送给订阅过该类消息的消费者(类似微信公众号)。由于是消费者被动接收推送,所以无需感知消息队列是否有待消费的消息!但是consumer1、consumer2、consumer3由于机器性能不一样,所以处理消息的能力也会不一样,但消息队列却无法感知消费者消费的速度!所以推送的速度成了发布订阅模模式的一个问题!假设三个消费者处理速度分别是8M/s、5M/s、2M/s,如果队列推送的速度为5M/s,则consumer3无法承受!如果队列推送的速度为2M/s,则consumer1、consumer2会出现资源的极大浪费!

Kafka的架构原理

上面简单的介绍了为什么需要消息队列以及消息队列通信的两种模式,下面主角介绍Kafka。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展能力。。

(1)基础架构与名词解释

在这里插入图片描述

  • Producer:Producer即生产者,消息的产生者,是消息的入口。
  • Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……
  • Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
  • Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!
  • Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
  • Message:每一条发送的消息主体。
  • Consumer:消费者,即消息的消费方,是消息的出口。
  • Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!
  • Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。

(2)工作流程分析

(1)发送数据

我们看上面的架构图中,producer就是生产者,是数据的入口。注意看图中的红色箭头,Producer在写入数据的时候永远的找leader,不会直接将数据写入follower!那leader怎么找呢?写入的流程又是什么样的呢?我们看下图: 在这里插入图片描述 发送的流程就在图中已经说明了,就不单独在文字列出来了!需要注意的一点是,消息写入leader后,follower是主动的去leader进行同步的!producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!写入示意图如下: 在这里插入图片描述 上面说到数据会写入到不同的分区,那kafka为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:

  1. 方便扩展:因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。
  2. 提高并发:以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。

熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?kafka中有几个原则:

  1. partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
  2. 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
  3. 如果既没指定partition,又没有设置key,则会轮询选出一个partition。

保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过ACK应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为0、1、all

  • 0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
  • 1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。
  • all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。

最后要注意的是,如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量根据默认配置都是1。

(2)保存数据

Producer将数据写入kafka后,集群就需要对数据进行保存了!kafka将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。Kafka初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。

(1)Partition 结构

前面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition就是比较具体的东西了!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。 在这里插入图片描述 如上图,这个partition有三组segment文件,每个log文件的大小是一样的,但是存储的message数量是不一定相等的(每条的message大小不一致)。文件的命名是以该segment最小offset来命名的,如000.index存储offset为0~368795的消息,kafka就是利用分段+索引的方式来解决查找效率的问题。

(2)Message结构

上面说到log文件就实际是存储message的地方,我们在producer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!我们重点需要知道的是下面三个:

  • offset:offset是一个占8byte的有序id号,它可以唯一确定每条消息在parition内的位置!
  • 消息大小:消息大小占用4byte,用于描述消息的大小。
  • 消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。
(3)存储策略

无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?

  • 基于时间,默认配置是168小时(7天)。
  • 基于大小,默认配置是1073741824。

需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!

(3)消费数据

消息存储在log文件后,消费者就可以进行消费了。在讲消息队列通信的两种模式的时候讲到过点对点模式和发布订阅模式。Kafka采用的是点对点的模式,消费者主动的去kafka集群拉取消息,与producer相同的是,消费者在拉取消息的时候也是找leader去拉取。

多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id!同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!!!我们看下图: 在这里插入图片描述 图示是消费者组内的消费者小于partition数量的情况,所以会出现某个消费者消费多个partition数据的情况,消费的速度也就不及只处理一个partition的消费者的处理速度!如果是消费者组的消费者多于partition的数量,那会不会出现多个消费者消费同一个partition的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何partition的数据。所以在实际的应用中,建议消费者组的consumer的数量与partition的数量一致

在保存数据的小节里面,我们聊到了partition划分为多组segment,每个segment又包含.log、.index、.timeindex文件,存放的每条message包含offset、消息大小、消息体……我们多次提到segment和offset,查找消息的时候是怎么利用segment+offset配合查找的呢?假如现在需要查找一个offset为368801的message是什么样的过程呢?我们先看看下面的图: 在这里插入图片描述

  1. 先找到offset的368801message所在的segment文件(利用二分法查找),这里找到的就是在第二个segment文件。
  2. 打开找到的segment中的.index文件(也就是368796.index文件,该文件起始偏移量为368796+1,我们要查找的offset为368801的message在该index内的偏移量为368796+5=368801,所以这里要查找的相对offset为5)。由于该文件采用的是稀疏索引的方式存储着相对offset及对应message物理偏移量的关系,所以直接找相对offset为5的索引找不到,这里同样利用二分法查找相对offset小于或者等于指定的相对offset的索引条目中最大的那个相对offset,所以找到的是相对offset为4的这个索引。
  3. 根据找到的相对offset为4的索引确定message存储的物理偏移位置为256。打开数据文件,从位置为256的那个地方开始顺序扫描直到找到offset为368801的那条Message。

这套机制是建立在offset为有序的基础上,利用segment+有序offset+稀疏索引+二分查找+顺序查找等多种手段来高效的查找数据!至此,消费者就能拿到需要处理的数据进行处理了。

消费消息基本流程

消息队列Kafka版订阅者在订阅消息时的基本流程是:

  1. Poll数据。
  2. 执行消费逻辑。
  3. 再次poll数据。

负载均衡

每个Consumer Group可以包含多个消费实例,即可以启动多个消息队列Kafka版Consumer,并把参数group.id设置成相同的值。属于同一个Consumer Group的消费实例会负载消费订阅的Topic。

例如Consumer Group A订阅了Topic A,并开启三个消费实例C1、C2、C3,则发送到Topic A的每条消息最终只会传给C1、C2、C3的某一个。消息队列Kafka版默认会均匀地把消息传给各个消息实例,以做到消费负载均衡。

消息队列Kafka版负载消费的内部原理是,把订阅的Topic的分区,平均分配给各个消费实例。因此,消费实例的个数不要大于分区的数量,否则会有消费实例分配不到任何分区而处于空跑状态。这个负载均衡发生的时间,除了第一次启动上线之外,后续消费实例发生重启、增加、减少等变更时,都会触发一次负载均衡。

分区个数

分区个数主要影响的是消费者的并发数量。

对于同一个Consumer Group内的消费者来说,一个分区最多只能被一个消费者消费。因此,消费实例的个数不要大于分区的数量,否则会有消费实例分配不到任何分区而处于空跑状态。

一般来说,不建议分区数小于12,否则可能影响消费发送性能;也不建议超过100个,否则易引发消费端Rebalance。

控制台的默认分区个数是12,可以满足绝大部分场景的需求。您可以根据业务使用量进行增加。

注意 分区增加后,将不能减少,请小幅度调整。

多个订阅

消息队列Kafka版支持以下多个订阅方式:

  • Consumer Group订阅多个Topic。

    一个Consumer Group可以订阅多个Topic,多个Topic的消息被Cosumer Group中的Consumer均匀消费。例如Consumer Group A订阅了Topic A、Topic B、Topic C,则这三个Topic中的消息,被Consumer Group中的Consumer均匀消费。

    Consumer Group订阅多个Topic的示例代码如下:

    1
    2
    3
    4
    5
    6
    
    String topicStr = kafkaProperties.getProperty("topic");
    String[] topics = topicStr.split(",");
    for (String topic: topics) {
    subscribedTopics.add(topic.trim());
    }
    consumer.subscribe(subscribedTopics);
    
  • Topic被多个Consumer Group订阅。

    一个Topic可以被多个Consumer Group订阅,且各个Consumer Group独立消费Topic下的所有消息。例如Consumer Group A订阅了Topic A,Consumer Group B也订阅了Topic A,则发送到Topic A的每条消息,不仅会传一份给Consumer Group A的消费实例,也会传一份给Consumer Group B的消费实例,且这两个过程相互独立,相互没有任何影响。

一个Consumer Group对应一个应用

建议一个Consumer Group对应一个应用,即不同的应用对应不同的代码。如果您需要将不同的代码写在同一个应用中,请准备多份不同的kafka.properties。例如kafka1.properties、kafka2.properties。

消费位点

每个Topic会有多个分区,每个分区会统计当前消息的总条数,这个称为最大位点MaxOffset。

消息队列Kafka版Consumer会按顺序依次消费分区内的每条消息,记录已经消费了的消息条数,称为消费位点ConsumerOffset。

剩余的未消费的条数(也称为消息堆积量)=MaxOffset-ConsumerOffset。

消费位点提交

消息队列Kafka版消费者有两个相关参数:

  • enable.auto.commit:默认值为true。
  • auto.commit.interval.ms: 默认值为1000,即1s。

这两个参数组合的结果就是,每次poll数据前会先检查上次提交位点的时间,如果距离当前时间已经超过参数auto.commit.interval.ms规定的时长,则客户端会启动位点提交动作。

因此,如果将enable.auto.commit设置为true,则需要在每次poll数据时,确保前一次poll出来的数据已经消费完毕,否则可能导致位点跳跃。

如果想自己控制位点提交,请把enable.auto.commit设为false,并调用commit(offsets)函数自行控制位点提交。

消费位点重置

以下两种情况,会发生消费位点重置:

  • 当服务端不存在曾经提交过的位点时(例如客户端第一次上线)。
  • 当从非法位点拉取消息时(例如某个分区最大位点是10,但客户端却从11开始拉取消息)。

Java客户端可以通过auto.offset.reset来配置重置策略,主要有三种策略:

  • latest:从最大位点开始消费。
  • earliest:从最小位点开始消费。
  • none:不做任何操作,即不重置。

说明

  • 建议设置成latest,而不要设置成earliest,避免因位点非法时从头开始消费,从而造成大量重复。
  • 如果是您自己管理位点,可以设置成none。

拉取大消息

消费过程是由客户端主动去服务端拉取消息的,在拉取大消息时,需要注意控制拉取速度,注意修改配置:

  • max.poll.records:如果单条消息超过1 MB,建议设置为1。
  • fetch.max.bytes:设置比单条消息的大小略大一点。
  • max.partition.fetch.bytes:设置比单条消息的大小略大一点。

拉取大消息的核心是逐条拉取的。

拉取公网

通过公网消费消息时,通常会因为公网带宽的限制导致连接被断开,此时需要注意控制拉取速度,修改配置:

  1. fetch.max.bytes:建议设置成公网带宽的一半(注意该参数的单位是bytes,公网带宽的单位是bits)
  2. max.partition.fetch.bytes:建议设置成fetch.max.bytes的三分之一或者四分之一。

消息重复和消费幂等

消息队列Kafka版消费的语义是at least once, 也就是至少投递一次,保证消息不丢失,但是无法保证消息不重复。在出现网络问题、客户端重启时均有可能造成少量重复消息,此时应用消费端如果对消息重复比较敏感(例如订单交易类),则应该做消息幂等。

以数据库类应用为例,常用做法是:

  • 发送消息时,传入key作为唯一流水号ID。
  • 消费消息时,判断key是否已经消费过,如果已经消费过了,则忽略,如果没消费过,则消费一次。

当然,如果应用本身对少量消息重复不敏感,则不需要做此类幂等检查。

消费失败

消息队列Kafka版是按分区逐条消息顺序向前推进消费的,如果消费端拿到某条消息后执行消费逻辑失败,例如应用服务器出现了脏数据,导致某条消息处理失败,等待人工干预,那么有以下两种处理方式:

  • 失败后一直尝试再次执行消费逻辑。这种方式有可能造成消费线程阻塞在当前消息,无法向前推进,造成消息堆积。
  • 由于消息队列Kafka版没有处理失败消息的设计,实践中通常会打印失败的消息或者存储到某个服务(例如创建一个Topic专门用来放失败的消息),然后定时检查失败消息的情况,分析失败原因,根据情况处理。

消费延迟

消息队列Kafka版的消费机制是由客户端主动去服务端拉取消息进行消费的。因此,一般来说,如果客户端能够及时消费,则不会产生较大延迟。如果产生了较大延迟,请先关注是否有堆积,并注意提高消费速度。

消费阻塞以及堆积

消费端最常见的问题就是消费堆积,最常造成堆积的原因是:

  • 消费速度跟不上生产速度,此时应该提高消费速度,详情请参见提高消费速度
  • 消费端产生了阻塞。

消费端拿到消息后,执行消费逻辑,通常会执行一些远程调用,如果这个时候同步等待结果,则有可能造成一直等待,消费进程无法向前推进。

消费端应该竭力避免堵塞消费线程,如果存在等待调用结果的情况,建议设置等待的超时时间,超时后作为消费失败进行处理。

提高消费速度

提高消费速度有以下两个办法:

  • 增加Consumer实例个数。

    可以在进程内直接增加(需要保证每个实例对应一个线程,否则没有太大意义),也可以部署多个消费实例进程;需要注意的是,实例个数超过分区数量后就不再能提高速度,将会有消费实例不工作。

  • 增加消费线程。

    增加Consumer实例本质上也是增加线程的方式来提升速度,因此更加重要的性能提升方式是增加消费线程,最基本的步骤如下:

    1. 定义一个线程池。
    2. Poll数据。
    3. 把数据提交到线程池进行并发处理。
    4. 等并发结果返回成功后,再次poll数据执行。

消息过滤

消息队列Kafka版自身没有消息过滤的语义。实践中可以采取以下两个办法:

  • 如果过滤的种类不多,可以采取多个Topic的方式达到过滤的目的。
  • 如果过滤的种类多,则最好在客户端业务层面自行过滤。

实践中请根据业务具体情况进行选择,也可以综合运用上面两种办法。

消息广播

消息队列Kafka版没有消息广播的语义,可以通过创建不同的Consumer Group来模拟实现。

订阅关系

同一个Consumer Group内,各个消费实例订阅的Topic最好保持一致,避免给排查问题带来干扰。