背景
我们现在有了下载一个torrent的所有工具:一个包含peers的列表,可以和其他peer进行tcp通信,初始化一个握手协议,发送和接收消息。但是我们还面临的最大的问题就是如何处理和peers交互的 并发 问题,并且管理peers交互的 状态 问题,这也是比较典型的两个老大难的问题,通常使用队列解决此类问题。
管理并发
在go语言中,我们通过通信共享内存,而不是通过共享内存通信,我们可以考虑使用channel作为轻量级队列。
我们将设置两个队列来同步并发的任务,一个用来从peers下载分片内容,一个用来收集下载后的结果,最后把结果放入一个缓冲区,组装成一个完整文件。
// 初始化队列
workQueue := make(chan *pieceWork, len(t.PieceHashs))
results := make(chan *pieceResult, len(t.PieceHashs))
for index, hash := range t.PieceHashs {
length := t.calculatePieceSize(index)
workQueue <- &pieceWork{index, hash, length}
}
// 开启下载任务
for _, peer := range t.Peers {
go t.startDownloadWorker(peer, workQueue, results)
}
// 收集结果
buf := make([] byte , t.Length)
donePieces := 0
for donePieces < len(t.PieceHashs) {
res := <-results
begin, end := t.calculateBoundsForPiece(res.index)
copy(buf[begin:end], res.buf)
donePieces++
percent := float64(donePieces) / float64(len(t.PieceHashs)) * 100
numWorkers := runtime.NumGoroutine() - 1
log .Printf("(%0.2f%%) Downloaded pieces #%d from %d peers\n", percent, res.index, numWorkers)
}
close(workQueue)
func (t *Torrent) startDownloadWorker(peer peers.Peer, workQueue chan *pieceWork, results chan *pieceResult) {
c, err := client.New(peer, t.PeerID, t.InfoHash)
if err != nil {
log.Printf("Could not handshake with %s. Disconnecting\n", peer.IP)
return
}
defer c.Conn.Close()
log.Printf("Completed handshake with %s\n", peer.IP)
c.SendUnchoke()
c.SendInterested()
for pw := range workQueue {
if !c.Bitfield.HasPiece(pw.index) {
workQueue <- pw // 没有的放回队列
continue
}
// 下载分片
buf, err := attemptDownloadPiece(c, pw)
if err != nil {
log.Println("Exiting", err)
workQueue <- pw // 失败的放回队列
return
}
err = checkIntegrity(pw, buf)
if err != nil {
log.Printf("Piece #%d failed integrity check\n", pw.index)
workQueue <- pw // 校验不通过放回队列
continue
}
c.SendHave(pw.index)
results <- &pieceResult{pw.index, buf}
}
}
我们对每个peer建立了一个下载任务,任务从队列workQueue里面获取任务信息并执行,把执行结果放入results队列,下载之前先发送释放阻塞的请求,下载完对比分片hash是否相同,完成后把结果放入results队列中,并告诉peer我们已经有这个分片了。每当results队列中有结果的时候就把它取出来放入buffer。
状态管理
我们使用一个 结构体 来跟踪peer的状态,读取消息后改变结构体状态信息,它包括我们从peer下载了多少内容,请求了多少次,是否被阻塞了等。
type pieceProgress struct {
index int
client *client.Client
buf []byte
downloaded int
requested int
backlog int
}
func (state *pieceProgress) read message () error {
msg , err := state.client.Read() // this call blocks
switch msg.ID {
case message.MsgUnchoke:
state.client.Choked = false
case message.MsgChoke:
state.client.Choked = true
case message.MsgHave:
index, err := message.ParseHave(msg)
state.client.Bitfield.SetPiece(index)
case message.MsgPiece:
n, err := message.ParsePiece(state.index, state.buf, msg)
state.downloaded += n
state.backlog--
}
return nil
}
发起请求
文件,分片,hash这些还不算完,我们需要把分片再分块,一个分块为分片内的索引index,offset,length,我们向peer请求数据实际上是请求块,每个块大小通常是16KB,一个256KB大小的分片实际上要请求16次peer。一个peer在收到超过16KB的数据时应该断开连接,但根据经验达到128KB也是完全能接受的,适当的增大块能提升速度,只要遵守规范。
流水线
网络往返代价是昂贵的,一个块接一个块顺序下载是会显著降低下载性能的,因此把请求流水线化是非常重要的,它能保持持续的请求压力,使得吞吐量数量级的提升。
典型地,BitTorrent客户端会使用队列化的5条流水线请求,我发现提高这个数量会加倍提速,可以选择合适的大小适应现代网络速度,这个需要不断调优才能优化性能。
const MaxBlockSize = 16384
const MaxBacklog = 5
func attemptDownloadPiece(c *client.Client, pw *pieceWork) ([]byte, error) {
state := pieceProgress{
index: pw.index,
client: c,
buf: make([]byte, pw.length),
}
// 设置超时防止卡死,30秒足够下载256KB
c.Conn.SetDeadline(time.Now().Add(30 * time.Second))
defer c.Conn.SetDeadline(time.Time{}) // Disable the deadline
for state.downloaded < pw.length {
// 如果没有阻塞就发送请求
if !state.client.Choked {
for state.backlog < MaxBacklog && state.requested < pw.length {
blockSize := MaxBlockSize
// 最后一个块大小可能比较小
if pw.length-state.requested < blockSize {
blockSize = pw.length - state.requested
}
err := c.SendRequest(pw.index, state.requested, blockSize)
if err != nil {
return nil, err
}
state.backlog++
state.requested += blockSize
}
}
err := state.readMessage()
if err != nil {
return nil, err
}
}
return state.buf, nil
}
好,到目前为止,我们已经完成了整个客户端,只需要在main.go中把这些组织起来,然后运行它!
下一篇我们将分享源代码和一些参考文献,敬请关注。