这是基于golang Socket 一个轻量级,支持高并发操作的开发框架chitchat。本文将介绍chitchat的基本使用方法;通过源码分析该框架的具体工作流程;简要讲解作者留下的Demo文件和该框架的使用技巧;下载链接。通过该框架,我们可以方便建立起Server-Client长连接并通信。
使用chitchat
chitchat得以支持高并发连接的关键在于其能够快速响应客户端发起的 链接 并及时开启goroutine确保一对一的通信。对于使用者而言,只需负责向框架注册正确的IP socket(ipAddr:ipPort)(注:除非特别说明,否则后续提到的地址Addr均指addr:port)并正确编写用于处理接收数据和异常处理函数即可正常运行。
开启一个Server
仅需创建一个Server实例并调用其Listen()方法即可使一个Server开始正常工作。一个Server通常只用于监听一个端口,负责一类事物的调度处理。我们看一下具体调度的API:
func NewServer( ipaddrsocket string, delim byte, readfunc ServerReadFunc, additional interface{}) Server
可以看到,创建一个Server实例需要提供四个参数,分别为监听对象,分隔符,处理函数,附加数据。其中附加数据可置为空 ( nil ) 。
监听对象即可供Client连接的IPsocket;当Server读到一连串数据后,将通过delim分隔符将数据切片并交予readfunc处理,多片数据将调用多次readfunc。delim可置为0,此时Server将持续读到 EOF 后才会交付数据。当delim置为‘\n’时,Server会默认换行交付,此时会根据Windows‘\r\n’作出对应调整;处理函数将处理Server交付的数据流;附加数据是为了配合readfunc更好的完成对数据的处理。后续在讲解 如何编写readfunc 时会提及如何使用additional给出的数据。
Server实则是一个可供调用的对外API接口interface,其中包含Listen()方法启动该Server开始监听。
func (t *server) Listen() error
Listen是一个异步方法,如果发现配置参数有误或端口被占用等错误将会直接返回,否则就在后台拉起新的goroutine处理具体事务。Listen()方法不阻塞进程,也不会等到后台goroutine全部正常工作后再返回。
后台goroutine在运转处理的过程中若遇到错误将通过Err channel告知使用者,因此使用者需要 显式 地接收并处理error。注意即使不需要这些error信息,我们也需要有一个接收的过程,否则会导致后台进程堵塞。通过ErrChan()获取该Channel:
type Errsocket struct { Err error RemoteAddr string } func (t *server) ErrChan() <-chan Errsocket
发送的错误消息包含两部分,error和对端ip(addr:port)。
当我们想关闭该Server,只需调用其Cut函数:
func (t *server) Cut() error
Cut()方法会使Server停止监听Socket,同时释放所有已连接的Connection。该方法和Listen()一样,也 不会 等待所有Connection全部关闭后再返回。倘若希望关闭某特定的Connection(当然在我们已经知道该Connection对端连接IPaddr的前提下),我们可以使用CloseRemote方法:
func (t *server) CloseRemote(remoteAddr string) error
至此较为重要的Server API已经简单介绍完成了,另外有些较为简单的API根据名字便可知道其作用,不再简单赘述。之后我们会通过一个简单的例子演示这些API的用法。
type Server interface { Listen() error Cut() error CloseRemote(string) error RangeRemoteAddr() []string GetLocalAddr() string SetDeadLine(time.Duration, time.Duration) ErrChan() <-chan Errsocket Write(interface{}) error }
开启一个Client
通过NewClient函数创建一个Client实例,并通过调用其API方法向服务端发起连接。
func NewClient( ipremotesocket string, delim byte, readfunc ClientReadFunc, additional interface{}) Client {
可以发现,创建Client实例的函数参数与创建Server实例NewServer的函参形式和意义基本相同。再次便不再多加解释。注意的是,v1.0.0版本Client还未能指定自己的ipaddr,只能连接成功后随机分配;另外,readfunc对于Server而言是不可置为空(nil)的,但对于Client而言可以置为nil,即忽略所有Server发送的消息。再有,对于一对Server/Client而言,其分隔符delim应该约定好是相同的,否则可能会出现消息切分错误的情况。
Client通过调用Dial()方法向Server发起连接。
func (t *client) Dial() error
若连接错误,则会返回具体错误原因,否则拉起相应goroutine执行后续操作并返回。
关闭连接可使用API提供的 Close ()命令。
func (t *client) Close()
该函数的作用仅仅是向Client发送了退出的信号,若此时还有业务处于运行状态(如readfunc)则会等待业务正常关闭后再退出。
以下是Client的全部对外API:
type Client interface {
Dial() error
Close()
SetDeadLine(time.Duration)
ErrChan() <-chan Err socket
Write(interface{}) error
GetRemoteAddr() string
GetLocalAddr() string
}
最后我们讲解Write()方法。Write函数将传递的类型通过json编码为[]byte并发送。因此我们可以在readfunc中使用Unmarshal()解码。同时,框架提供了一个函数使得使用者可以自定义Write()方法:
type wf func(net.Conn, interface{}, byte) error func SetWriteFunc(f wf)
readfunc与APIs:
该框架最为重要的核心部分即readfunc的编写,它的作用是处理由Server/Client递交的数据片。我们先看一下readfunc的函数签名:
type ClientReadFunc func([]byte, ReadFuncer) error type ServerReadFunc func([]byte, ReadFuncer) error
无论是Client的readfunc或Server的readfunc,其函数签名都是相同的。ReadFuncer是一个接口interface,它提供了一系列在readfunc函数中可用的API。稍后我们会对其中部分方法进行讲解。
type ReadFuncer interface { GetRemoteAddr() string GetLocalAddr() string GetConn() net.Conn Close() Write(interface{}) error Addon() interface{} }
由于socket只允许传递[]byte类型的数据,因此我们要做的第一步就是将[]byte类型转变为我们希望的数据类型。如果写入的是一个数据类型,我们想从[]byte转为struct可使用:
var t = *(**yourStruct)(unsafe.Pointer(&str))
这里将yourStruct替换为你自己的结构体别名即可。若是string类型,则简单使用类型转换即可。
readfunc提供了够用的API,包括获得本地/远程IP socket与Conn,发送数据,关闭连接,获得附加数据。还记得附加数据吗,这是我们在最初创建Server/Client实例时传入的一个参数,现在可以通过Addon()将其取出来使用了。一般建议传入的是一个指针类型的Addon,这样readfunc可对其进行修改。
关于Close()函数:不用担心在readfunc中使用Close()方法会提前终止readfunc业务,导致数据无法正常交付。正如前文所言,Close()只是向框架传递一个关闭的信号。框架会等待readfunc全部执行完毕后再关闭这个连接。
源码分析
在分析Demo之前,我们先简单探究一下约600多行的源码,看一下其内部各goroutine的支配运行情况。
Chitchat – goroutine
当 Server 调用Listen()方法时,Server内部会拉起一个hL goroutine(handleListen);当成功响应Client的Dial方法时,hL拉起新的goroutinehC4s(handleConnforServer);hC4s通过拉起read读取DATA并负责将DATA交付给Readfunc。一个hC4s对应一个连接,多个连接将开启多个hC4s和read。 Client 向服务端发起连接成功后也将拉起一个goroutinehC4c(handleConnforClient)和read。 eD 是一个较为特殊的goroutine,他负责用户监听的errChannel是否处于关闭状态并将goroutine产生的错误数据传递给用户。
当 Server 调用Cut()方法关闭监听后,它将关闭hL与所有的hC4s和read,以及负责错误转发的eD,同时关闭errChannel;调用Close()/CloseRemote(…)方法时,仅关闭当前连接对应的hC4s和read,不关闭errChannel; Client 调用Close()关闭连接后,将关闭开启的所有goroutine和errChannel。
hC4s和hC4c大同小异,我们着重分析一下hC4s源码:
func handleConnServer(h *hConnerServer, eC chan Errsocket, ctx context.Context, s *server) {...} type hConnerServer struct { conn net.Conn d byte mu *sync.Mutex readfunc ServerReadFunc }
hConnerServer结构体主要包含了以下内容:连接实例 conn ,分隔符 d ,普通锁 mu , readfunc ,其中mutex主要用于维护eD的正常工作;eC是上游传递下来的错误发送通道;监听ctx.Done()保证与上游一起收到退出信号,但不保证退出的顺序;server提供readfunc使用的API。defer()语句保证了当hC4s退出时,将安全关闭conn和eD goroutine。
拉起的readgoroutine将读到的DATA分片通过channel发送给hC4s,hC4s将该DATA交给readfunc处理:
//hC4s
case strReq, ok := <-strReqChan: //read a data slice successfully
if !ok {
return //EOF && d!=0
}
err := h.readfunc(strReq, &server{
currentConn: h.conn,
delimiter: h.d,
remoteMap: s.remoteMap,
additional: s.additional,
})
if err != nil {
h.mu. Lock ()
eC <- Errsocket{err, h.conn.RemoteAddr().String()}
}
}
一个server struct同时实现了Server interface和ReadFuncer interface中的所有方法,并通过接口的方式将特定的方法暴露给框架的使用者,这样设计使一些重复的方法在代码上得到复用。
hC4c在这段代码上稍有不同:
//hC4c case strReq, ok := <-strReqChan: //read a data slice successfully if !ok { return //EOF && d!=0 } if h.readfunc != nil { h.rcmu.Lock() err := h.readfunc(strReq, client) if err != nil { h.eD.mu.Lock() eC <- Errsocket{err, h.conn.RemoteAddr().String()} } h.rcmu.Unlock() } }
区别在于:
- 对于Client而言,其readfunc是可为nil的,这样正常读数据但不会被处理;
- 与Server相比,多了一个rcmu的锁。该锁是防止在readfunc中调用了Close()方法后error Channel被提前关闭,导致readfunc的错误信息无法被正确送达。我们可以看一下Client的Close()方法:
func (t *client) Close() { go func() { t.rcmu.Lock() t.mu.Lock() t.closed = true close(t.eU) t.mu.Unlock() t.rcmu.Unlock() t.cancelfunc() }() }
可以看到,Close()方法会等待rcmu锁被释放后再执行后续操作。而为什么Server不需要为之加锁呢。因为Server在readfunc调用的Close()方法不会关闭上游的error Channel。
Server通过并发安全的map存储每个Conn对应的ip socket与cancelFunc,保证能够独立关闭任意Conn。
Demo分析
与上文一样,首先将各goroutine运作与调度的流程关系通过图的形式表现出来,并简要解释各goroutine的作用:
Master 提供的Listen()方法将注册一个名为 registerNode 的readfunc;当 Node 节点向Master注册成功后,Node节点拉起一个 dHBL (daemon-HeartBeatListener) goroutine,在7939端口发起监听并注册 hb4node readfunc,用于接收ping报文并发送pong回应;Master会拉起一个 dHBC (daemon-HeartBeatChecker),定时向Node端发起连接并发送ping报文,并注册 hb4master readfunc,当成功接收到pong报文后主动关闭连接。若在接收报文消息过程中出现错误,将发送错误消息至 HBC/L error 错误处理器,供作进一步处理。
当 dHBC 连续接收到三次以上错误消息后,判定对端Node失去连接;当 HBL error 十秒以上未收到Master发来的消息后,判定Master已丢失自己。
Demo Tricks
在hb4master/node readfunc中,无论结果成功与否,都会发送一个error(”succeed”或 具体错误),这样在HBC/L error便可根据error得知此次消息传递的结果,并作进一步操作。
Github
Github:chitchat
或者也可以通过
go get github.com/ovenvan/chitchat
下载并使用。