NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,由bitly公司开源出来的一款简单易用的消息中间件。可用于大规模系统中的实时消息服务,并且每天能够处理数亿(十亿)级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。

NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。

基本概念

NSQ 由 3 个守护进程组成:

1、nsqd 是接收、队列和传送消息到客户端的守护进程。

nsqd守护进程是NSQ的核心部分,它是一个单独的监听某个端口进来的消息的二进制程序。每个nsqd节点都独立运行,不共享任何状态。当一个节点启动时,它会同时开启tcp和http服务,两个服务都可以提供给生产者和消费者,向一组nsqlookupd节点进行注册操作,http服务还提供给nsqadmin获取该nsqd本地topic和channel信息;。

客户端可以发布消息到nsqd守护进程上,或者从nsqd守护进程上读取消息。通常,消息发布者会向一个单一的local nsqd发布消息,消费者从连接了的一组nsqd节点的topic上远程读取消息。如果你不关心动态添加节点功能,你可以直接运行standalone模式。

2、nsqlookupd 是管理的拓扑信息,并提供了最终一致发现服务的守护进程。

nsqlookupd服务器像consul或etcd那样工作,只是它被设计得没有协调和强一致性能力。每个nsqlookupd都作为nsqd节点注册信息的短暂数据存储区。消费者连接这些节点去检测需要从哪个nsqd节点上读取消息。

nsqlookupd服务同时开启tcp和http两个监听服务,nsqd会作为客户端,连上nsqlookupd的tcp服务,并上报自己的topic和channel信息,以及通过心跳机制判断nsqd状态;还有个http服务提供给nsqadmin获取集群信息;

3、nsqadmin 是一个 Web UI 来实时监控集群和执行各种管理任务。

术语

  • topic消息

    topic 是 NSQ 消息发布的 逻辑关键词 ,可以理解为人为定义的一种消息类型。当程序初次发布带 topic 的消息时,如果 topic 不存在,则会在 nsqd中创建。

  • producer消息的生产者/发布者

    producer 通过 HTTP API 将消息发布到 nsqd 的指定 topic ,一般有 pub/mpub 两种方式, pub 发布一个消息, mpub 一个往返发布多个消息。

    producer 也可以通过 nsqd客户端 的 TCP接口 将消息发布给 nsqd 的指定 topic 。

    当生产者 producer 初次发布带 topic 的消息给 nsqd 时,如果 topic 不存在,则会在 nsqd 中创建 topic 。

    生产者会同时连上NSQ集群中所有nsqd节点,当然这些节点的地址是在Writer初始化时,通过外界传递进去;当发布消息时,writer会随机选择一个nsqd节点发布某个topic的消息;

  • channel消息传递的通道

    当生产者每次发布消息的时候,消息会采用多播的方式被拷贝到各个 channel 中, channel 起到队列的作用。

    channel 与 consumer(消费者) 相关,是消费者之间的负载均衡,消费者通过这个特殊的channel读取消息。

    在 consumer 想单独获取某个 topic 的消息时,可以 subscribe(订阅)一个自己单独命名的 nsqd中还不存在的 channel, nsqd会为这个 consumer创建其命名的 channel

    Channel 会将消息进行排列,如果没有 consumer读取消息,消息首先会在内存中排队,当量太大时就会被保存到磁盘中。可以在配置中配置具体参数。

    一个 channel 一般会有多个 consumer 连接。假设所有已连接的 consumer 处于准备接收消息的状态,每个消息将被传递到一个随机的 consumer。

    Go语言中的channel是表达队列的一种自然方式,因此一个NSQ的topic/channel,其核心就是一个存放消息指针的Go-channel缓冲区。缓冲区的大小由 –mem-queue-size 配置参数确定。

  • consumer消息的消费者

    consumer 通过 TCPsubscribe 自己需要的 channel

    topic 和 channel 都没有预先配置。 topic 由第一次发布消息到命名 topic 的 producer 创建 或 第一次通过 subscribe 订阅一个命名 topic 的 consumer 来创建。 channel 被 consumer 第一次 subscribe 订阅到指定的 channel 创建。

    多个 consumersubscribe一个 channel,假设所有已连接的客户端处于准备接收消息的状态,每个消息将被传递到一个 随机 的 consumer。

    NSQ 支持延时消息, consumer 在配置的延时时间后才能接受相关消息。

    Channel在 consumer 退出后并不会删除,这点需要特别注意。

    消费者也会同时连上NSQ集群中所有nsqd节点,reader首先会连上nsqlookupd,获取集群中topic的所有producer,然后通过tcp连上所有producer节点,并在本地用tornado轮询每个连接,当某个连接有可读事件时,即有消息达到,处理即可;

基本原理

架构

  1. NSQ推荐通过 nsqd 实例使用协同定位 producer,这意味着即使面对网络分区,消息也会被保存在本地,直到它们被一个 consumer读取。更重要的是, producer不必去发现其他的 nsqd节点,他们总是可以向本地 nsqd实例发布消息。

  2. 一个 producer向它的本地 nsqd发送消息,要做到这点,首先要先打开一个连接( NSQ 提供 HTTP API 和 TCP 客户端 等2种方式连接到 nsqd),然后发送一个包含 topic和消息主体的发布命令(pub/mpub/publish),在这种情况下,我们将消息发布到 topic上,消息会采用多播的方式被拷贝到各个 channel中, 然后通过多个 channel以分散到我们不同需求的 consumer中。

  3. channel起到队列的作用。 多个 producer产生的 topic消息在每一个连接 topic的 channel上进行排队。

  4. 每个 channel的消息都会进行排队,直到一个 consumer把他们消费,如果此队列超出了内存限制,消息将会被写入到磁盘中。 nsqd节点首先会向 nsqlookup 广播他们的位置信息,一旦它们注册成功, consumer将会从 nsqlookup 服务器节点上发现所有包含事件 topic的 nsqd节点。

  5. 每个 consumer向每个 nsqd主机进行订阅操作,用于表明 consumer已经准备好接受消息了。这里我们不需要一个完整的连通图,但我们必须要保证每个单独的 nsqd实例拥有足够的消费者去消费它们的消息,否则 channel会被队列堆着。

设计

1、分布式方案

nsqd随意起, nsqlookup使用备份的方式,nsqlookupd的高可用是通过同时运行多个实例, 多个实例之间保持互备实现的。 一个client只会同时对一个nsqd建立连接, 所以一旦一个nsqd连接, 那么就不会对其他的topic建立连接 只有再一个nsqd坏掉的时候,才会重新选择nsqd。

2、高可用

高可用(无单点问题) writer和reader是直接连上各个nsqd节点,因此即使nsqlookupd挂了,也不影响线上正常使用;即使某个nsqd节点挂了,writer发布消息时,发现节点挂了,可以选择其他节点(当然,这是客户端负责的),单个节点挂了对reader无影响;

3、高性能

writer在发布消息时,是随机发布到集群中nsqd节点,因此在一定程序上达到负载均衡;reader同时监听着集群中所有nsqd节点,无论哪个节点有消息,都会投递到reader上;

4、高可扩展

当向集群中添加节点时,首先reader会通过nsqlookupd发现新的节点加入,并自动连接;因为writer连接的nsqd节点的地址是初始化时设置的,因此增加节点时,只需要在初始化writer时,添加新节点的地址即可;

5、client选择nsqd的原则

nsq保证消息能够正常至少传输一次的方式是:

  • client表明已经可以接受消息
  • nsqd将消息发送出去, 同时将这个消息进行本地存储
  • client如果回复FIN 表示成功接受, 如果回复REQ, 表明需要重发, 如果没有回复, 则认为超时了, 进行重发

所以, 当nsqd异常关闭的时候, 没有来得及保存到本地的消息可能会丢失, 解决办法是讲同样的消息发送到两个nsqd中

由于消息至少会被发送一次, 则意味着消息可能会被发送多次, 客户端需要能够确定收到消息所执行的操作是幂等的,即收到一次与收到多次的影响一致

6、保证消息不丢失

nsdlookup 如何路由请求

{"channels": [ "nsq_to_file", "c" ], "producers": [ { "remote_address": "127.0.0.1:58148", "hostname": "safedev01v.add.corp.qihoo.net", "broadcast_address": "safedev01v.add.corp.qihoo.net", "tcp_port": 4150, "http_port": 4151, "version": "0.3.6" }, { "remote_address": "10.16.59.85:39652", "hostname": "safedev02v.add.corp.qihoo.net", "broadcast_address": "safedev02v.add.corp.qihoo.net", "tcp_port": 4150, "http_port": 4151, "version": "0.3.7" } ] }

7、细节

  • 可以设置内存的使用大小, 但是并不建议将内存设置太小, 毕竟持久化是为了保证unclean关闭nsqd时,消息不会丢失

  • nsq-chan 的信息就是保存在go-chan中的,

8、每一个topic包含三个协程:

  • router: 从go-chan中读取新发布的消息,并讲消息保存在一个队列(ram or rom)中,
  • messagePump
  • DiskQueue 讲内存中的消息写入到磁盘,

如果一个topic没有订阅者(客户端),则该topic的内容就不会被diskqueue写入到磁盘中, 而是由DummyBackendQueue直接将消息丢弃掉

9、nsqd中减小GC的优化方案

  • 避免[]byte转换string
  • 重用缓存或者对象
  • 预先分配slice的内存, 并且知道每个item的大小
  • 避免使用interface{} 和封装的类型, >like a struct for a “multiple value” go-chan).
  • 避免使用defer

具体的源码解析在这里

部署使用

1、下载有现成的二进制文件。

2、首先启动 nsdlookupd

nsqlookupd

客户端通过查询 nsdlookupd 来发现指定topic的生产者,并且 nsqd 节点广播 topic 和通道 channel 信息

该服务运行后有两个端口:TCP 接口,nsqd 用它来广播;HTTP 接口,客户端用它来发现和管理。

在生产环境中,为了高可用,最好部署三个nsqlookupd服务。

3、部署nsqd

先创建 nsqd 的数据路径

mkdir /tmp/nsqdata1 /tmp/nsqdata2

运行两个测试的 nsqd 实例

nsqd --lookupd-tcp-address=127.0.0.1:4160 -broadcast-address=127.0.0.1 -tcp-address=127.0.0.1:4150 -http-address=0.0.0.0:4151 -data-path=/tmp/nsqdata1
nsqd --lookupd-tcp-address=127.0.0.1:4160 -broadcast-address=127.0.0.1 -tcp-address=127.0.0.1:4152 -http-address=0.0.0.0:4153 -data-path=/tmp/nsqdata2

nsqd 可以独立运行,不过通常它是由 nsdlookupd 实例所在集群配置的(它在这能声明 topics 和 channels ,以便大家能找到)

服务启动后有两个端口:一个给客户端(TCP),另一个是 HTTP API。还能够开启HTTPS。

同一台服务器启动多个 nsqd ,要注意端口和数据路径必须不同,包括: –lookupd-tcp-address 、 -tcp-address 、 –data-path

删除 topic 、channel 需要 HTTP API 调用。

4、启动 nsqadmin 前端Web监控

nsqadmin --lookupd-http-address=localhost:4161

nsqadmin 是一套 WEB UI ,用来汇集集群的实时统计,并执行不同的管理任务。

运行后,能够通过4171端口查看并管理 topic 和 channel 。

nsqadmin 通常只需要运行一个。

使用实例

2个Producer 1个Consumer

produce1() 发布publish "x","y" 到 topic "test"
produce2() 发布publish "z" 到 topic "test"
consumer1() 订阅subscribe channel "sensor01" of topic "test"

代码

package test

import (
        "log"
        "time"
        "testing"
        "strconv"

        "github.com/nsqio/go-nsq"
)

func TestNSQ1(t *testing.T) {
       NSQDsAddrs := []string{"127.0.0.1:4150", "127.0.0.1:4152"}
       go consumer1(NSQDsAddrs)
       go produce1()
       go produce2()
       time.Sleep(30 * time.Second)
}

func produce1() {
        cfg := nsq.NewConfig()
        nsqdAddr := "127.0.0.1:4150"
        producer, err := nsq.NewProducer(nsqdAddr, cfg)
        if err != nil {
                log.Fatal(err)
        }
        if err := producer.Publish("test", []byte("x")); err != nil {
                log.Fatal("publish error: " + err.Error())
        }
        if err := producer.Publish("test", []byte("y")); err != nil {
                log.Fatal("publish error: " + err.Error())
        }
}

func produce2() {
        cfg := nsq.NewConfig()
        nsqdAddr := "127.0.0.1:4152"
        producer, err := nsq.NewProducer(nsqdAddr, cfg)
        if err != nil {
                log.Fatal(err)
        }
        if err := producer.Publish("test", []byte("z")); err != nil {
                log.Fatal("publish error: " + err.Error())
        }
}

func consumer1(NSQDsAddrs []string) {
        cfg := nsq.NewConfig()
        consumer, err := nsq.NewConsumer("test", "sensor01", cfg)
        if err != nil {
                log.Fatal(err)
        }
        consumer.AddHandler(nsq.HandlerFunc(
                func(message *nsq.Message) error {
                        log.Println(string(message.Body) + " C1")
                        return nil
                }))
        if err := consumer.ConnectToNSQDs(NSQDsAddrs); err != nil {
                log.Fatal(err, " C1")
        }
        <-consumer.StopChan
}

测试结果

x,y,z 都被 consumer1 接收了。注意到接收时间, x,y 几乎同时被接收,它们都由 producer1 发布,而 z 由 producer2 发布,中间间隔10秒。测试了很多次都是10秒,偶尔是15秒或20秒。查看了ConnectToNSQDs()

// ConnectToNSQDs takes multiple nsqd addresses to connect directly to.
//
// It is recommended to use ConnectToNSQLookupd so that topics are discovered
// automatically.  This method is useful when you want to connect to local instance.

Consumer 每隔 x 秒,向 nsqlookud 进行http轮询,用来更新自己的 nsqd 地址目录,当一个 producer 的 channel 一直没有数据时,则会轮询到下一个 producer

可见go的客户端代码库就是github.com/nsqio/go-nsq。

1个Producer 3个Consumer

produce3() 发布publish "x","y","z" 到 topic "test"
consumer1() 订阅subscribe channel "sensor01" of topic "test"
consumer2() 订阅subscribe channel "sensor01" of topic "test"
consumer3() 订阅subscribe channel "sensor02" of topic "test"

代码

package test

import (
        "log"
        "time"
        "testing"
        "strconv"

        "github.com/nsqio/go-nsq"
)

func TestNSQ2(t *testing.T) {
        NSQDsAddrs := []string{"127.0.0.1:4150"}
        go consumer1(NSQDsAddrs)
        go consumer2(NSQDsAddrs)
        go consumer3(NSQDsAddrs)
        go produce3()
        time.Sleep(5 * time.Second)
}

func produce3() {
        cfg := nsq.NewConfig()
        nsqdAddr := "127.0.0.1:4150"
        producer, err := nsq.NewProducer(nsqdAddr, cfg)
        if err != nil {
                log.Fatal(err)
        }
        if err := producer.Publish("test", []byte("x")); err != nil {
                log.Fatal("publish error: " + err.Error())
        }
        if err := producer.Publish("test", []byte("y")); err != nil {
                log.Fatal("publish error: " + err.Error())
        }
        if err := producer.Publish("test", []byte("z")); err != nil {
                log.Fatal("publish error: " + err.Error())
        }
}

func consumer1(NSQDsAddrs []string) {
        cfg := nsq.NewConfig()
        consumer, err := nsq.NewConsumer("test", "sensor01", cfg)
        if err != nil {
                log.Fatal(err)
        }
        consumer.AddHandler(nsq.HandlerFunc(
                func(message *nsq.Message) error {
                        log.Println(string(message.Body) + " C1")
                        return nil
                }))
        if err := consumer.ConnectToNSQDs(NSQDsAddrs); err != nil {
                log.Fatal(err, " C1")
        }
        <-consumer.StopChan
}

func consumer2(NSQDsAddrs []string) {
        cfg := nsq.NewConfig()
        consumer, err := nsq.NewConsumer("test", "sensor01", cfg)
        if err != nil {
                log.Fatal(err)
        }
        consumer.AddHandler(nsq.HandlerFunc(
                func(message *nsq.Message) error {
                        log.Println(string(message.Body) + " C2")
                        return nil
                }))
        if err := consumer.ConnectToNSQDs(NSQDsAddrs); err != nil {
                log.Fatal(err, " C2")
        }
        <-consumer.StopChan
}

func consumer3(NSQDsAddrs []string) {
        cfg := nsq.NewConfig()
        consumer, err := nsq.NewConsumer("test", "sensor02", cfg)
        if err != nil {
                log.Fatal(err)
        }
        consumer.AddHandler(nsq.HandlerFunc(
                func(message *nsq.Message) error {
                        log.Println(string(message.Body) + " C3")
                        return nil
                }))
        if err := consumer.ConnectToNSQDs(NSQDsAddrs); err != nil {
               log.Fatal(err, " C3")
        }
        <-consumer.StopChan
}

测试结果

consumer1 接收到了 y
consumer2 接收到了 x,z
consumer3 接收到了 x,y,z
channelsensor01 中的消息被随机的分到了 consumer1 和 consumer2
consumer3 单独占有 channelsensor02,接收了其中的所有消息

使用细节

  1. nsqd启动时,端口和数据存放要不同

  2. 消息发送必须指定具体的某个nsqd;而消费则可以通过lookupd获取再重定向

  3. 消费者接受数据时,要设置 config.MaxInFlight

  4. channel在消费者退出后并不会删除,需要特别注意。如果紧紧是想利用nsq作为消息广播,不考虑离线数据保存,不妨考虑nats。

  5. channel的名字,有很多限制,基本ASSCI字符+数字,以及点号”.”,下划线”_”。中文(其他非英语文字应该也不行)、以及空格、冒号”:”、横线”-“等都不得出现