以往看网上的源码分析时,基本都是前面一段讲解,后面跟一大段代码,没有上下文分析,我就暗暗的想,如果一个函数或一段逻辑能有中文注释(俺小本毕业英语不太好)带有上下文分析,这样读源码岂不是会更快顺畅。。。不废话了,我们开始吧。
上一篇大概讲解了基本介绍,我们也把Nsqd一步一步跑起来了(假设您已动手尝试过),本篇则从源码入口开始讲解
前言
- 针对特殊的包或者方法,会单独开一篇博客讲解,请注意代码里面的链接地址,建议手动尝试一下
- 文章宗旨是学习大神的每一行代码,所以看起来会比较啰嗦,建议您一边看代码一边读文章(效果更佳)
NSQ整体流程
NSQ由3个守护进程组成:
- nsqd 是接收、保存和传送消息到客户端的守护进程
- nsqlookupd 是管理的拓扑信息,维护着所有nsqd的状态,并提供了最终一致发现服务的守护进程
- nsqadmin 是一个web ui的实时监控集群和执行各种管理任务
nsqd入口文件:nsq/apps/nsqd/main.go
废话不多说,都在酒里了(代码里),直接看注释就能理解
package main
import (
"flag"
"fmt"
"math/ rand "
"os"
"path/filepath"
"sync"
"syscall"
"time"
// toml开源包
" github .com/BurntSushi/toml"
// go-options开源包
"github.com/mreiferson/go-options"
// 内部版本号
"github.com/nsqio/nsq/internal/version"
// 命令行控制包 svc 服务控制
"github.com/judwhite/go-svc/svc"
// 内部包 日志中间件 log
"github.com/nsqio/nsq/internal/lg"
// nsqd真正工作的区域
"github.com/nsqio/nsq/nsqd"
)
/*
定义业务program结构体
*/type program struct {
// once能确保 实例化 对象 Do 方法在多线程环境只运行一次,内部通过互斥锁实现
once sync.Once
nsqd *nsqd.NSQD
}
/*
采用SVC包进行服务控制,主要是统一管理服务,对于信号控制不用每次都写在业务上,在ctrl+c时,能正常监听defer结束,方便获取很多日志,参数等
*/func main() {
// 实例化
prg := &program{}
/*
服务控制实践-svc包转/
// Implement this interface and pass it to the Run function to start your program.
type Service interface {
// Init is called before the program/service is started and after it's
// determined if the program is running as a Windows Service.
Init(Environment) error
// Start is called after Init. This method must be non-blocking.
Start() error
// Stop is called in response to os.Interrupt, os.Kill, or when a
// Windows Service is stopped.
Stop() error
}
svc 第一个参数需要实现Service接口才可以正常运行,这也就是大伙看到的program 实现的init/start/stop三个函数
使用svc启动相关程序
*/if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
logFatal("%s", err)
}
}
func (p *program) Init(env svc.Environment) error {
// 检查是否是windows 服务。。。目测一般时候也用不到
if env.IsWindowsService() {
dir := filepath.Dir(os.Args[0])
return os.Chdir(dir)
}
return nil
}
func (p *program) Start() error {
/*
实例化并初始一些配置和默认值
/nsq/nsqd/options.go
*/opts := nsqd.NewOptions()
/*
封装了命令行的一些检查项,设置检查项的默认值
使用apps目录:/nsq/apps/nsqd/options.go
然后parse解析
*/flagSet := nsqdFlagSet(opts)
flagSet.Parse(os.Args[1:])
// 生成随机数 time.Now().UnixNano() 单位纳秒
rand.Seed(time.Now().UTC().UnixNano())
// 打印版本号,接收命令行参数version 默认值:false
/*
执行效果
bj-m-server:nsqd yixia$ go run ./ --version=true
nsqd v1.2.1-alpha (built w/go1.12.2)
*/if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
// 打印版本号 %!V(string=nsqd v1.2.1-alpha (built w/go1.12.2))
fmt.Println(version.String("nsqd"))
os.Exit(0)
}
// 获取外部的配置文件,解析toml文件格式
var cfg config
/*
bj-m-server:nsqd yixia$ go run ./ --config=config.toml
*/configFile := flagSet.Lookup("config").Value.String()
// 如果不为空
if configFile != "" {
// 加载,读出的数据采用空_ 抛弃,赋值给cfg
_, err := toml.DecodeFile(configFile, &cfg)
// 抛错
if err != nil {
logFatal("failed to load config file %s - %s", configFile, err)
}
}
// 检查配置文件
cfg.Validate()
// 采用优先级从高到低依次进行解析,最终
options.Resolve(opts, flagSet, cfg)
/*
传入用户自定义配置,实例化nsqd
nsqd.new以后做了那些事情,大概捋一下,后续看的时候能加深印象
1、检查命令行cli opts.DataPath 、 opts.Logger没设置 设置默认值
2、实例NSQD主对象
3、监听tcp net.Listen("tcp", opts.TCPAddress)
4、监听http net.Listen("tcp", opts.HTTPAddress)
5、监听https tls.Listen("tcp", opts.HTTPSAddress, n.tlsConfig)
综合以上了解,基本做的事情就是实例化主对象,并对cli 自定义的命令一顿操作。。。然后就这样了,return (*NSQD, error)
*/nsqd, err := nsqd.New(opts)
if err != nil {
logFatal("failed to instantiate nsqd - %s", err)
}
p.nsqd = nsqd
/*
加载历史数据,数据来源nsqd.dat -> 历史数据格式{"topics":[],"version":"1.2.1-alpha"}
1、获取历史数据
2、解析成对应的结构体 json.Unmarshal(data, &m)
3、遍历 for _, t := range m.Topics , 解析每个topic -> channel
4、启动N个topic.Start()(重点代码中有一个GetTopic,采用线程线程安全方式,重点学习)
5、func (n *NSQD) LoadMetadata() error
*/err = p.nsqd.LoadMetadata()
if err != nil {
logFatal("failed to load metadata - %s", err)
}
/*
持久化最新数据
1、获取原始数据文件名 fileName := newMetadataFile(n.getOpts())
2、遍历 nsqd.topicMap -> ndqd.channelMap ,这是对topicMap和channelMap加了互斥锁
3、将最新数据写入到临时文件中,明文明文件名为:nsqd.dat.333569681738193261.tmp
4、func (n *NSQD) PersistMetadata() error
*/err = p.nsqd.PersistMetadata()
if err != nil {
logFatal("failed to persist metadata - %s", err)
}
/*
开启协程进入nsqd.Main主函数
Main方法里重点使用了封装的WaitGroup
下列出现的n.waitGroup.Wrap均采用了封装groutine
可以看我另一篇文章讲解了封装的流程和使用方法:积累-waitgroup包装/
方法:
func (n *NSQD) Main() error {
大体执行思路
1、实例化context
2、建立退出通道,保证退出函数只运行一次,创建了匿名函数exitFunc
3、初始化并监听TCPServer
3.1、exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
3.2、TCPServer采用无限循环方式监听tcp client长连接,当有一个client连接,分配一个groutine进行处理
for -> listener.Accept() -> groutine
4、初始化HTTPServer
4.1、使用httprouter进行路由设置,然后初始化各种接口
5、初始化HttpsServer
6、监控循环队列:n.waitGroup.Wrap(n.queueScanLoop)
7、节点信息管理:n.waitGroup.Wrap(n.lookupLoop)
8、统计信息:n.waitGroup.Wrap(n.statsdLoop)
*/go func() {
err := p.nsqd.Main()
if err != nil {
p.Stop()
os.Exit(1)
}
}()
return nil
}
func (p *program) Stop() error {
/*
/*
底层源码
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 1 {
return
}
// Slow-path.
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
可以看成这样的链式操作
p.once.Do == program.once.Do
确保在执行时只执行一次退出操作
*/
p.once.Do(func() {
p.nsqd.Exit()
})
return nil
}
func logFatal(f string, args ...interface{}) {
lg.LogFatal("[nsqd] ", f, args...)
}
Nsqd流程图
偷个懒,从网上摘录的流程图直接拿下来了大家先看看
小结
nsqd代码逻辑清晰,利用Go协程高效并发处理分布式多节点nsqd的生产和消费,学习并发处理nsqd是最佳项目,每行代码都值得学习,坚持读每一行代码相信大伙一定会受益匪浅的,等我们把这项目2000多行代码都读差不多了,在回头看看成长,绝对比看几本书学的快,学以致用,多动手,多练习。
下一章具体分析nsqd主程序。