修改速度计算方法, 新增暂停和恢复下载

This commit is contained in:
apple
2018-03-15 22:42:29 +08:00
parent c475c7d6e2
commit af8695b1f1
4 changed files with 73 additions and 37 deletions

View File

@@ -109,36 +109,38 @@ func (bl *BlockList) isAllDone() bool {
// addExecBlock 增加线程任务
func (der *Downloader) addExecBlock(id int) {
der.status.BlockList[id].running++
for_2: // code 为 1 时, 不重试
// 其他的 code, 无限重试
for {
code, err := der.execBlock(id)
go func(id int) {
der.status.BlockList[id].running++
for_2: // code 为 1 时, 不重试
// 其他的 code, 无限重试
for {
code, err := der.execBlock(id)
// 成功, 退出循环
if code == 0 || err == nil {
break
}
// 下载成功, 或者下载暂停, 退出循环
if code == 0 || err == nil || der.paused {
break
}
// fmt.Println(id, code, err)
// fmt.Println(id, code, err)
// 未成功(有错误), 继续
switch code {
case 1: // 不重试
break for_2
case 2: // 休息 3 秒, 再无限重试
time.Sleep(3 * time.Second)
case 61: // 不休息无限重试
// 未成功(有错误), 继续
switch code {
case 1: // 不重试
break for_2
case 2: // 休息 3 秒, 再无限重试
time.Sleep(3 * time.Second)
case 61: // 不休息无限重试
continue
default:
time.Sleep(3 * time.Second)
}
// 重新下载
continue
default:
time.Sleep(3 * time.Second)
}
// 重新下载
continue
}
der.status.BlockList[id].running--
der.status.BlockList[id].running--
}(id)
}
// downloadBlock 块执行下载任务
@@ -201,6 +203,7 @@ func (der *Downloader) execBlock(id int) (code int, err error) {
n, err = resp.Body.Read(block.buf)
n64 = int64(n)
atomic.AddInt64(&der.status.Speeds, n64)
// 获得剩余的数据量
expectedSize := block.expectedContentLength()

View File

@@ -15,6 +15,8 @@ import (
type Downloader struct {
OnExecute func()
OnFinish func()
OnPause func()
OnResume func()
url string
config *Config
@@ -23,6 +25,7 @@ type Downloader struct {
sinceTime time.Time
status Status
paused bool
}
// NewDownloader 创建新的文件下载
@@ -184,11 +187,9 @@ func (der *Downloader) Execute() (err error) {
}
} else {
for id := range der.status.BlockList {
go func(id int) {
// 分配缓存空间
der.status.BlockList[id].buf = make([]byte, der.config.CacheSize)
der.addExecBlock(id)
}(id)
// 分配缓存空间
der.status.BlockList[id].buf = make([]byte, der.config.CacheSize)
der.addExecBlock(id)
}
// 开启监控
@@ -204,6 +205,36 @@ func (der *Downloader) Execute() (err error) {
return err
}
// Pause 暂停下载, 不支持单线程暂停下载
func (der *Downloader) Pause() {
defer trigger(der.OnPause)
if der.paused { // 已经暂停, 退出
return
}
der.paused = true
for _, block := range der.status.BlockList {
if block != nil && block.resp != nil && !block.resp.Close {
block.resp.Body.Close()
}
}
}
// Resume 恢复下载, 不支持单线程
func (der *Downloader) Resume() {
defer trigger(der.OnResume)
if !der.paused { // 未被暂停, 退出
return
}
der.paused = false
for id := range der.status.BlockList {
// 分配缓存空间
der.status.BlockList[id].buf = make([]byte, der.config.CacheSize)
der.addExecBlock(id)
}
}
func (der *Downloader) singleDownload() error {
resp, err := der.config.Client.Req("GET", der.url, nil, nil)
if resp != nil {

View File

@@ -18,6 +18,12 @@ func (der *Downloader) blockMonitor() <-chan struct{} {
c := make(chan struct{})
go func() {
for {
// 下载暂停, 不开启监控
if der.paused {
time.Sleep(2 * time.Second)
continue
}
// 下载完毕, 线程全部完成下载任务, 发送结束信号
if der.status.BlockList.isAllDone() {
c <- struct{}{}

View File

@@ -22,18 +22,13 @@ type Status struct {
}
// GetStatusChan 返回 Status 对象的 channel
func (der *Downloader) GetStatusChan() <-chan *Status {
c := make(chan *Status)
func (der *Downloader) GetStatusChan() <-chan Status {
c := make(chan Status)
go func() {
var old = atomic.LoadInt64(&der.status.Downloaded)
for {
time.Sleep(1 * time.Second) // 每秒统计
atomic.StoreInt64(&der.status.Speeds, atomic.LoadInt64(&der.status.Downloaded)-old)
old = der.status.Downloaded
if speeds := atomic.LoadInt64(&der.status.Speeds); speeds > atomic.LoadInt64(&der.status.MaxSpeeds) {
atomic.StoreInt64(&der.status.MaxSpeeds, speeds)
}
@@ -46,7 +41,8 @@ func (der *Downloader) GetStatusChan() <-chan *Status {
return
}
c <- &der.status
c <- der.status
atomic.StoreInt64(&der.status.Speeds, 0) // 清空速度统计
}
}()