关于如何在Golang中使用NSQ与生产者和消费者的一个小例子。
目前,随着微服务的爆炸式增长,使用消息队列以异步方式通信服务是很常见的。作为 RabbitMQ 或Kafka的大玩家已经通过互联网获得了很多教程,但最近我开始研究Go并且我想尝试一些消息队列,只是为了进行一些测试然后我找到了NSQ并且它自己在其网站上引入: “NSQ是一个实时分布式消息传递平台”
NSQ有两个主要组件,“NSQLOOKUPD”和“NSQD”
- NSQD 是接收,排队和向客户端传递消息的守护程序。
- NSQLOOKUPD 是管理拓扑信息的守护程序。客户端查询nsqlookupd以发现特定主题的nsqd生成器,nsqd节点广播主题和通道信息。
还有一个名为“ NSQADMIN ” 的第三个组件,它是管理页面的UI。
对于我们的第一个测试,我们需要一个 nsqd 和一个 nsqlookupd 运行
对于我们的第一个测试,我们需要一个 nsqd 和一个 nsqlookupd 运行,
用docker 运行:
# docker-compose.yml --- version: '2' services: nsqlookupd: image: nsqio/nsq command: > /nsqlookupd -broadcast-address localhost:4160 ports: - "4160:4160" - "4161:4161" nsqd: image: nsqio/nsq command: > /nsqd -broadcast-address localhost -lookupd-tcp-address nsqlookupd:4160 ports: - "4150:4150" - "4151:4151"
我们可以使用以下命:
docker-compose up -d
使用NSQD和NSQLOOKUPD运行,让我们在Go中编写一个生产者和一个使用者,我将使用“ nsq-event-bus ”包,它是“ go-nsq包的一个小包装”,生产者代码:
package main import ( "github.com/rafaeljesus/nsq-event-bus" "log" ) type event struct{ Body string } func main() { topic := "events" emitter, err := bus.NewEmitter(bus.EmitterConfig{}) if err != nil { log.Fatal("[ERRO]", err) } Message := "[Emitter 1] sending message" e := event{message} if err = emitter.Emit(topic, &e); err != nil { log.Println("error while was emitting message", err) } log.Println("[Message emitted]", message) }
消费者代码:
package main
import (
"github.com/rafaeljesus/nsq-event-bus"
"log"
"sync"
)
type event struct{ Body string }
var wg sync.WaitGroup
func main() {
wg.Add(1) // just to test purposes, the program will await for one message
if err := bus.On(bus.ListenerConfig{
Lookup: []string{" localhost :4161"},
Topic: "events",
Channel: "consumer1",
HandlerFunc: handler,
}); err != nil {
// handle failure to listen a message
log.Println("Error while consuming message", err)
}
wg.Wait()
}
func handler(message *bus.Message) (reply interface{}, err error) {
e := event{}
if err = message.DecodePayload(&e); err != nil {
// handle failure to decode a message
log.Println("Error while consuming message", err)
message.Finish()
wg.Done()
return
}
log.Println("[Consumer 1] Consuming message", e)
message.Finish()
wg.Done()
return
}
请注意,在消费者代码中,我使用了WaitGroup,因为在查找响应到达之前可能需要一段时间。要看到两个工作运行emitter.go或consumer.go(请记住,消费者将阻止终端,直到收到一条消息,然后你需要在其他终端运行发射器)
$ go run emitter.go 2017/03/01 01:46:31 INF 1 (localhost:4150) connecting to nsqd 2017/03/01 01:46:31 [Message emitted] [Emitter 1] sending message $ go run consumer.go 2017/03/01 01:46:25 INF 1 [events/consumer1] querying nsqlookupd 2017/03/01 01:46:25 INF 1 [events/consumer1] (localhost:4150) connecting to nsqd 2017/03/01 01:46:44 [Consumer 1] Consuming message {[Emitter 1] sending message}