七叶笔记 » golang编程 » NSQ使用GoLang分发消息传递

NSQ使用GoLang分发消息传递

关于如何在Golang中使用NSQ与生产者和消费者的一个小例子。

目前,随着微服务的爆炸式增长,使用消息队列以异步方式通信服务是很常见的。作为 RabbitMQ 或Kafka的大玩家已经通过互联网获得了很多教程,但最近我开始研究Go并且我想尝试一些消息队列,只是为了进行一些测试然后我找到了NSQ并且它自己在其网站上引入: “NSQ是一个实时分布式消息传递平台”

NSQ有两个主要组件,“NSQLOOKUPD”和“NSQD”

  • NSQD 是接收,排队和向客户端传递消息的守护程序。
  • NSQLOOKUPD 是管理拓扑信息的守护程序。客户端查询nsqlookupd以发现特定主题的nsqd生成器,nsqd节点广播主题和通道信息。

还有一个名为“ NSQADMIN ” 的第三个组件,它是管理页面的UI。

对于我们的第一个测试,我们需要一个 nsqd 和一个 nsqlookupd 运行

对于我们的第一个测试,我们需要一个 nsqd 和一个 nsqlookupd 运行,

用docker 运行:

# docker-compose.yml
---
version: '2'
services:
 nsqlookupd:
 image: nsqio/nsq
 command: >
 /nsqlookupd
 -broadcast-address localhost:4160
 ports:
 - "4160:4160"
 - "4161:4161"
 nsqd:
 image: nsqio/nsq
 command: >
 /nsqd
 -broadcast-address localhost
 -lookupd-tcp-address nsqlookupd:4160
 ports:
 - "4150:4150"
 - "4151:4151"
 

我们可以使用以下命:

docker-compose up -d
 

使用NSQD和NSQLOOKUPD运行,让我们在Go中编写一个生产者和一个使用者,我将使用“ nsq-event-bus ”包,它是“ go-nsq包的一个小包装”,生产者代码:

	package main
	
	import (
		"github.com/rafaeljesus/nsq-event-bus"
		"log"
	)
	
	type event struct{ Body string }
	
	func main() {
		topic := "events"
	 
		emitter, err := bus.NewEmitter(bus.EmitterConfig{})
		
		if err !=  nil  {
			log.Fatal("[ERRO]", err)
		}
		
		 Message  := "[Emitter 1] sending message"
		e := event{message}
		
		if err = emitter.Emit(topic, &e); err != nil {
			log.Println("error while was emitting message", err)
		}
		
		log.Println("[Message emitted]", message)
	}
 

消费者代码:

package main
	
	import (
		"github.com/rafaeljesus/nsq-event-bus"
		"log"
		"sync"
	)
	
	type event struct{ Body string }
	
	var wg sync.WaitGroup
	
	func main() {
		wg.Add(1) // just to test purposes, the program will await for one message
	
		if err := bus.On(bus.ListenerConfig{
			Lookup: []string{" localhost :4161"},
			Topic: "events",
			Channel: "consumer1",
			HandlerFunc: handler,
		}); err != nil {
			// handle failure to listen a message
			log.Println("Error while consuming message", err)
		}
	 
		wg.Wait()
	}
	
	func handler(message *bus.Message) (reply interface{}, err error) {
		e := event{}
		if err = message.DecodePayload(&e); err != nil {
			// handle failure to decode a message
			log.Println("Error while consuming message", err)
			message.Finish()
			wg.Done()
			return
		}
		
		log.Println("[Consumer 1] Consuming message", e)
		message.Finish()
		wg.Done()
		return
	}
 

请注意,在消费者代码中,我使用了WaitGroup,因为在查找响应到达之前可能需要一段时间。要看到两个工作运行emitter.go或consumer.go(请记住,消费者将阻止终端,直到收到一条消息,然后你需要在其他终端运行发射器)

$ go run emitter.go 
2017/03/01 01:46:31 INF 1 (localhost:4150) connecting to nsqd
2017/03/01 01:46:31 [Message emitted] [Emitter 1] sending message
$ go run consumer.go 
2017/03/01 01:46:25 INF 1 [events/consumer1] querying nsqlookupd 
2017/03/01 01:46:25 INF 1 [events/consumer1] (localhost:4150) connecting to nsqd
2017/03/01 01:46:44 [Consumer 1] Consuming message {[Emitter 1] sending message}
 

相关文章