nsq 最初是由 bitly 公司开源出来的一款简单易用的 分布式 消息中间件,它可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息。

nsq
它具有以下特性:
1. 首先到官方文档看用法:

quick_start
下载对应的二进制可执行文件,在本地按照上述步骤就可以跑起来了,看下nsqadmin 后台展示如下:

nsqadmin
2. docker 环境搭建 nsq
参考官方提供资料创建:docker-compose.yml
version: '2' # 高版本支持3
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
ports :
- "4160:4160" # tcp
- "4161:4161" # http
nsqd:
image: nsqio/nsq
# 广播地址不填的话默认就是oshostname(或虚拟机名称),那样 lookupd 会连接不上,所以直接写IP
command: /nsqd --broadcast-address=10.236.92.208 --lookupd-tcp-address=nsqlookupd:4160
depends_on:
- nsqlookupd
ports:
- "4150:4150" # tcp
- "4151:4151" # http
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
depends_on:
- nsqlookupd
ports:
- "4171:4171" # http
执行 docker-compose up -d 生成对应的三个容器:
nsqgo_nsqd_1nsqgo_nsqlookupd_1nsqgo_nsqadmin_1
3. Golang 使用 nsq
创建生产者:producer.go
package main import ( "fmt" "log" "time" " github .com/nsqio/go-nsq" ) func main() { config := nsq.NewConfig() p, err := nsq.NewProducer("127.0.0.1:4150", config) if err != nil { log.Panic(err) } for i := 0; i < 1000; i++ { msg := fmt.Sprintf("num-%d", i) log.Println("Pub:" + msg) err = p.Publish("testTopic", []byte(msg)) if err != nil { log.Panic(err) } time.Sleep(time.Second * 1) } p.Stop() }
循环写 1000 个 num-1–1000,通过 p.Publish 发送到消息队列中,等待消费。
创建消费者:consumer.go
package main import ( "log" "sync" "github.com/nsqio/go-nsq" ) func main() { wg := &sync.WaitGroup{} wg.Add(1000) config := nsq.NewConfig() c, _ := nsq.NewConsumer("testTopic", "ch", config) c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { log.Printf("Got a message: %s", message.Body) wg.Done() return nil })) // 1.直连nsqd // err := c.ConnectToNSQD("127.0.0.1:4150") // 2.通过 nsqlookupd 服务发现 err := c.ConnectToNSQLookupd("127.0.0.1:4161") if err != nil { log.Panic(err) } wg.Wait() }
可通过两种方式与 nsqd 连接:
1). 直连 nsqd,适用于单机(standalone)版;
2). 通过 nsqlookupd 服务发现,适用于集群(cluster)版;
输出结果:
go run producer.go
2019/08/18 20:29:51 Pub:num-0 2019/08/18 20:29:51 INF 1 (127.0.0.1:4150) connecting to nsqd 2019/08/18 20:29:52 Pub:num-1 2019/08/18 20:29:53 Pub:num-2 2019/08/18 20:29:54 Pub:num-3 2019/08/18 20:29:55 Pub:num-4 2019/08/18 20:29:56 Pub:num-5 2019/08/18 20:29:57 Pub:num-6 2019/08/18 20:29:58 Pub:num-7 2019/08/18 20:29:59 Pub:num-8 2019/08/18 20:30:00 Pub:num-9 2019/08/18 20:30:01 Pub:num-10
go run consumer.go
2019/08/18 20:30:08 INF 1 [testTopic/ch] querying nsqlookupd 2019/08/18 20:30:08 INF 1 [testTopic/ch] (10.236.92.208:4150) connecting to nsqd 2019/08/18 20:30:08 Got a message: num-0 2019/08/18 20:30:08 Got a message: num-1 2019/08/18 20:30:08 Got a message: num-2 2019/08/18 20:30:08 Got a message: num-3 2019/08/18 20:30:08 Got a message: num-4 2019/08/18 20:30:08 Got a message: num-5 2019/08/18 20:30:08 Got a message: num-6 2019/08/18 20:30:08 Got a message: num-7 2019/08/18 20:30:08 Got a message: num-8 2019/08/18 20:30:08 Got a message: num-9 2019/08/18 20:30:08 Got a message: num-10
github 源码下载:
【小结】
a. 单个nsqd可以有多个topic,每个topic可以有多个channel。channel接收这个topic所有消息的副本,从而实现多播分发,而channel上的每个消息被均匀的分发给它的订阅者,从而实现负载均衡;
b. nsq 专门为分布式、集群化而生,在处理 SPOF(single point of failure, 单点故障)、高可用、最终一致性方面很有优势。

稻草人生