七叶笔记 » golang编程 » 教程:用golang从零开始手写一个bt下载客户端(7)

教程:用golang从零开始手写一个bt下载客户端(7)

背景

我们现在有了下载一个torrent的所有工具:一个包含peers的列表,可以和其他peer进行tcp通信,初始化一个握手协议,发送和接收消息。但是我们还面临的最大的问题就是如何处理和peers交互的 并发 问题,并且管理peers交互的 状态 问题,这也是比较典型的两个老大难的问题,通常使用队列解决此类问题。

管理并发

在go语言中,我们通过通信共享内存,而不是通过共享内存通信,我们可以考虑使用channel作为轻量级队列。

我们将设置两个队列来同步并发的任务,一个用来从peers下载分片内容,一个用来收集下载后的结果,最后把结果放入一个缓冲区,组装成一个完整文件。

 // 初始化队列
workQueue := make(chan *pieceWork, len(t.PieceHashs))
	results := make(chan *pieceResult, len(t.PieceHashs))

	for index, hash := range t.PieceHashs {
		length := t.calculatePieceSize(index)
		workQueue <- &pieceWork{index, hash, length}
	}
// 开启下载任务
	for _, peer := range t.Peers {
		go t.startDownloadWorker(peer, workQueue, results)
	}

// 收集结果	
buf := make([] byte , t.Length)

	donePieces := 0
	for donePieces < len(t.PieceHashs) {
		res := <-results
		begin, end := t.calculateBoundsForPiece(res.index)
		copy(buf[begin:end], res.buf)
		donePieces++
		 percent  := float64(donePieces) / float64(len(t.PieceHashs)) * 100
		numWorkers := runtime.NumGoroutine() - 1
		 log .Printf("(%0.2f%%) Downloaded pieces #%d from %d peers\n", percent, res.index, numWorkers)
	}
	close(workQueue)  
 func (t *Torrent) startDownloadWorker(peer peers.Peer, workQueue chan *pieceWork, results chan *pieceResult) {
    c, err := client.New(peer, t.PeerID, t.InfoHash)
    if err != nil {
        log.Printf("Could not handshake with %s. Disconnecting\n", peer.IP)
        return
    }
    defer c.Conn.Close()
    log.Printf("Completed handshake with %s\n", peer.IP)

    c.SendUnchoke()
    c.SendInterested()

    for pw := range workQueue {
        if !c.Bitfield.HasPiece(pw.index) {
            workQueue <- pw // 没有的放回队列
            continue
        }

        // 下载分片
        buf, err := attemptDownloadPiece(c, pw)
        if err != nil {
            log.Println("Exiting", err)
            workQueue <- pw // 失败的放回队列
            return
        }

        err = checkIntegrity(pw, buf)
        if err != nil {
            log.Printf("Piece #%d failed integrity check\n", pw.index)
            workQueue <- pw // 校验不通过放回队列
            continue
        }

        c.SendHave(pw.index)
        results <- &pieceResult{pw.index, buf}
    }
}  

我们对每个peer建立了一个下载任务,任务从队列workQueue里面获取任务信息并执行,把执行结果放入results队列,下载之前先发送释放阻塞的请求,下载完对比分片hash是否相同,完成后把结果放入results队列中,并告诉peer我们已经有这个分片了。每当results队列中有结果的时候就把它取出来放入buffer。

状态管理

我们使用一个 结构体 来跟踪peer的状态,读取消息后改变结构体状态信息,它包括我们从peer下载了多少内容,请求了多少次,是否被阻塞了等。

 type pieceProgress struct {
    index      int
    client     *client.Client
    buf        []byte
    downloaded int
    requested  int
    backlog    int
}

func (state *pieceProgress) read message () error {
     msg , err := state.client.Read() // this call blocks
    switch msg.ID {
    case message.MsgUnchoke:
        state.client.Choked = false
    case message.MsgChoke:
        state.client.Choked = true
    case message.MsgHave:
        index, err := message.ParseHave(msg)
        state.client.Bitfield.SetPiece(index)
    case message.MsgPiece:
        n, err := message.ParsePiece(state.index, state.buf, msg)
        state.downloaded += n
        state.backlog--
    }
    return nil
}  

发起请求

文件,分片,hash这些还不算完,我们需要把分片再分块,一个分块为分片内的索引index,offset,length,我们向peer请求数据实际上是请求块,每个块大小通常是16KB,一个256KB大小的分片实际上要请求16次peer。一个peer在收到超过16KB的数据时应该断开连接,但根据经验达到128KB也是完全能接受的,适当的增大块能提升速度,只要遵守规范。

流水线

网络往返代价是昂贵的,一个块接一个块顺序下载是会显著降低下载性能的,因此把请求流水线化是非常重要的,它能保持持续的请求压力,使得吞吐量数量级的提升。

典型地,BitTorrent客户端会使用队列化的5条流水线请求,我发现提高这个数量会加倍提速,可以选择合适的大小适应现代网络速度,这个需要不断调优才能优化性能。

 const MaxBlockSize = 16384

const MaxBacklog = 5

func attemptDownloadPiece(c *client.Client, pw *pieceWork) ([]byte, error) {
    state := pieceProgress{
        index:  pw.index,
        client: c,
        buf:    make([]byte, pw.length),
    }

    // 设置超时防止卡死,30秒足够下载256KB
    c.Conn.SetDeadline(time.Now().Add(30 * time.Second))
    defer c.Conn.SetDeadline(time.Time{}) // Disable the deadline

    for state.downloaded < pw.length {
        // 如果没有阻塞就发送请求
        if !state.client.Choked {
            for state.backlog < MaxBacklog && state.requested < pw.length {
                blockSize := MaxBlockSize
                // 最后一个块大小可能比较小
                if pw.length-state.requested < blockSize {
                    blockSize = pw.length - state.requested
                }

                err := c.SendRequest(pw.index, state.requested, blockSize)
                if err != nil {
                    return nil, err
                }
                state.backlog++
                state.requested += blockSize
            }
        }

        err := state.readMessage()
        if err != nil {
            return nil, err
        }
    }

    return state.buf, nil
}  

好,到目前为止,我们已经完成了整个客户端,只需要在main.go中把这些组织起来,然后运行它!

下一篇我们将分享源代码和一些参考文献,敬请关注。

相关文章