diff --git a/lib/timewheel/timewheel.go b/lib/timewheel/timewheel.go index a5263bd..ba3fe89 100644 --- a/lib/timewheel/timewheel.go +++ b/lib/timewheel/timewheel.go @@ -6,13 +6,18 @@ import ( "time" ) +type location struct { + slot int + etask *list.Element +} + // TimeWheel can execute job after waiting given duration type TimeWheel struct { interval time.Duration ticker *time.Ticker slots []*list.List - timer map[string]int + timer map[string]*location currentPos int slotNum int addTaskChannel chan task @@ -35,7 +40,7 @@ func New(interval time.Duration, slotNum int) *TimeWheel { tw := &TimeWheel{ interval: interval, slots: make([]*list.List, slotNum), - timer: make(map[string]int), + timer: make(map[string]*location), currentPos: 0, slotNum: slotNum, addTaskChannel: make(chan task), @@ -99,12 +104,12 @@ func (tw *TimeWheel) start() { func (tw *TimeWheel) tickHandler() { l := tw.slots[tw.currentPos] - tw.scanAndRunTask(l) if tw.currentPos == tw.slotNum-1 { tw.currentPos = 0 } else { tw.currentPos++ } + go tw.scanAndRunTask(l) } func (tw *TimeWheel) scanAndRunTask(l *list.List) { @@ -138,11 +143,18 @@ func (tw *TimeWheel) addTask(task *task) { pos, circle := tw.getPositionAndCircle(task.delay) task.circle = circle - tw.slots[pos].PushBack(task) - - if task.key != "" { - tw.timer[task.key] = pos + e := tw.slots[pos].PushBack(task) + loc := &location{ + slot: pos, + etask: e, } + if task.key != "" { + _, ok := tw.timer[task.key] + if ok { + tw.removeTask(task.key) + } + } + tw.timer[task.key] = loc } func (tw *TimeWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) { @@ -155,18 +167,11 @@ func (tw *TimeWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) } func (tw *TimeWheel) removeTask(key string) { - position, ok := tw.timer[key] + pos, ok := tw.timer[key] if !ok { return } - l := tw.slots[position] - for e := l.Front(); e != nil; { - task := e.Value.(*task) - if task.key == key { - delete(tw.timer, task.key) - l.Remove(e) - } - - e = e.Next() - } + l := tw.slots[pos.slot] + l.Remove(pos.etask) + delete(tw.timer, key) }