本文为学习 Beautiful Java关于消息中间件的系列文章后的读书笔记。

消息中间件

前言

消息中间件在本质上就是一种很简单的数据结构—队列,但是一条队列肯定是无法成为消息中间件的,你必须要考虑性能、容灾、可靠性等等因素。现在将从队列开始,演示一条队列是如何进化为一个靠谱的中间件的。

为什么要使用消息中间件

假如我们在淘宝下了一个订单,那么淘宝后台需要做一些事情:

  1. 通知商家, 你有一笔新的订单,请及时发货。
  2. 更新用户画像,重新给用户推荐他可能感兴趣的商品
  3. 更新用户的积分和等级等等信息
  4. 。。。。。还有很多

于是,创建一个订单的函数,至少需要调取三个其他服务的接口,写成伪码:

void createOrder(...){
	DoCreateOrder(...);
	// 调用其他服务接口
	sendMsg(...);
	UpdateUserInterestedGoods(...);
	undateMemberCreditInfo(...);
}

这种做法至少存在两个问题:

  1. 过度耦合:如果后面创建订单的时候,需要触发新的动作,那就得改代码,在原有的创建订单函数后面,追加一行代码。

  2. 缺少缓冲:如果创建订单的时候,会员系统恰好处于非常忙碌或者宕机的状态,那这个时候更新会员信息就会失效,我们需要一个地方,来暂时存放无法消费的消息。

Nsq 1.0—我是一条队列

我们可以在订单系统和其他系统的中间,引入一个消息中间件,或者说,引入一条队列。当订单系统创建完订单的时候,需要往队列中塞入一条topic为“order_created“的消息。接下来,nsg1.0,会将这条消息,再推送给所有订阅了这个topic的消息的机器,告诉他们,”有新的订单了,你们该干啥干啥”。

这样一个简单的队列,就解决了上面的两个问题:

  1. 解耦:如果后面又新的动作,需要在创建订单后执行,那么只需要让新同学自己去订阅topic为“order_created”的消息即可。
  2. 缓冲:如果某个系统很忙,没空处理消息,那么只需跟nsq说,”我很忙,不要再发消息过来了“,那么nsq就不会给它推送消息,或者系统出现故障,消息虽然推送过去了,但是它处理失败,那么只需要给nsq回复一个”requeue“的命令,这样nsq就会把消息重新放入队列,进行重试。

Nsq2.0 —channel

作为一个靠谱的中间件,你必须做到:高效、可靠、方便。

上面使用一个简单的队列来实现的消息中间件,肯定是不满足这三点的。

首先 ,假设我们会员系统,部署了三台实例,他们都订阅了topic为”order_created“的消息,那么一旦有订单创建,这三台实例都会收到消息,并且去更新会员积分信息,而其实我只需要更新一次就可以了。

这就涉及到一个消费组的概念。消费者组是Kafka里提到的,在Nsq,对应的术语是channel。

会员系统的三个实例,当他们收到消息的时候,要做的事情是一样的,并且只需要有一个实例执行,那么它们就是一个消费者组里面的,要标识为同一个channel,比如说叫”update_member_credit”的channel,而短信系统、推荐系统,也要有自己的channel。

当nsg收到消息的时候,会给每个channel复制一份消息,然后channel再给对应的消费者组,推送一条消息。消费者组里有多个实例,那么要推送给谁呢?这就涉及到负载均衡。

Nsq3.0—关于nsqlookupd

上面讲到,nsq收到生产者生产的消息后,需要将消息复制多份,然后推送给对应的topic和channel的消费者。

那么,nsq怎么知道哪些消费者订阅了topic为”order_created“的消息呢?总不能写死在配置文件中吧?

因此,我们需要一个类似于微服务里的注册中心的模块,来实现服务发现的功能,这就是nsqlookupd。

nsqlookupd是一个负责管理拓扑信息的守护进程。客户端通过查询nsqlookupd来发现指定话题 (topic)的生产者,并且nsq节点广播话题(topic)和通道消息(channel)。

nsqlookupd提供了一个loolup接口,比如你想知道那些nsq上面有topic为test的消息,那么只需要调用一下 curl ‘http://127.0.0.1:4161/lookup?topic=test’,nsqlookupd就会给你返回对应的topic的nsq列表。接着消费者只需要遍历返回的json串中的producers列表,将address和port拼起来,就可以拿到要建立链接的url地址。

这个过程,可以在nsq消费者客户端实现中看到。而且这个过程不会在消费着启动的时候才执行,而是定期去执行,不断获取最新的nsq列表。

  1. 先调用lookup接口,获取拥有的topic的nsq列表。

  2. 便利nsqlookupd中的列表,然后把所有lookup的返回结果,进行合并。

  3. 接着和旧的nsq列表进行比较,进行删除和新增,保证本地的nsq列表数据是最新的。

Nsq4.0—nsqd集群

作为一个靠谱的中间件,你必须支持集群部署,这样才能实现可靠、高效。

一条消息如何从生产到被消费的

首先,我们需要启动各种服务,生产者、消费者、nsq、nsqlookupd。

  1. 假设消费者最先启动,它要消费topic为“order_created”的消息,这个时候它向nsqlookupd调用lookup接口,试图获取对应topic为nsq。

    由于nsqlookupd还没有启动,因此获取失败,不过不影响消费者的启动流程,因为它每隔一段时间,去重新尝试拉取新的数据。

  2. 接着 ,我们启动nsq和nsqlookupd,这下消费者可以调通nsqlookupd的接口了,不过由于nsq上海没有任何的topic,因此lookup接口返回的信息都是空的,因此消费者仍然无法向nsq订阅消息。

  3. 然后我们调用命令,在nas上创建新的topic:

​ curl -X POST 。。。。。。

  1. nsq创建topic之后,会自动向nsqlookup注册新的topic节点。当消费者下次过来nsqlookupd调用lookup接口的时候,接口会告诉它,已经有一台nsq了。于是,消费者便可以拿到对应nsq的ip和端口,和它建立连接,向他发送sub命令,带上topic和channel参数,订阅nsq上面的order_created消息。

  2. 然后我们启动生产者,生产者向nsq发布topic为order_created的消息。消息发布给nsq之后,就像之前讲的,nsq会把消息复制到topic下所有的channel中,每个channel复制一份,接着channel再向和它建立连接的其中一个消费者实例,推送这个消息。

    如果有多个不同channel的消费者,多个消费者都会收到消息,

    如果启动的是两个相同的消费者,那么只有一个消费者能够收到。nsq默认的负载均衡策略是轮询。

关于Nsq的一些细节

消息至少被投递一次

在Nsq中的Guarantees就提到过:

messages are delivered at least once

消息投递策略,是消息中间件的特有属性,不同的消息中间件,对投递策略的支持也有不同。比如KafKa,就支持最多一次(At most once)、至少一次(At lease once)、准确一次(Excatly once)三种策略,而Nsq,则只支持最常见的一种,也就是至少一次。

那么,Nsq是如何保证消息至少被投递一次呢?

  1. 如果消费者收到消息,并成功执行,那么就给nsq返回FIN,代表消息已经被成功执行,这个时候Nsq就可以将内存中也就是channel中消息干掉。

  2. 如果消费者处理消息时候发生了异常,需要重试,就给Nsq返回REQ,代表requeue,重新进入队列的意思,Nsq就会将消息重新放入队列中,再次推送给消费者。如果消费者迟迟没有给nsq回响应,超过了最大等待时间,那么Nsq也会将消息reueue。

    所以,消费者必须保持对消息操作的幂等性。

重试次数

Nsq推送过来的消息中,有一个attempts字段,代表尝试的次数,一开始是1,每次客户端给Nsq会REQ响应后,Nsq再次推送过来的消息,attempts都会加1,消费者可以按照自己的需要,对重试次数进行限制,比如希望最多尝试6次,那么就在消费者的处理逻辑中,判断attempts <= 6,超过6了则打印日志或者其他处理,然后直接返回FIN,告诉Nsq不要再重试了。

消息无序性

消息是否有序,是消息中间件的特有属性。通过上面的分析,很明显,Nsq的消息是无序的,这也再Nsq的官网的Guarantees中有提到:

messages received are un-ordered

比如说channel A里面现在有两条消息,M1和M2,M1先产生,M2后产生,channel A分别将M1和M2推送给消费者C1和C2,那么有可能C1比C2先处理完消息,也可能会反之。

正是由于这种一有消息就推送的策略,Nsq中的消息处理是无序的。

push or pull

push 还是 pull,这也就是再设计一个消息中间件的时候,必须要考虑的问题。Kafka用的是pull,而Nsq采用的是push。

相对于pull,push的好处不言而喻——消息处理更加实时,一旦消息过来,立即推送出去,而pull则有不确定性,你不知道消费者什么时候有空过来pull,因此做不到实时的消息处理。

同时,也正是因为采用了push,Nsq选择将消息放到内存中,只有当队列中消息的数量超过了 –mem-queue-size配置的限制的时候,才会对消息进行持久化,将消息保存到磁盘中。

当然,push机制也是有缺陷的,那就是当消费者很忙的时候,你还是一直给它push,那只会逼良为娼,所以采用push的消息中间件,必须进行流控。

流控

Nsq流控的方式非常简单,

  1. 当消费者和Nsq建立好连接,准备接受消息时,会给Nsq回一个RDY的响应,同时带上一个rdy_count,代表准备接受消息的数量,于是Nsq会给消费者推送消息,每推送一条,对应连接的rdy_count就减1,知道连接的rdy_count变成0,则不再继续推送消息。
  2. 当消费者觉得自己可以接受很多消息的时候,只需要再发一次RDY命令,就可以更新连接的rdy_count,nsq就会继续给消费者推送消息。