package util

import (
	"container/list"
	"sync"
	"time"
)

// TimeWheel is a single-level timing wheel: a ring of slots that is advanced
// by one slot on every tick of an internal ticker. Each slot holds the tasks
// that become due when the wheel reaches it.
//
// The embedded mutex guards every field below it.
type TimeWheel struct {
	// interval is the duration represented by one slot (one tick).
	interval time.Duration
	// ticker drives the wheel while it is running.
	ticker *time.Ticker
	// slots is the ring of task lists; every element value is a *Task.
	slots []*list.List
	// curSlot is the index of the slot that the next tick will process.
	curSlot int
	// slotNum is the number of slots in the ring.
	slotNum int
	// taskMap indexes the pending tasks by key.
	taskMap map[string]*Task
	// Mutex protects the wheel state.
	sync.Mutex
	// stop is closed by Stop to terminate the goroutine started by Start.
	stop chan bool
	// running reports whether a goroutine started by Start is active.
	running bool
}

// Task is one scheduled callback stored in a TimeWheel.
type Task struct {
	// delay is the delay requested when the task was added.
	delay time.Duration
	// circle is the number of times the wheel still has to pass over the
	// task's slot before the task is due.
	circle int
	// key identifies the task inside its wheel.
	key string
	// callback is the function run when the task is due.
	callback func()
	// repeat reports whether the task is rescheduled after it has run.
	repeat bool
	// interval is the period of a repeating task.
	interval time.Duration
	// slot is the index of the slot currently holding the task.
	slot int
	// elem is the task's list element inside its slot, or nil while the task
	// is not linked into any slot.
	elem *list.Element
}

var (
	defaultTimeWheel *TimeWheel
	once             sync.Once
)

// initDefaultTimeWheel lazily creates and starts the package-level wheel:
// one second per slot, 60 slots.
func initDefaultTimeWheel() {
	once.Do(func() {
		defaultTimeWheel = NewTimeWheel(time.Second, 60)
		defaultTimeWheel.Start()
	})
}

// GetDefaultTimeWheel returns the package-level wheel, creating and starting
// it on first use.
func GetDefaultTimeWheel() *TimeWheel {
	initDefaultTimeWheel()
	return defaultTimeWheel
}

// AddTask schedules a one-shot task on the default wheel.
func AddTask(key string, delay time.Duration, callback func()) {
	initDefaultTimeWheel()
	defaultTimeWheel.AddTask(key, delay, callback)
}

// AddCycleTask schedules a repeating task on the default wheel.
func AddCycleTask(key string, delay time.Duration, interval time.Duration, callback func()) {
	initDefaultTimeWheel()
	defaultTimeWheel.AddCycleTask(key, delay, interval, callback)
}

// RemoveTask cancels the task registered under key on the default wheel.
func RemoveTask(key string) {
	initDefaultTimeWheel()
	defaultTimeWheel.RemoveTask(key)
}

// StopTimeWheel stops the default wheel. It does nothing when the default
// wheel has not been created yet.
func StopTimeWheel() {
	if defaultTimeWheel != nil {
		defaultTimeWheel.Stop()
	}
}

// NewTimeWheel returns a stopped wheel with slotNum slots of interval each.
// Both arguments must be positive: slotNum is used as a slice length and as
// a divisor, and interval is handed to time.NewTicker by Start.
func NewTimeWheel(interval time.Duration, slotNum int) *TimeWheel {
	tw := &TimeWheel{
		interval: interval,
		slots:    make([]*list.List, slotNum),
		taskMap:  make(map[string]*Task),
		slotNum:  slotNum,
		stop:     make(chan bool),
	}
	for i := range tw.slots {
		tw.slots[i] = list.New()
	}
	return tw
}

// Start begins advancing the wheel in a background goroutine. Calling Start
// on a wheel that is already running is a no-op; a stopped wheel may be
// started again.
func (tw *TimeWheel) Start() {
	tw.Lock()
	defer tw.Unlock()

	if tw.running {
		return
	}
	tw.running = true
	// A fresh channel per run lets Stop signal by closing it, which never
	// blocks and is observed by the goroutine even if it is busy.
	tw.stop = make(chan bool)
	tw.ticker = time.NewTicker(tw.interval)
	go tw.run(tw.ticker, tw.stop)
}

// Stop halts the wheel. It is safe to call Stop on a wheel that was never
// started or has already been stopped. Pending tasks stay registered.
func (tw *TimeWheel) Stop() {
	tw.Lock()
	defer tw.Unlock()

	if !tw.running {
		return
	}
	tw.running = false
	tw.ticker.Stop()
	close(tw.stop)
}

// AddTask schedules callback to run once after delay. An existing task with
// the same key is replaced.
func (tw *TimeWheel) AddTask(key string, delay time.Duration, callback func()) {
	tw.addTask(key, delay, callback, false, 0)
}

// AddCycleTask schedules callback to run after delay and then repeatedly
// every interval. An interval shorter than one tick runs the task on every
// tick. An existing task with the same key is replaced.
func (tw *TimeWheel) AddCycleTask(key string, delay time.Duration, interval time.Duration, callback func()) {
	tw.addTask(key, delay, callback, true, interval)
}

// addTask registers a task under key. A nil callback is ignored because
// running it would panic in the goroutine spawned for the callback. A
// negative delay is treated as zero.
func (tw *TimeWheel) addTask(key string, delay time.Duration, callback func(), repeat bool, interval time.Duration) {
	if callback == nil {
		return
	}

	tw.Lock()
	defer tw.Unlock()

	// Keys are unique: drop the previous task so it cannot fire as well.
	tw.removeLocked(key)

	task := &Task{
		delay:    delay,
		key:      key,
		callback: callback,
		repeat:   repeat,
		interval: interval,
	}
	tw.enqueue(task, int(delay/tw.interval))
	tw.taskMap[key] = task
}

// enqueue links task into the wheel so that it becomes due on the tick that
// follows skip further ticks, counted from the slot the next tick will
// process. The caller must hold the lock.
func (tw *TimeWheel) enqueue(task *Task, skip int) {
	if skip < 0 {
		skip = 0
	}
	task.circle = skip / tw.slotNum
	// Reduce skip before adding so the sum cannot overflow.
	task.slot = (tw.curSlot + skip%tw.slotNum) % tw.slotNum
	task.elem = tw.slots[task.slot].PushBack(task)
}

// RemoveTask cancels the task registered under key, if any. The task is
// unlinked from its slot, so it will not run afterwards.
func (tw *TimeWheel) RemoveTask(key string) {
	tw.Lock()
	defer tw.Unlock()
	tw.removeLocked(key)
}

// removeLocked unlinks the task registered under key from its slot and from
// the key index. The caller must hold the lock.
func (tw *TimeWheel) removeLocked(key string) {
	task, ok := tw.taskMap[key]
	if !ok {
		return
	}
	if task.elem != nil {
		tw.slots[task.slot].Remove(task.elem)
		task.elem = nil
	}
	delete(tw.taskMap, key)
}

// run advances the wheel on every tick until stop is closed.
func (tw *TimeWheel) run(ticker *time.Ticker, stop <-chan bool) {
	for {
		select {
		case <-ticker.C:
			tw.tick(stop)
		case <-stop:
			return
		}
	}
}

// tick processes the current slot and advances the wheel by one slot. Due
// tasks run in their own goroutines so a slow callback cannot stall the
// wheel. A nil stop channel is never ready, so tick(nil) always processes.
func (tw *TimeWheel) tick(stop <-chan bool) {
	tw.Lock()
	defer tw.Unlock()

	// A tick may have been pending when Stop was called; drop it.
	select {
	case <-stop:
		return
	default:
	}

	slot := tw.slots[tw.curSlot]
	var again []*Task
	for e := slot.Front(); e != nil; {
		next := e.Next()
		task := e.Value.(*Task)
		if task.circle > 0 {
			task.circle--
			e = next
			continue
		}

		slot.Remove(e)
		task.elem = nil
		go task.callback()
		if task.repeat {
			// Relink after the loop: a task that lands in this same slot
			// must not be visited a second time during this tick.
			again = append(again, task)
		} else {
			delete(tw.taskMap, task.key)
		}
		e = next
	}

	tw.curSlot = (tw.curSlot + 1) % tw.slotNum

	// The wheel has already moved on by one slot, so a task that should run
	// n ticks from now has n-1 ticks to skip.
	for _, task := range again {
		tw.enqueue(task, int(task.interval/tw.interval)-1)
	}
}