qq_bot/util/task.go

189 lines
4.1 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package util
import (
"container/list"
"sync"
"time"
)
// TimeWheel is a timing-wheel scheduler: tasks are parked in slots and one
// slot is examined on every tick.
type TimeWheel struct {
	// Embedded mutex; the methods of TimeWheel call Lock/Unlock through it
	// around slot and task-map access.
	sync.Mutex

	interval time.Duration // duration of one tick, i.e. of one slot
	slotNum  int           // number of slots on the wheel
	slots    []*list.List  // one task list per slot
	curSlot  int           // index of the current slot

	taskMap map[string]*Task // tasks indexed by their key

	ticker *time.Ticker // tick source, created by Start
	stop   chan bool    // stop signal consumed by the run loop
}
// Task is one scheduled callback stored on the wheel.
type Task struct {
	key      string // unique identifier of the task
	callback func() // function invoked when the task fires

	delay  time.Duration // delay requested when the task was added
	circle int           // full wheel rotations left before the task fires

	repeat   bool          // whether the task is rescheduled after firing
	interval time.Duration // period used when a repeating task is rescheduled
}
// defaultTimeWheel is the package-wide wheel used by the package-level
// helpers; it is nil until initDefaultTimeWheel has run.
var defaultTimeWheel *TimeWheel

// once guards the one-time construction of defaultTimeWheel.
var once sync.Once
// 初始化默认时间轮1秒一格共60格
func initDefaultTimeWheel() {
once.Do(func() {
defaultTimeWheel = NewTimeWheel(time.Second, 60)
defaultTimeWheel.Start()
})
}
// GetDefaultTimeWheel returns the package-wide wheel, making sure it has been
// initialised first.
func GetDefaultTimeWheel() *TimeWheel {
	initDefaultTimeWheel()
	wheel := defaultTimeWheel
	return wheel
}
// 静态方法:添加一次性任务
func AddTask(key string, delay time.Duration, callback func()) {
initDefaultTimeWheel()
defaultTimeWheel.AddTask(key, delay, callback)
}
// 静态方法:添加循环任务
func AddCycleTask(key string, delay time.Duration, interval time.Duration, callback func()) {
initDefaultTimeWheel()
defaultTimeWheel.AddCycleTask(key, delay, interval, callback)
}
// 静态方法:删除任务
func RemoveTask(key string) {
initDefaultTimeWheel()
defaultTimeWheel.RemoveTask(key)
}
// StopTimeWheel stops the default wheel. It does nothing when the default
// wheel has not been created yet.
func StopTimeWheel() {
	wheel := defaultTimeWheel
	if wheel == nil {
		return
	}
	wheel.Stop()
}
// NewTimeWheel creates a wheel with slotNum empty slots, each representing
// one interval of time. The wheel is not running until Start is called.
//
// interval and slotNum are stored as given; no validation is done here.
func NewTimeWheel(interval time.Duration, slotNum int) *TimeWheel {
	slots := make([]*list.List, slotNum)
	for i := range slots {
		slots[i] = list.New()
	}
	return &TimeWheel{
		interval: interval,
		slotNum:  slotNum,
		slots:    slots,
		taskMap:  make(map[string]*Task),
		stop:     make(chan bool),
	}
}
// Start creates the ticker that drives the wheel and launches the run loop in
// its own goroutine.
func (tw *TimeWheel) Start() {
	ticker := time.NewTicker(tw.interval)
	tw.ticker = ticker
	go tw.run()
}
// Stop halts the ticker and tells the run loop to exit. It blocks until the
// run loop has received the stop signal.
//
// Calling Stop on a wheel that was never started is a no-op; previously this
// dereferenced a nil ticker. Stop must still be called at most once per
// Start: the stop channel is unbuffered, so a second call with no run loop
// left to receive the signal would block forever.
func (tw *TimeWheel) Stop() {
	if tw.ticker == nil {
		// Start has not been called: there is no ticker and no run loop.
		return
	}
	tw.ticker.Stop()
	tw.stop <- true
}
// AddTask schedules callback to run once after delay, registered under key.
func (tw *TimeWheel) AddTask(key string, delay time.Duration, callback func()) {
	const (
		oneShot    = false
		noInterval = 0
	)
	tw.addTask(key, delay, callback, oneShot, noInterval)
}
// AddCycleTask schedules callback under key to run first after delay and then
// again every interval.
func (tw *TimeWheel) AddCycleTask(key string, delay time.Duration, interval time.Duration, callback func()) {
	const repeating = true
	tw.addTask(key, delay, callback, repeating, interval)
}
// addTask places a new task on the wheel and registers it in taskMap.
//
// key identifies the task, delay is the wait before the first run, callback
// is the function to run, repeat selects rescheduling after each run, and
// interval is the period used for that rescheduling.
//
// A negative delay is treated as zero. If a task is already registered under
// key it is replaced: the old task is unlinked from its slot so that it can
// no longer fire or delete the new task's map entry when it completes.
func (tw *TimeWheel) addTask(key string, delay time.Duration, callback func(), repeat bool, interval time.Duration) {
	tw.Lock()
	defer tw.Unlock()

	// A negative delay would produce a negative slot index below.
	if delay < 0 {
		delay = 0
	}

	// taskMap tracks one task per key, so drop the previous holder of this
	// key from whichever slot it is waiting in.
	if old, exists := tw.taskMap[key]; exists {
	scan:
		for _, slot := range tw.slots {
			for e := slot.Front(); e != nil; e = e.Next() {
				if t, ok := e.Value.(*Task); ok && t == old {
					slot.Remove(e)
					break scan
				}
			}
		}
	}

	// ticks is the delay in whole slots: the number of full rotations goes
	// into circle, the remainder selects the slot relative to curSlot.
	ticks := int(delay / tw.interval)
	task := &Task{
		delay:    delay,
		circle:   ticks / tw.slotNum,
		key:      key,
		callback: callback,
		repeat:   repeat,
		interval: interval,
	}
	pos := (tw.curSlot + ticks) % tw.slotNum
	tw.slots[pos].PushBack(task)
	tw.taskMap[key] = task
}
// RemoveTask cancels the task registered under key. Removing an unknown key
// is a no-op.
//
// Besides dropping the taskMap entry, every task carrying this key is
// unlinked from the slots. Deleting only the map entry would leave the task
// in its slot, where it would still fire (and keep repeating if it is a
// repeating task). Tasks do not record which slot holds them, so the slots
// are scanned linearly.
func (tw *TimeWheel) RemoveTask(key string) {
	tw.Lock()
	defer tw.Unlock()

	delete(tw.taskMap, key)

	for _, slot := range tw.slots {
		for e := slot.Front(); e != nil; {
			// Remove clears the element's links, so fetch the successor first.
			next := e.Next()
			if t, ok := e.Value.(*Task); ok && t.key == key {
				slot.Remove(e)
			}
			e = next
		}
	}
}
// run is the wheel's event loop: it handles one slot per ticker tick and
// returns when a value arrives on the stop channel.
func (tw *TimeWheel) run() {
	for {
		select {
		case <-tw.ticker.C:
			tw.processTick()
		case <-tw.stop:
			return
		}
	}
}

// processTick handles every task in the current slot and then advances the
// wheel by one slot. For each task in the slot:
//
//   - a task that is no longer the one registered in taskMap under its key
//     (removed or replaced) is dropped without being run;
//   - a task with rotations left has its circle count decremented and waits;
//   - a due task has its callback started in a new goroutine, and is then
//     either rescheduled (repeating) or unregistered (one-shot).
func (tw *TimeWheel) processTick() {
	tw.Lock()
	defer tw.Unlock()

	slot := tw.slots[tw.curSlot]

	// Repeating tasks are re-inserted only after the scan. Their target can
	// be this very slot, and pushing onto the list while walking it would
	// let the same pass visit them a second time.
	var requeue []*Task

	for e := slot.Front(); e != nil; {
		// Remove clears the element's links, so fetch the successor first.
		next := e.Next()
		task := e.Value.(*Task)

		switch {
		case tw.taskMap[task.key] != task:
			// Stale entry: cancelled or superseded since it was queued.
			slot.Remove(e)
		case task.circle > 0:
			task.circle--
		default:
			slot.Remove(e)
			if task.callback != nil {
				go task.callback()
			}
			if task.repeat {
				requeue = append(requeue, task)
			} else {
				delete(tw.taskMap, task.key)
			}
		}
		e = next
	}

	for _, task := range requeue {
		// The current slot is being handled right now, so a task placed
		// ticks slots ahead is reached after exactly ticks ticks. A target
		// that wraps onto this slot (ticks a multiple of slotNum) is first
		// revisited one full rotation from now, hence (ticks-1)/slotNum.
		// The shortest possible period is a single tick.
		ticks := int(task.interval / tw.interval)
		if ticks < 1 {
			ticks = 1
		}
		task.circle = (ticks - 1) / tw.slotNum
		pos := (tw.curSlot + ticks) % tw.slotNum
		tw.slots[pos].PushBack(task)
	}

	tw.curSlot = (tw.curSlot + 1) % tw.slotNum
}