七叶笔记 » golang编程 » Golang 轻量级-高并发socket框架——chitchat

Golang 轻量级-高并发socket框架——chitchat

这是基于golang Socket 一个轻量级,支持高并发操作的开发框架chitchat。本文将介绍chitchat的基本使用方法;通过源码分析该框架的具体工作流程;简要讲解作者留下的Demo文件和该框架的使用技巧;下载链接。通过该框架,我们可以方便建立起Server-Client长连接并通信。

使用chitchat

chitchat得以支持高并发连接的关键在于其能够快速响应客户端发起的 链接 并及时开启goroutine确保一对一的通信。对于使用者而言,只需负责向框架注册正确的IP socket(ipAddr:ipPort)(注:除非特别说明,否则后续提到的地址Addr均指addr:port)并正确编写用于处理接收数据和异常处理函数即可正常运行。

开启一个Server

仅需创建一个Server实例并调用其Listen()方法即可使一个Server开始正常工作。一个Server通常只用于监听一个端口,负责一类事物的调度处理。我们看一下具体调度的API:

func NewServer(
 ipaddrsocket string,
 delim byte, 
 readfunc ServerReadFunc, 
 additional interface{}) Server
 

可以看到,创建一个Server实例需要提供四个参数,分别为监听对象,分隔符,处理函数,附加数据。其中附加数据可置为空 ( nil )

监听对象即可供Client连接的IPsocket;当Server读到一连串数据后,将通过delim分隔符将数据切片并交予readfunc处理,多片数据将调用多次readfunc。delim可置为0,此时Server将持续读到 EOF 后才会交付数据。当delim置为‘\n’时,Server会默认换行交付,此时会根据Windows‘\r\n’作出对应调整;处理函数将处理Server交付的数据流;附加数据是为了配合readfunc更好的完成对数据的处理。后续在讲解 如何编写readfunc 时会提及如何使用additional给出的数据。

Server实则是一个可供调用的对外API接口interface,其中包含Listen()方法启动该Server开始监听。

func (t *server) Listen() error
 

Listen是一个异步方法,如果发现配置参数有误或端口被占用等错误将会直接返回,否则就在后台拉起新的goroutine处理具体事务。Listen()方法不阻塞进程,也不会等到后台goroutine全部正常工作后再返回。

后台goroutine在运转处理的过程中若遇到错误将通过Err channel告知使用者,因此使用者需要 显式 地接收并处理error。注意即使不需要这些error信息,我们也需要有一个接收的过程,否则会导致后台进程堵塞。通过ErrChan()获取该Channel:

type Errsocket struct {
 Err error
 RemoteAddr string
}
func (t *server) ErrChan() <-chan Errsocket
 

发送的错误消息包含两部分,error和对端ip(addr:port)。

当我们想关闭该Server,只需调用其Cut函数:

func (t *server) Cut() error
 

Cut()方法会使Server停止监听Socket,同时释放所有已连接的Connection。该方法和Listen()一样,也 不会 等待所有Connection全部关闭后再返回。倘若希望关闭某特定的Connection(当然在我们已经知道该Connection对端连接IPaddr的前提下),我们可以使用CloseRemote方法:

func (t *server) CloseRemote(remoteAddr string) error
 

至此较为重要的Server API已经简单介绍完成了,另外有些较为简单的API根据名字便可知道其作用,不再简单赘述。之后我们会通过一个简单的例子演示这些API的用法。

type Server interface {
 Listen() error
 Cut() error
 CloseRemote(string) error
 RangeRemoteAddr() []string
 GetLocalAddr() string
 SetDeadLine(time.Duration, time.Duration)
 ErrChan() <-chan Errsocket
 Write(interface{}) error
}
 

开启一个Client

通过NewClient函数创建一个Client实例,并通过调用其API方法向服务端发起连接。

func NewClient(
 ipremotesocket string,
 delim byte, 
 readfunc ClientReadFunc, 
 additional interface{}) Client {
 

可以发现,创建Client实例的函数参数与创建Server实例NewServer的函参形式和意义基本相同。再次便不再多加解释。注意的是,v1.0.0版本Client还未能指定自己的ipaddr,只能连接成功后随机分配;另外,readfunc对于Server而言是不可置为空(nil)的,但对于Client而言可以置为nil,即忽略所有Server发送的消息。再有,对于一对Server/Client而言,其分隔符delim应该约定好是相同的,否则可能会出现消息切分错误的情况。

Client通过调用Dial()方法向Server发起连接。

func (t *client) Dial() error 
 

若连接错误,则会返回具体错误原因,否则拉起相应goroutine执行后续操作并返回。

关闭连接可使用API提供的 Close ()命令。

func (t *client) Close()
 

该函数的作用仅仅是向Client发送了退出的信号,若此时还有业务处于运行状态(如readfunc)则会等待业务正常关闭后再退出。

以下是Client的全部对外API:

type Client interface {
 Dial() error
 Close()
 SetDeadLine(time.Duration)
 ErrChan() <-chan Err socket 
 Write(interface{}) error
 GetRemoteAddr() string
 GetLocalAddr() string
}
 

最后我们讲解Write()方法。Write函数将传递的类型通过json编码为[]byte并发送。因此我们可以在readfunc中使用Unmarshal()解码。同时,框架提供了一个函数使得使用者可以自定义Write()方法:

type wf func(net.Conn, interface{}, byte) error
func SetWriteFunc(f wf)
 

readfunc与APIs:

该框架最为重要的核心部分即readfunc的编写,它的作用是处理由Server/Client递交的数据片。我们先看一下readfunc的函数签名:

type ClientReadFunc func([]byte, ReadFuncer) error
type ServerReadFunc func([]byte, ReadFuncer) error
 

无论是Client的readfunc或Server的readfunc,其函数签名都是相同的。ReadFuncer是一个接口interface,它提供了一系列在readfunc函数中可用的API。稍后我们会对其中部分方法进行讲解。

type ReadFuncer interface {
 GetRemoteAddr() string
 GetLocalAddr() string
 GetConn() net.Conn
 Close()
 Write(interface{}) error
 Addon() interface{}
}
 

由于socket只允许传递[]byte类型的数据,因此我们要做的第一步就是将[]byte类型转变为我们希望的数据类型。如果写入的是一个数据类型,我们想从[]byte转为struct可使用:

var t = *(**yourStruct)(unsafe.Pointer(&str))
 

这里将yourStruct替换为你自己的结构体别名即可。若是string类型,则简单使用类型转换即可。

readfunc提供了够用的API,包括获得本地/远程IP socket与Conn,发送数据,关闭连接,获得附加数据。还记得附加数据吗,这是我们在最初创建Server/Client实例时传入的一个参数,现在可以通过Addon()将其取出来使用了。一般建议传入的是一个指针类型的Addon,这样readfunc可对其进行修改。

关于Close()函数:不用担心在readfunc中使用Close()方法会提前终止readfunc业务,导致数据无法正常交付。正如前文所言,Close()只是向框架传递一个关闭的信号。框架会等待readfunc全部执行完毕后再关闭这个连接。

源码分析

在分析Demo之前,我们先简单探究一下约600多行的源码,看一下其内部各goroutine的支配运行情况。

Chitchat – goroutine

Server 调用Listen()方法时,Server内部会拉起一个hL goroutine(handleListen);当成功响应Client的Dial方法时,hL拉起新的goroutinehC4s(handleConnforServer);hC4s通过拉起read读取DATA并负责将DATA交付给Readfunc。一个hC4s对应一个连接,多个连接将开启多个hC4s和read。 Client 向服务端发起连接成功后也将拉起一个goroutinehC4c(handleConnforClient)和read。 eD 是一个较为特殊的goroutine,他负责用户监听的errChannel是否处于关闭状态并将goroutine产生的错误数据传递给用户。

Server 调用Cut()方法关闭监听后,它将关闭hL与所有的hC4s和read,以及负责错误转发的eD,同时关闭errChannel;调用Close()/CloseRemote(…)方法时,仅关闭当前连接对应的hC4s和read,不关闭errChannel; Client 调用Close()关闭连接后,将关闭开启的所有goroutine和errChannel。

hC4s和hC4c大同小异,我们着重分析一下hC4s源码:

func handleConnServer(h *hConnerServer, eC chan Errsocket, ctx context.Context, s *server) {...}
type hConnerServer struct {
 conn net.Conn
 d byte
 mu *sync.Mutex
 readfunc ServerReadFunc
}
 

hConnerServer结构体主要包含了以下内容:连接实例 conn ,分隔符 d ,普通锁 mu readfunc ,其中mutex主要用于维护eD的正常工作;eC是上游传递下来的错误发送通道;监听ctx.Done()保证与上游一起收到退出信号,但不保证退出的顺序;server提供readfunc使用的API。defer()语句保证了当hC4s退出时,将安全关闭conn和eD goroutine。

拉起的readgoroutine将读到的DATA分片通过channel发送给hC4s,hC4s将该DATA交给readfunc处理:

//hC4s
 case strReq, ok := <-strReqChan: //read a data slice successfully
 if !ok {
 return //EOF && d!=0
 }
 err := h.readfunc(strReq, &server{
 currentConn: h.conn,
 delimiter: h.d,
 remoteMap: s.remoteMap,
 additional: s.additional,
 })
 if err != nil {
 h.mu. Lock ()
 eC <- Errsocket{err, h.conn.RemoteAddr().String()}
 }
 }
 

一个server struct同时实现了Server interface和ReadFuncer interface中的所有方法,并通过接口的方式将特定的方法暴露给框架的使用者,这样设计使一些重复的方法在代码上得到复用。

hC4c在这段代码上稍有不同:

//hC4c
 case strReq, ok := <-strReqChan: //read a data slice successfully
 if !ok {
 return //EOF && d!=0
 }
 if h.readfunc != nil {
 h.rcmu.Lock()
 err := h.readfunc(strReq, client)
 if err != nil {
 h.eD.mu.Lock()
 eC <- Errsocket{err, h.conn.RemoteAddr().String()}
 }
 h.rcmu.Unlock()
 }
 }
 

区别在于:

  1. 对于Client而言,其readfunc是可为nil的,这样正常读数据但不会被处理;
  2. 与Server相比,多了一个rcmu的锁。该锁是防止在readfunc中调用了Close()方法后error Channel被提前关闭,导致readfunc的错误信息无法被正确送达。我们可以看一下Client的Close()方法:
func (t *client) Close() {
 go func() {
 t.rcmu.Lock()
 t.mu.Lock()
 t.closed = true
 close(t.eU)
 t.mu.Unlock()
 t.rcmu.Unlock()
 t.cancelfunc()
 }()
}
 

可以看到,Close()方法会等待rcmu锁被释放后再执行后续操作。而为什么Server不需要为之加锁呢。因为Server在readfunc调用的Close()方法不会关闭上游的error Channel。

Server通过并发安全的map存储每个Conn对应的ip socket与cancelFunc,保证能够独立关闭任意Conn。

Demo分析

与上文一样,首先将各goroutine运作与调度的流程关系通过图的形式表现出来,并简要解释各goroutine的作用:

Master 提供的Listen()方法将注册一个名为 registerNode 的readfunc;当 Node 节点向Master注册成功后,Node节点拉起一个 dHBL (daemon-HeartBeatListener) goroutine,在7939端口发起监听并注册 hb4node readfunc,用于接收ping报文并发送pong回应;Master会拉起一个 dHBC (daemon-HeartBeatChecker),定时向Node端发起连接并发送ping报文,并注册 hb4master readfunc,当成功接收到pong报文后主动关闭连接。若在接收报文消息过程中出现错误,将发送错误消息至 HBC/L error 错误处理器,供作进一步处理。

dHBC 连续接收到三次以上错误消息后,判定对端Node失去连接;当 HBL error 十秒以上未收到Master发来的消息后,判定Master已丢失自己。

Demo Tricks

在hb4master/node readfunc中,无论结果成功与否,都会发送一个error(”succeed”或 具体错误),这样在HBC/L error便可根据error得知此次消息传递的结果,并作进一步操作。

Github

Github:chitchat

或者也可以通过

go get github.com/ovenvan/chitchat
 

下载并使用。

相关文章