From 8dcef53924bed20bb2d27c5be64775f2b7b86862 Mon Sep 17 00:00:00 2001 From: lixiangwuxian Date: Fri, 10 Jan 2025 01:27:50 +0800 Subject: [PATCH] feat: implement time wheel for scheduling tasks and refactor bond and steam playing checks to use new task scheduling utility --- handler/newbond/service.go | 15 +- handler/steamplaying/steam_playing.go | 6 +- util/task.go | 188 ++++++++++++++++++++++++++ 3 files changed, 197 insertions(+), 12 deletions(-) create mode 100644 util/task.go diff --git a/handler/newbond/service.go b/handler/newbond/service.go index bfa9077..eb9ac1a 100644 --- a/handler/newbond/service.go +++ b/handler/newbond/service.go @@ -11,6 +11,7 @@ import ( "git.lxtend.com/qqbot/action" "git.lxtend.com/qqbot/model" "git.lxtend.com/qqbot/sqlite3" + "git.lxtend.com/qqbot/util" ) func init() { @@ -131,25 +132,22 @@ func RoundCheckNewBond() { time.Sleep(5 * time.Second) } // once := true - for { - // if !once { - time.Sleep(5 * time.Minute) - // } + util.AddCycleTask("checkNewBond", 5*time.Minute, 5*time.Minute, func() { bonds, err := GetBondsData() if bonds == nil || err != nil { fmt.Println("Error getting bonds data:", err) - continue + return } groups, err := GetGroupListens() if err != nil { fmt.Println("Error getting group listens:", err) - continue + return } for _, bond := range bonds { exists, err := BondDataExists(bond.SecurityCode) if err != nil { fmt.Println("Error checking bond data exists:", err) - continue + return } if !exists { for _, group := range groups { @@ -167,6 +165,5 @@ func RoundCheckNewBond() { } AddBondData(bond) } - // once = false - } + }) } diff --git a/handler/steamplaying/steam_playing.go b/handler/steamplaying/steam_playing.go index 86bf249..671f690 100644 --- a/handler/steamplaying/steam_playing.go +++ b/handler/steamplaying/steam_playing.go @@ -176,11 +176,11 @@ func checkSteamPlaying(msg model.Message) model.Reply { func RoundCheckSteamPlaying() { once := true playingMap := map[int64]map[string]string{} - for { + util.AddCycleTask("checkSteamPlaying", 15*time.Second, 15*time.Second, func() { groups, err := getAllGroupID() if err != nil { fmt.Println("获取群列表失败: ", err) - continue + return } for _, group := range groups { @@ -212,5 +212,5 @@ func RoundCheckSteamPlaying() { } } once = false - } + }) } diff --git a/util/task.go b/util/task.go new file mode 100644 index 0000000..7818293 --- /dev/null +++ b/util/task.go @@ -0,0 +1,188 @@ +package util + +import ( + "container/list" + "sync" + "time" +) + +// 时间轮定时器 +type TimeWheel struct { + interval time.Duration // 时间轮间隔(每格时间) + ticker *time.Ticker // 定时器 + slots []*list.List // 时间轮槽 + curSlot int // 当前槽位置 + slotNum int // 槽数量 + taskMap map[string]*Task // 任务映射表 + sync.Mutex // 互斥锁 + stop chan bool // 停止信号 +} + +// 定时任务 +type Task struct { + delay time.Duration // 延迟时间 + circle int // 时间轮转动圈数 + key string // 任务唯一标识 + callback func() // 回调函数 + repeat bool // 是否重复执行 + interval time.Duration // 重复执行的时间间隔 +} + +var ( + defaultTimeWheel *TimeWheel + once sync.Once +) + +// 初始化默认时间轮,1秒一格,共60格 +func initDefaultTimeWheel() { + once.Do(func() { + defaultTimeWheel = NewTimeWheel(time.Second, 60) + defaultTimeWheel.Start() + }) +} + +// 获取默认时间轮实例 +func GetDefaultTimeWheel() *TimeWheel { + initDefaultTimeWheel() + return defaultTimeWheel +} + +// 静态方法:添加一次性任务 +func AddTask(key string, delay time.Duration, callback func()) { + initDefaultTimeWheel() + defaultTimeWheel.AddTask(key, delay, callback) +} + +// 静态方法:添加循环任务 +func AddCycleTask(key string, delay time.Duration, interval time.Duration, callback func()) { + initDefaultTimeWheel() + defaultTimeWheel.AddCycleTask(key, delay, interval, callback) +} + +// 静态方法:删除任务 +func RemoveTask(key string) { + initDefaultTimeWheel() + defaultTimeWheel.RemoveTask(key) +} + +// 静态方法:停止时间轮 +func StopTimeWheel() { + if defaultTimeWheel != nil { + defaultTimeWheel.Stop() + } +} + +// 创建时间轮 +func NewTimeWheel(interval time.Duration, slotNum int) *TimeWheel { + tw := &TimeWheel{ + interval: interval, + slots: make([]*list.List, slotNum), + taskMap: make(map[string]*Task), + slotNum: slotNum, + stop: make(chan bool), + } + + for i := 0; i < slotNum; i++ { + tw.slots[i] = list.New() + } + + return tw +} + +// 启动时间轮 +func (tw *TimeWheel) Start() { + tw.ticker = time.NewTicker(tw.interval) + go tw.run() +} + +// 停止时间轮 +func (tw *TimeWheel) Stop() { + tw.ticker.Stop() + tw.stop <- true +} + +// 添加普通任务 +func (tw *TimeWheel) AddTask(key string, delay time.Duration, callback func()) { + tw.addTask(key, delay, callback, false, 0) +} + +// 添加循环任务 +func (tw *TimeWheel) AddCycleTask(key string, delay time.Duration, interval time.Duration, callback func()) { + tw.addTask(key, delay, callback, true, interval) +} + +// 内部添加任务方法 +func (tw *TimeWheel) addTask(key string, delay time.Duration, callback func(), repeat bool, interval time.Duration) { + tw.Lock() + defer tw.Unlock() + + circle := int(delay / (tw.interval * time.Duration(tw.slotNum))) + pos := (tw.curSlot + int(delay/tw.interval)) % tw.slotNum + + task := &Task{ + delay: delay, + circle: circle, + key: key, + callback: callback, + repeat: repeat, + interval: interval, + } + + tw.slots[pos].PushBack(task) + tw.taskMap[key] = task +} + +// 删除任务 +func (tw *TimeWheel) RemoveTask(key string) { + tw.Lock() + defer tw.Unlock() + + delete(tw.taskMap, key) +} + +// 运行时间轮 +func (tw *TimeWheel) run() { + for { + select { + case <-tw.ticker.C: + tw.Lock() + l := tw.slots[tw.curSlot] + for e := l.Front(); e != nil; { + task := e.Value.(*Task) + if task.circle > 0 { + task.circle-- + e = e.Next() + continue + } + + go task.callback() + next := e.Next() + + // 处理循环任务 + if task.repeat { + // 重新计算下一次执行的位置和圈数 + circle := int(task.interval / (tw.interval * time.Duration(tw.slotNum))) + pos := (tw.curSlot + int(task.interval/tw.interval)) % tw.slotNum + + task.circle = circle + // 从当前槽中移除 + l.Remove(e) + // 添加到新的槽位置 + tw.slots[pos].PushBack(task) + } else { + // 一次性任务,直接删除 + l.Remove(e) + delete(tw.taskMap, task.key) + } + + e = next + } + + tw.curSlot = (tw.curSlot + 1) % tw.slotNum + tw.Unlock() + + case <-tw.stop: + return + } + } +}