更新下载, 暂时取消动态分配线程

This commit is contained in:
konica
2018-02-22 16:28:41 +08:00
parent 638d3e5dd8
commit 18e2f32018
5 changed files with 51 additions and 37 deletions

1
.gitignore vendored
View File

@@ -9,6 +9,7 @@
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
cmd/downloader/downloader
# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
.glide/

View File

@@ -53,7 +53,7 @@ func (b *Block) setDone() {
// 避免操作 Begin 部分, 否则可能写文件时, 会出现异常
begin := atomic.LoadInt64(&b.Begin)
if b.Begin == 0 {
if begin == 0 {
atomic.StoreInt64(&b.End, 0)
return
}
@@ -123,9 +123,11 @@ for_2: // code 为 1 时, 不重试
switch code {
case 1: // 不重试
break for_2
case 2: // 休息 3 秒, 再无限重试
time.Sleep(3 * time.Second)
case 61: // 不休息无限重试
continue
default: // 休息 3 秒, 再无限重试
default:
time.Sleep(3 * time.Second)
}
@@ -145,12 +147,10 @@ func (der *Downloader) execBlock(id int) (code int, err error) {
}
// 设置 Range 请求头, 给各线程分配内容
header := map[string]string{
"Range": fmt.Sprintf("bytes=%d-%d", block.Begin, block.End),
}
// 开始 http 请求
resp, err := der.config.Client.Req("GET", der.url, nil, header)
resp, err := der.config.Client.Req("GET", der.url, nil, map[string]string{
"Range": fmt.Sprintf("bytes=%d-%d", atomic.LoadInt64(&block.Begin), atomic.LoadInt64(&block.End)),
})
if err != nil {
return 2, err
}
@@ -190,7 +190,7 @@ func (der *Downloader) execBlock(id int) (code int, err error) {
)
for {
begin := block.Begin // 用于下文比较
begin := atomic.LoadInt64(&block.Begin) // 用于下文比较
n, err = resp.Body.Read(block.buf)
@@ -226,7 +226,7 @@ func (der *Downloader) execBlock(id int) (code int, err error) {
// 两次 begin 不相等, 可能已有新的空闲线程参与
// 旧线程应该被结束
if begin != block.Begin {
if begin != atomic.LoadInt64(&block.Begin) {
return 1, errors.New("thread already reload")
}
@@ -237,7 +237,7 @@ func (der *Downloader) execBlock(id int) (code int, err error) {
if err != nil {
// 下载数据可能出现异常, 重新下载
if !block.isDone() {
return 2, fmt.Errorf("download failed, %s, reset", err)
return 61, fmt.Errorf("download failed, %s, reset", err)
}
switch {

View File

@@ -3,17 +3,20 @@ package main
import (
"flag"
"fmt"
"github.com/iikira/BaiduPCS-Go/requester"
"github.com/iikira/downloader"
)
var (
parallel int
testing bool
ua string
)
func init() {
flag.IntVar(&parallel, "p", 5, "download max parallel")
flag.BoolVar(&testing, "t", false, "test mode")
flag.StringVar(&ua, "ua", "", "user-agent")
flag.Parse()
}
@@ -23,8 +26,11 @@ func main() {
return
}
client := requester.NewHTTPClient()
client.UserAgent = ua
for k := range flag.Args() {
downloader.DoDownload(flag.Arg(k), &downloader.Config{
Client: client,
Parallel: parallel,
Testing: testing,
})

View File

@@ -57,6 +57,13 @@ func NewDownloader(durl string, cfg *Config) (der *Downloader, err error) {
return nil, fmt.Errorf(resp.Status)
}
// 设置新的url, 如果网页存在跳转
if resp.Request != nil {
if resp.Request.URL != nil {
durl = resp.Request.URL.String()
}
}
der = &Downloader{
url: durl,
config: cfg,

View File

@@ -70,38 +70,38 @@ func (der *Downloader) blockMonitor() <-chan struct{} {
mu.Unlock() // 解锁
der.addExecBlock(index) // 添加任务
}(k)
/*
// 动态分配新线程
go func(k int) {
mu.Lock()
// 动态分配新线程
go func(k int) {
mu.Lock()
// 筛选空闲的线程
index, ok := der.status.BlockList.avaliableThread()
if !ok { // 没有空的
mu.Unlock() // 解锁
return
}
// 筛选空闲的线程
index, ok := der.status.BlockList.avaliableThread()
if !ok { // 没有空的
mu.Unlock() // 解锁
return
}
middle := (der.status.BlockList[k].Begin + der.status.BlockList[k].End) / 2
if der.status.BlockList[k].End-middle <= 102400 { // 如果线程剩余的下载量太少, 不分配空闲线程
mu.Unlock()
return
}
// 折半
der.status.BlockList[index].Begin = middle + 1
der.status.BlockList[index].End = der.status.BlockList[k].End
der.status.BlockList[index].IsFinal = der.status.BlockList[k].IsFinal
der.status.BlockList[k].End = middle
// End 已变, 取消 Final
der.status.BlockList[k].IsFinal = false
middle := (der.status.BlockList[k].Begin + der.status.BlockList[k].End) / 2
if der.status.BlockList[k].End-middle <= 102400 { // 如果线程剩余的下载量太少, 不分配空闲线程
mu.Unlock()
return
}
// 折半
der.status.BlockList[index].Begin = middle + 1
der.status.BlockList[index].End = der.status.BlockList[k].End
der.status.BlockList[index].IsFinal = der.status.BlockList[k].IsFinal
der.status.BlockList[k].End = middle
// End 已变, 取消 Final
der.status.BlockList[k].IsFinal = false
mu.Unlock()
der.addExecBlock(index)
}(k)
der.addExecBlock(index)
}(k)
*/
}(k)
}
time.Sleep(1 * time.Second) // 监测频率 1 秒