mirror of
				https://github.com/HDT3213/godis.git
				synced 2025-10-30 19:46:21 +08:00 
			
		
		
		
	 4fd56dae08
			
		
	
	4fd56dae08
	
	
	
		
			
			* remove task O(1), and none block tickerHandler() * remove task O(1), and none block tickerHandler()
		
			
				
	
	
		
			178 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			178 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package timewheel
 | |
| 
 | |
| import (
 | |
| 	"container/list"
 | |
| 	"github.com/hdt3213/godis/lib/logger"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
| type location struct {
 | |
| 	slot  int
 | |
| 	etask *list.Element
 | |
| }
 | |
| 
 | |
| // TimeWheel can execute job after waiting given duration
 | |
| type TimeWheel struct {
 | |
| 	interval time.Duration
 | |
| 	ticker   *time.Ticker
 | |
| 	slots    []*list.List
 | |
| 
 | |
| 	timer             map[string]*location
 | |
| 	currentPos        int
 | |
| 	slotNum           int
 | |
| 	addTaskChannel    chan task
 | |
| 	removeTaskChannel chan string
 | |
| 	stopChannel       chan bool
 | |
| }
 | |
| 
 | |
| type task struct {
 | |
| 	delay  time.Duration
 | |
| 	circle int
 | |
| 	key    string
 | |
| 	job    func()
 | |
| }
 | |
| 
 | |
| // New creates a new time wheel
 | |
| func New(interval time.Duration, slotNum int) *TimeWheel {
 | |
| 	if interval <= 0 || slotNum <= 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 	tw := &TimeWheel{
 | |
| 		interval:          interval,
 | |
| 		slots:             make([]*list.List, slotNum),
 | |
| 		timer:             make(map[string]*location),
 | |
| 		currentPos:        0,
 | |
| 		slotNum:           slotNum,
 | |
| 		addTaskChannel:    make(chan task),
 | |
| 		removeTaskChannel: make(chan string),
 | |
| 		stopChannel:       make(chan bool),
 | |
| 	}
 | |
| 	tw.initSlots()
 | |
| 
 | |
| 	return tw
 | |
| }
 | |
| 
 | |
| func (tw *TimeWheel) initSlots() {
 | |
| 	for i := 0; i < tw.slotNum; i++ {
 | |
| 		tw.slots[i] = list.New()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Start starts ticker for time wheel
 | |
| func (tw *TimeWheel) Start() {
 | |
| 	tw.ticker = time.NewTicker(tw.interval)
 | |
| 	go tw.start()
 | |
| }
 | |
| 
 | |
| // Stop stops the time wheel
 | |
| func (tw *TimeWheel) Stop() {
 | |
| 	tw.stopChannel <- true
 | |
| }
 | |
| 
 | |
| // AddJob add new job into pending queue
 | |
| func (tw *TimeWheel) AddJob(delay time.Duration, key string, job func()) {
 | |
| 	if delay < 0 {
 | |
| 		return
 | |
| 	}
 | |
| 	tw.addTaskChannel <- task{delay: delay, key: key, job: job}
 | |
| }
 | |
| 
 | |
| // RemoveJob add remove job from pending queue
 | |
| // if job is done or not found, then nothing happened
 | |
| func (tw *TimeWheel) RemoveJob(key string) {
 | |
| 	if key == "" {
 | |
| 		return
 | |
| 	}
 | |
| 	tw.removeTaskChannel <- key
 | |
| }
 | |
| 
 | |
| func (tw *TimeWheel) start() {
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-tw.ticker.C:
 | |
| 			tw.tickHandler()
 | |
| 		case task := <-tw.addTaskChannel:
 | |
| 			tw.addTask(&task)
 | |
| 		case key := <-tw.removeTaskChannel:
 | |
| 			tw.removeTask(key)
 | |
| 		case <-tw.stopChannel:
 | |
| 			tw.ticker.Stop()
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (tw *TimeWheel) tickHandler() {
 | |
| 	l := tw.slots[tw.currentPos]
 | |
| 	if tw.currentPos == tw.slotNum-1 {
 | |
| 		tw.currentPos = 0
 | |
| 	} else {
 | |
| 		tw.currentPos++
 | |
| 	}
 | |
| 	go tw.scanAndRunTask(l)
 | |
| }
 | |
| 
 | |
| func (tw *TimeWheel) scanAndRunTask(l *list.List) {
 | |
| 	for e := l.Front(); e != nil; {
 | |
| 		task := e.Value.(*task)
 | |
| 		if task.circle > 0 {
 | |
| 			task.circle--
 | |
| 			e = e.Next()
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		go func() {
 | |
| 			defer func() {
 | |
| 				if err := recover(); err != nil {
 | |
| 					logger.Error(err)
 | |
| 				}
 | |
| 			}()
 | |
| 			job := task.job
 | |
| 			job()
 | |
| 		}()
 | |
| 		next := e.Next()
 | |
| 		l.Remove(e)
 | |
| 		if task.key != "" {
 | |
| 			delete(tw.timer, task.key)
 | |
| 		}
 | |
| 		e = next
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (tw *TimeWheel) addTask(task *task) {
 | |
| 	pos, circle := tw.getPositionAndCircle(task.delay)
 | |
| 	task.circle = circle
 | |
| 
 | |
| 	e := tw.slots[pos].PushBack(task)
 | |
| 	loc := &location{
 | |
| 		slot:  pos,
 | |
| 		etask: e,
 | |
| 	}
 | |
| 	if task.key != "" {
 | |
| 		_, ok := tw.timer[task.key]
 | |
| 		if ok {
 | |
| 			tw.removeTask(task.key)
 | |
| 		}
 | |
| 	}
 | |
| 	tw.timer[task.key] = loc
 | |
| }
 | |
| 
 | |
| func (tw *TimeWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {
 | |
| 	delaySeconds := int(d.Seconds())
 | |
| 	intervalSeconds := int(tw.interval.Seconds())
 | |
| 	circle = int(delaySeconds / intervalSeconds / tw.slotNum)
 | |
| 	pos = int(tw.currentPos+delaySeconds/intervalSeconds) % tw.slotNum
 | |
| 
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (tw *TimeWheel) removeTask(key string) {
 | |
| 	pos, ok := tw.timer[key]
 | |
| 	if !ok {
 | |
| 		return
 | |
| 	}
 | |
| 	l := tw.slots[pos.slot]
 | |
| 	l.Remove(pos.etask)
 | |
| 	delete(tw.timer, key)
 | |
| }
 |