NSQ 架构

NSQ 简介

NSQ 是一个基于 Go 语言的分布式实时消息处理平台,它基于 MIT 开源协议发布,由 bitly 公司开源出来的一款简单易用的消息中间件。其设计的目的是用来大规模地处理每天数以十亿计级别的消息。

NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。

基本组件

nsq 有三个必要的组件:nsqd、nsqlookupd、nsqadmin。

nsqd

一个负责接收、排队、转发消息到客户端的守护进程。负责message的收发和队列的维护。 nsqd默认监听一个TCP端口(4150)和一个HTTP端口 (4151)。

nsqd功能特性:

  • topic发布到channel中的消息会被消费者消费
  • 只要channel存在,即使没有消费者,生产者的message也会缓存在队列中
  • topic发布的message至少会被消费一次,nsq正常退出,消息会被写入磁盘
  • 限制内存中用,channel中的message是存放在内存中的,可以限制其大小,超出时可以将 message缓存到磁盘
  • topic和channel一旦创建就会一直存在,除非主动删除

nsqd 可以多机器部署,当客户端向指定 topic 发送消息时,可以配置多个 nsqd 地址,消息会随机的分配到各个 nsqd 上,nsqd 优先把消息存储到内存 channel 中,当内存 channel 满了之后,则把消息写到磁盘文件中。

指标Connections: 等于 nsqd 数量 * 监听 topic 服务数量

nsqlookupd

中心管理服务进程。nsqlookupd提供管理 nsqd 节点服务,客户端查询生产者、topic和channel服务。 使用TCP(默认端口4160)管理nsqd服务,使用http(默认端口4161)管理nsqadmin服务。

  • 一个 nsqd服务只能注册到一个nsqlookupd
  • 去中心化,nsqlookupd崩溃也不会影响到nsqd
  • 可以充当nsqd和nsqdadmin之间的交互中间件
  • 消费者可以通过nsqlookupd获取生产者信息

nsqadmin

一套Web用户界面,可实时查看集群的统计数据和执行各种各样的管理任务。默认访问端口是4171。

  • 查看和管理topic和channel
  • 查看message数量和状态

nsqadmin 访问地址: http://0.0.0.0:4171/

参数解读:

  • depth: 表示在这个channel中未被消费的消息总数

核心概念

message『类似数据库中的记录』

数据流形式,生产者需要消费者处理的一个数据流(包)。

消息,生产者与消费者之间传递的数据,在 nsq 中统一称为 message。

topic『类似数据库中的表』

消息的逻辑关键词。

  • message属于某个特定的topic
  • 在由生产者在发布消息时生成
  • 通常描述消息的内容

channel

生产者与消费者之间的消息通道,相当于消息队列。

  • 一个topic可以有多个channel
  • 一个channel只能对应一个topic
  • channel由消费者首次订阅产生,一个channel可以有多个消费者,起负载均衡的作用。
  • 一个topic的消息会复制到与topic关联的各个channel
  • channel的名称通常描述的是消费者的业务

消费者在消费 message 的时候,需要指定 topic 和 channel。channel 会在消费者第一次订阅相应 topic 的时候就创建。

channel与消费者相关,是消费者之间的负载均衡,channel在某种意义上来说是一个“队列”。每当一个发布者发送一条消息到一个topic,消息会被复制到所有消费者连接的channel上,消费者通过这个特殊的channel读取消息,实际上,在消费者第一次订阅时就会创建channel。Channel会将消息进行排列,如果没有消费者读取消息,消息首先会在内存中排队,当量太大时就会被保存到磁盘中。

  • 同一 topic、同一 channel 下的多个消费者,不会消费到同一 message。可以理解为多个消费者之间进行了负载均衡。
  • 同一 topic、不同 channel 下的两个消费者,可以消费到一模一样的 message。 多个消费者之间相互独立,完全没有影响。

Message、Topic、Channel

nsq topic、channel、和消费客户端的结构如下图(一个通道通常可以连接多个客户端。 假设所有连接的客户端都处于准备接收消息的状态,则每条消息都将传递给随机客户端)

消息传递过程

消息的生命周期

生产者发布一个消息之后,通过以下途径保证消息可靠性:

  • client连接上nsqd,并通过RDY参数告知可以处理的message数量
  • nsqd 把消息暂存起来或者推送到各个连接的channels,对于状态为REQ 和 timeout的都会被暂存起来
  • 客户端消费消息之后,回复nsqd一个FIN表示消息成功处理,或者REQ表示表示这个消息将会继续重新加入到队列中, 如果处理时间超过配置的timeout时间,则nsqd会把这个消息当成超时处理
  • 如果客户端一直没有回复直到超时,则这个消息会被当成超时消息重新加入到队列中

NSQ 运行流程

  1. NSQ 推荐通过 nsqd 实例使用协同定位 producer,这意味着即使面对网络分区,消息也会被保存在本地,直到它们被一个 consumer 读取。更重要的是, producer 不必去发现其他的 nsqd 节点,他们总是可以向本地 nsqd 实例发布消息。

  2. 一个 producer 向它的本地 nsqd 发送消息,要做到这点,首先要先打开一个连接( NSQ 提供 HTTP API 和 TCP 客户端 等2种方式连接到 nsqd),然后发送一个包含 topic 和消息主体的发布命令(pub/mpub/publish),在这种情况下,我们将消息发布到 topic 上,消息会采用「多播」的方式被拷贝到各个 channel 中, 然后通过多个 channel 以分散到我们不同需求的 consumer 中。

  3. channel 起到队列的作用。 多个 producer 产生的 topic 消息在每一个连接 topic 的 channel 上进行排队。

  4. 每个 channel 的消息都会进行排队,直到一个 consumer 把他们消费,如果此队列超出了内存限制,消息将会被写入到磁盘中。 nsqd节点首先会向 nsqlookup 广播他们的位置信息,一旦它们注册成功, consumer将会从 nsqlookup 服务器节点上发现所有包含事件 topic 的 nsqd 节点。

  5. 每个 consumer 向每个 nsqd 主机进行订阅操作,用于表明 consumer 已经准备好接受消息了。这里我们不需要一个完整的连通图,但我们必须要保证每个单独的 nsqd 实例拥有足够的消费者去消费它们的消息,否则 channel 会被队列堆着。

nsq-lookup

Quick Start

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
brew install nsq

# 1. nsqlookupd 启动
nsqlookupd

# 2. nsqd 启动
nsqd --lookupd-tcp-address=127.0.0.1:4160

# 3. nsqadmin 启动
nsqadmin --lookupd-http-address=127.0.0.1:4161

# 4. 发布初始化消息
curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'

# 5. 客户端启动,连接 channel「nsq_to_file」
nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161

# 6. 发送更多消息
$ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test'
$ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test'

常见案例

  1. 负载均衡消费

    核心点:将 channel 设置为固定值(如 1),生产者往一个 topic 发消息时,只有一个消费者能消费消息。

    负载均衡流程

  2. 通过 channel/topic 实现广播、发布订阅功能

    核心点:将 channel 设置为服务器本地ip,生产者往一个 topic 发消息时,topic 下多个 channel 会同时消费同一个消息。

    广播流程

  3. 客户端对消息的处理和响应

    在服务端发送消息给客户端后,如果在处理业务逻辑时,如果发生错误则给服务器发送Requeue命令告诉服务器,重新发送消息进处理。如果处理成功,则发送Finish命令。

    服务端收到命令后,对飞翔中的消息进行处理,如果成功则去掉,如果是Requeue则执行归队和重发操作,或者进行defer队列处理。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    
    func (r *Consumer) handlerLoop(handler Handler) {
        r.log(LogLevelDebug, "starting Handler")
    
        for {
            message, ok := <-r.incomingMessages
            if !ok {
                goto exit
            }
    
            if r.shouldFailMessage(message, handler) {
                message.Finish()
                continue
            }
    
            err := handler.HandleMessage(message)
            if err != nil {
                r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
                if !message.IsAutoResponseDisabled() {
                    message.Requeue(-1) // 重试
                }
                continue
            }
    
            if !message.IsAutoResponseDisabled() {
                message.Finish()
            }
        }
    
    exit:
        r.log(LogLevelDebug, "stopping Handler")
        if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
            r.exit()
        }
    }
    
  4. Timeout

    每一条消息都必须在一定时间内向 nsq 做出响应,否则 nsq 会认为这条消息超时,然后 requeue 处理。

    配置项 -msg-timeout :单条消息的超时时间,默认一分钟,即消息投递后一分钟内未收到响应,则 nsq 会将这条消息 requeue 处理。

    配置值 -max-msg-timeout :nsqd 全局设置的最大超时时间,默认 15 分钟

    超时的判定时长将取决于以上两个配置的最小值。

常见报错

  1. (12f351b237d1:4150) error connecting to nsqd - dial tcp: lookup 12f351b237d1: no such host

    (有2种消费者的写法,第一种是直连nsqd(tcp长连接),第二种是通过nsqlookupd的http接口查询后长连接到nsqd, 显然第二种更易于分布式容错和高可用。)

    本地开发环境是用 docker 部署的,nsqd和nsqlookupd是两个容器,consumer.ConnectToNSQLookupd()方法内部也会调用consumer.ConnectToNSQD()方法,

    docker 容器间是用 container_id 互相通讯,故需要配置 nsqd 的 container_id 到 /etc/hosts,就能解决此问题。

    1
    2
    
    # /etc/hosts 示例
    127.0.0.1 12f351b237d1
    
  2. error connecting to nsqd - dial tcp: i/o timeout

    TODO 原因未知

代码示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// HandleMessage方法 return nil 或 手动执行msg.Finish,都会将当前消息移除队列,nsq才会消费下一个队列消息

msg.Finish() // 手动提交消费完成,移出队列,避免耗时任务阻塞队列
// 什么时候需要调用 msg.Finish() 这个方法?
1. 需支持高并发(并发消费进程concurrency大于1),实时性要求高
2. 耗时较长的任务,nsq默认一分钟超时重发

msg.Requeue(time.Second * 5) // 重新放入队列消费

msg.DisableAutoResponse()  // 禁用自动提交
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main

import (
    "github.com/nsqio/go-nsq"
    "log"
    "strconv"
    "time"
)

var (
    NSQPublishAddress = "127.0.0.1:4150" // nsqd
    NSQConsumers      = "127.0.0.1:4150" // nsqlookupd,本地只能直连nsqd,无法连接nsqlookupd(4161)(原因看上方)
    NSQMaxInFlight    = 10               // nsqd 最大连接数 允许的最大的处理中的消息数
    topic             = "test"
    channel           = "1"
)

func main() {
    go startConsumer()
    startProducer()
}

// 生产者
func startProducer() {
    cfg := nsq.NewConfig()

    producer, err := nsq.NewProducer(NSQPublishAddress, cfg)
    if err != nil {
        log.Fatal(err)
    }

    // 发布消息
    var i uint64 = 1
    for {
        if err := producer.Publish(topic, []byte("test message: "+strconv.FormatUint(i, 10))); err != nil {
            log.Fatal("publish error:" + err.Error())
        }

        time.Sleep(time.Second)
        i++
    }
}

// 消费者
func startConsumer(concurrency int) {
    conf := nsq.NewConfig()
    conf.LookupdPollInterval = 1 * time.Second
    conf.MaxInFlight = NSQMaxInFlight
    if concurrency > conf.MaxInFlight {
        conf.MaxInFlight = concurrency
    }

    consumer, err := nsq.NewConsumer(topic, channel, conf)
    if err != nil {
        log.Fatal(err)
    }

    // concurrency: 最大并发消费进程,Consumer支持并发,调用AddConcurrentHandlers()方法指定创建多个handlerLoop协程进行处理即可。
    consumer.AddConcurrentHandlers(nsq.HandlerFunc(handle), concurrency)

    // 设置消息处理函数
    consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
        message.Finish() // 手动提交消费完成,移出队列
        log.Println("消费信息:" + string(message.Body))
        return nil
    }))

    // 连接
    if err := consumer.ConnectToNSQD(NSQConsumers); err != nil {
    // if err := consumer.ConnectToNSQLookupd(NSQConsumers); err != nil {
        log.Fatal(err)
    }

    <-consumer.StopChan
}

好文推荐

参考文章

nsq 官网

go-nsq 文档