NSQ 架构

NSQ 简介

NSQ 是一个基于 Go 语言的分布式实时消息处理平台,它基于 MIT 开源协议发布,由 bitly 公司开源出来的一款简单易用的消息中间件。其设计的目的是用来大规模地处理每天数以十亿计级别的消息。

NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。

基本组件

nsq 有三个必要的组件:nsqd、nsqlookupd、nsqadmin。

nsqd

负责接收消息,存储队列和将消息发送给客户端。

nsqd 可以多机器部署,当客户端向指定 topic 发送消息时,可以配置多个 nsqd 地址,消息会随机的分配到各个 nsqd 上,nsqd 优先把消息存储到内存 channel 中,当内存 channel 满了之后,则把消息写到磁盘文件中。

nsqlookupd

主要负责管理拓扑信息、nsqd 的心跳、状态监测,给客户端、nsqadmin 提供 topic 所在的 nsqd 地址与状态。

nsqadmin

nsqadmin 是一个 web 管理界面,用来汇集集群的实时统计,并执行不同的管理任务。

topic

topic 是 NSQ 消息发布的『逻辑关键词』,可以理解为人为定义的一种消息类型。当程序初次发布带 topic 的消息时,如果 topic 不存在,则会在 nsqd 中创建。

生产者和 Topic 是一对一的关系。生产者不需要指定 channel,一个消息直接发布到所有消费者创建的 channel 中。

channel

消费者在消费 message 的时候,需要指定 topic 和 channel。一个 topic 下可以有多个 channel。channel 会在消费者第一次订阅相应 topic 的时候就创建。

channel与消费者相关,是消费者之间的负载均衡,channel在某种意义上来说是一个“队列”。每当一个发布者发送一条消息到一个topic,消息会被复制到所有消费者连接的channel上,消费者通过这个特殊的channel读取消息,实际上,在消费者第一次订阅时就会创建channel。Channel会将消息进行排列,如果没有消费者读取消息,消息首先会在内存中排队,当量太大时就会被保存到磁盘中。

  • 同一 topic、同一 channel 下的多个消费者,不会消费到同一 message。可以理解为多个消费者之间进行了负载均衡。
  • 同一 topic、不同 channel 下的两个消费者,可以消费到一模一样的 message。 多个消费者之间相互独立,完全没有影响。

message

消息,生产者与消费者之间传递的数据,在 nsq 中统一称为 message。

Topic、Channel、Message

一个 Topic -> 多个 Channel -> 多个 Message

常用工具类:

  • nsq_to _file:消费指定的话题(topic)/通道(channel),并写到文件中,有选择的滚动和/或压缩文件。
  • nsq_to _http:消费指定的话题(topic)/通道(channel)和执行 HTTP requests (GET/POST) 到指定的端点。
  • nsq_to _nsq:消费者指定的话题/通道和重发布消息到目的地 nsqd 通过 TCP。

NSQ 运行流程

  1. NSQ 推荐通过 nsqd 实例使用协同定位 producer,这意味着即使面对网络分区,消息也会被保存在本地,直到它们被一个 consumer 读取。更重要的是, producer 不必去发现其他的 nsqd 节点,他们总是可以向本地 nsqd 实例发布消息。

  2. 一个 producer 向它的本地 nsqd 发送消息,要做到这点,首先要先打开一个连接( NSQ 提供 HTTP API 和 TCP 客户端 等2种方式连接到 nsqd),然后发送一个包含 topic 和消息主体的发布命令(pub/mpub/publish),在这种情况下,我们将消息发布到 topic 上,消息会采用「多播」的方式被拷贝到各个 channel 中, 然后通过多个 channel 以分散到我们不同需求的 consumer 中。

  3. channel 起到队列的作用。 多个 producer 产生的 topic 消息在每一个连接 topic 的 channel 上进行排队。

  4. 每个 channel 的消息都会进行排队,直到一个 consumer 把他们消费,如果此队列超出了内存限制,消息将会被写入到磁盘中。 nsqd节点首先会向 nsqlookup 广播他们的位置信息,一旦它们注册成功, consumer将会从 nsqlookup 服务器节点上发现所有包含事件 topic 的 nsqd 节点。

  5. 每个 consumer 向每个 nsqd 主机进行订阅操作,用于表明 consumer 已经准备好接受消息了。这里我们不需要一个完整的连通图,但我们必须要保证每个单独的 nsqd 实例拥有足够的消费者去消费它们的消息,否则 channel 会被队列堆着。

Quick Start

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
brew install nsq

# 1. nsqlookupd 启动
nsqlookupd

# 2. nsqd 启动
nsqd --lookupd-tcp-address=127.0.0.1:4160

# 3. nsqadmin 启动
nsqadmin --lookupd-http-address=127.0.0.1:4161

# 4. 发布初始化消息
curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'

# 5. 客户端启动,连接 channel「nsq_to_file」
nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161

# 6. 发送更多消息
$ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test'
$ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test'

# 7. in a web browser open http://127.0.0.1:4171/ to view the nsqadmin UI and see statistics.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# 代码示例
package main

import (
    "github.com/nsqio/go-nsq"
    "log"
    "strconv"
    "time"
)

var (
    listenIP = "127.0.0.1:4150"
    topic = "test"
    channel = "1"
)

func main() {
    go startConsumer()
    startProducer()
}

// 生产者
func startProducer() {
    cfg := nsq.NewConfig()

    producer, err := nsq.NewProducer(listenIP, cfg)
    if err != nil {
        log.Fatal(err)
    }

    // 发布消息
    var i uint64 = 1
    for {
        if err := producer.Publish(topic, []byte("test message: " + strconv.FormatUint(i, 10))); err != nil {
            log.Fatal("publish error:" + err.Error())
        }

        time.Sleep(time.Second)
        i++
    }
}

// 消费者
func startConsumer() {
    cfg := nsq.NewConfig()

    consumer, err := nsq.NewConsumer(topic, channel, cfg)
    if err != nil {
        log.Fatal(err)
    }

    // 设置消息处理函数
    consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
        log.Println("消费信息:" + string(message.Body))
        return nil
    }))

    // 连接到单例nsqd
    if err := consumer.ConnectToNSQD(listenIP); err != nil {
        log.Fatal(err)
    }

    <-consumer.StopChan
}

参考文章

nsq 官网 go-nsq 文档