NSQ-分布式实时消息处理平台
文章目录
NSQ 简介
NSQ 是一个基于 Go 语言的分布式实时消息处理平台,它基于 MIT 开源协议发布,由 bitly 公司开源出来的一款简单易用的消息中间件。其设计的目的是用来大规模地处理每天数以十亿计级别的消息。
NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。
基本组件
nsq 有三个必要的组件:nsqd、nsqlookupd、nsqadmin。
nsqd
一个负责接收、排队、转发消息到客户端的守护进程。负责message的收发和队列的维护。 nsqd默认监听一个TCP端口(4150)和一个HTTP端口 (4151)。
nsqd功能特性:
- topic发布到channel中的消息会被消费者消费
- 只要channel存在,即使没有消费者,生产者的message也会缓存在队列中
- topic发布的message至少会被消费一次,nsq正常退出,消息会被写入磁盘
- 限制内存中用,channel中的message是存放在内存中的,可以限制其大小,超出时可以将 message缓存到磁盘
- topic和channel一旦创建就会一直存在,除非主动删除
nsqd 可以多机器部署,当客户端向指定 topic 发送消息时,可以配置多个 nsqd 地址,消息会随机的分配到各个 nsqd 上,nsqd 优先把消息存储到内存 channel 中,当内存 channel 满了之后,则把消息写到磁盘文件中。
指标Connections: 等于 nsqd 数量 * 监听 topic 服务数量
nsqlookupd
中心管理服务进程。nsqlookupd提供管理 nsqd 节点服务,客户端查询生产者、topic和channel服务。 使用TCP(默认端口4160)管理nsqd服务,使用http(默认端口4161)管理nsqadmin服务。
- 一个 nsqd服务只能注册到一个nsqlookupd
- 去中心化,nsqlookupd崩溃也不会影响到nsqd
- 可以充当nsqd和nsqdadmin之间的交互中间件
- 消费者可以通过nsqlookupd获取生产者信息
nsqadmin
一套Web用户界面,可实时查看集群的统计数据和执行各种各样的管理任务。默认访问端口是4171。
- 查看和管理topic和channel
- 查看message数量和状态
nsqadmin 访问地址: http://0.0.0.0:4171/
参数解读:
- depth: 表示在这个channel中未被消费的消息总数
核心概念
message『类似数据库中的记录』
数据流形式,生产者需要消费者处理的一个数据流(包)。
消息,生产者与消费者之间传递的数据,在 nsq 中统一称为 message。
topic『类似数据库中的表』
消息的逻辑关键词。
- message属于某个特定的topic
- 在由生产者在发布消息时生成
- 通常描述消息的内容
channel
生产者与消费者之间的消息通道,相当于消息队列。
- 一个topic可以有多个channel
- 一个channel只能对应一个topic
- channel由消费者首次订阅产生,一个channel可以有多个消费者,起负载均衡的作用。
- 一个topic的消息会复制到与topic关联的各个channel
- channel的名称通常描述的是消费者的业务
消费者在消费 message 的时候,需要指定 topic 和 channel。channel 会在消费者第一次订阅相应 topic 的时候就创建。
channel与消费者相关,是消费者之间的负载均衡,channel在某种意义上来说是一个“队列”。每当一个发布者发送一条消息到一个topic,消息会被复制到所有消费者连接的channel上,消费者通过这个特殊的channel读取消息,实际上,在消费者第一次订阅时就会创建channel。Channel会将消息进行排列,如果没有消费者读取消息,消息首先会在内存中排队,当量太大时就会被保存到磁盘中。
- 同一 topic、同一 channel 下的多个消费者,不会消费到同一 message。可以理解为多个消费者之间进行了负载均衡。
- 同一 topic、不同 channel 下的两个消费者,可以消费到一模一样的 message。 多个消费者之间相互独立,完全没有影响。
Message、Topic、Channel
nsq topic、channel、和消费客户端的结构如下图(一个通道通常可以连接多个客户端。 假设所有连接的客户端都处于准备接收消息的状态,则每条消息都将传递给随机客户端)
消息的生命周期
生产者发布一个消息之后,通过以下途径保证消息可靠性:
- client连接上nsqd,并通过RDY参数告知可以处理的message数量
- nsqd 把消息暂存起来或者推送到各个连接的channels,对于状态为REQ 和 timeout的都会被暂存起来
- 客户端消费消息之后,回复nsqd一个FIN表示消息成功处理,或者REQ表示表示这个消息将会继续重新加入到队列中, 如果处理时间超过配置的timeout时间,则nsqd会把这个消息当成超时处理
- 如果客户端一直没有回复直到超时,则这个消息会被当成超时消息重新加入到队列中
NSQ 运行流程
-
NSQ 推荐通过 nsqd 实例使用协同定位 producer,这意味着即使面对网络分区,消息也会被保存在本地,直到它们被一个 consumer 读取。更重要的是, producer 不必去发现其他的 nsqd 节点,他们总是可以向本地 nsqd 实例发布消息。
-
一个 producer 向它的本地 nsqd 发送消息,要做到这点,首先要先打开一个连接( NSQ 提供 HTTP API 和 TCP 客户端 等2种方式连接到 nsqd),然后发送一个包含 topic 和消息主体的发布命令(pub/mpub/publish),在这种情况下,我们将消息发布到 topic 上,消息会采用「多播」的方式被拷贝到各个 channel 中, 然后通过多个 channel 以分散到我们不同需求的 consumer 中。
-
channel 起到队列的作用。 多个 producer 产生的 topic 消息在每一个连接 topic 的 channel 上进行排队。
-
每个 channel 的消息都会进行排队,直到一个 consumer 把他们消费,如果此队列超出了内存限制,消息将会被写入到磁盘中。 nsqd节点首先会向 nsqlookup 广播他们的位置信息,一旦它们注册成功, consumer将会从 nsqlookup 服务器节点上发现所有包含事件 topic 的 nsqd 节点。
-
每个 consumer 向每个 nsqd 主机进行订阅操作,用于表明 consumer 已经准备好接受消息了。这里我们不需要一个完整的连通图,但我们必须要保证每个单独的 nsqd 实例拥有足够的消费者去消费它们的消息,否则 channel 会被队列堆着。
Quick Start
|
|
常见案例
-
负载均衡消费
核心点:将 channel 设置为固定值(如 1),生产者往一个 topic 发消息时,只有一个消费者能消费消息。
-
通过 channel/topic 实现广播、发布订阅功能
核心点:将 channel 设置为服务器本地ip,生产者往一个 topic 发消息时,topic 下多个 channel 会同时消费同一个消息。
-
客户端对消息的处理和响应
在服务端发送消息给客户端后,如果在处理业务逻辑时,如果发生错误则给服务器发送Requeue命令告诉服务器,重新发送消息进处理。如果处理成功,则发送Finish命令。
服务端收到命令后,对飞翔中的消息进行处理,如果成功则去掉,如果是Requeue则执行归队和重发操作,或者进行defer队列处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
func (r *Consumer) handlerLoop(handler Handler) { r.log(LogLevelDebug, "starting Handler") for { message, ok := <-r.incomingMessages if !ok { goto exit } if r.shouldFailMessage(message, handler) { message.Finish() continue } err := handler.HandleMessage(message) if err != nil { r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID) if !message.IsAutoResponseDisabled() { message.Requeue(-1) // 重试 } continue } if !message.IsAutoResponseDisabled() { message.Finish() } } exit: r.log(LogLevelDebug, "stopping Handler") if atomic.AddInt32(&r.runningHandlers, -1) == 0 { r.exit() } }
-
Timeout
每一条消息都必须在一定时间内向 nsq 做出响应,否则 nsq 会认为这条消息超时,然后 requeue 处理。
配置项 -msg-timeout :单条消息的超时时间,
默认一分钟
,即消息投递后一分钟内未收到响应,则 nsq 会将这条消息 requeue 处理。配置值 -max-msg-timeout :nsqd 全局设置的最大超时时间,
默认 15 分钟
。超时的判定时长将取决于以上两个配置的最小值。
常见报错
-
(12f351b237d1:4150) error connecting to nsqd - dial tcp: lookup 12f351b237d1: no such host
(有2种消费者的写法,第一种是直连nsqd(tcp长连接),第二种是通过nsqlookupd的http接口查询后长连接到nsqd, 显然第二种更易于分布式容错和高可用。)
本地开发环境是用 docker 部署的,nsqd和nsqlookupd是两个容器,consumer.ConnectToNSQLookupd()方法内部也会调用consumer.ConnectToNSQD()方法,
docker 容器间是用 container_id 互相通讯,故需要配置 nsqd 的 container_id 到 /etc/hosts,就能解决此问题。
1 2
# /etc/hosts 示例 127.0.0.1 12f351b237d1
-
error connecting to nsqd - dial tcp: i/o timeout
TODO 原因未知
代码示例
|
|
|
|