feat: implement time wheel for scheduling tasks and refactor bond and steam playing checks to use new task scheduling utility

This commit is contained in:
lixiangwuxian 2025-01-10 01:27:50 +08:00
parent fa635ad9f1
commit 8dcef53924
3 changed files with 197 additions and 12 deletions

View File

@ -11,6 +11,7 @@ import (
"git.lxtend.com/qqbot/action" "git.lxtend.com/qqbot/action"
"git.lxtend.com/qqbot/model" "git.lxtend.com/qqbot/model"
"git.lxtend.com/qqbot/sqlite3" "git.lxtend.com/qqbot/sqlite3"
"git.lxtend.com/qqbot/util"
) )
func init() { func init() {
@ -131,25 +132,22 @@ func RoundCheckNewBond() {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
} }
// once := true // once := true
for { util.AddCycleTask("checkNewBond", 5*time.Minute, 5*time.Minute, func() {
// if !once {
time.Sleep(5 * time.Minute)
// }
bonds, err := GetBondsData() bonds, err := GetBondsData()
if bonds == nil || err != nil { if bonds == nil || err != nil {
fmt.Println("Error getting bonds data:", err) fmt.Println("Error getting bonds data:", err)
continue return
} }
groups, err := GetGroupListens() groups, err := GetGroupListens()
if err != nil { if err != nil {
fmt.Println("Error getting group listens:", err) fmt.Println("Error getting group listens:", err)
continue return
} }
for _, bond := range bonds { for _, bond := range bonds {
exists, err := BondDataExists(bond.SecurityCode) exists, err := BondDataExists(bond.SecurityCode)
if err != nil { if err != nil {
fmt.Println("Error checking bond data exists:", err) fmt.Println("Error checking bond data exists:", err)
continue return
} }
if !exists { if !exists {
for _, group := range groups { for _, group := range groups {
@ -167,6 +165,5 @@ func RoundCheckNewBond() {
} }
AddBondData(bond) AddBondData(bond)
} }
// once = false })
}
} }

View File

@ -176,11 +176,11 @@ func checkSteamPlaying(msg model.Message) model.Reply {
func RoundCheckSteamPlaying() { func RoundCheckSteamPlaying() {
once := true once := true
playingMap := map[int64]map[string]string{} playingMap := map[int64]map[string]string{}
for { util.AddCycleTask("checkSteamPlaying", 15*time.Second, 15*time.Second, func() {
groups, err := getAllGroupID() groups, err := getAllGroupID()
if err != nil { if err != nil {
fmt.Println("获取群列表失败: ", err) fmt.Println("获取群列表失败: ", err)
continue return
} }
for _, group := range groups { for _, group := range groups {
@ -212,5 +212,5 @@ func RoundCheckSteamPlaying() {
} }
} }
once = false once = false
} })
} }

188
util/task.go Normal file
View File

@ -0,0 +1,188 @@
package util
import (
"container/list"
"sync"
"time"
)
// 时间轮定时器
type TimeWheel struct {
interval time.Duration // 时间轮间隔(每格时间)
ticker *time.Ticker // 定时器
slots []*list.List // 时间轮槽
curSlot int // 当前槽位置
slotNum int // 槽数量
taskMap map[string]*Task // 任务映射表
sync.Mutex // 互斥锁
stop chan bool // 停止信号
}
// 定时任务
type Task struct {
delay time.Duration // 延迟时间
circle int // 时间轮转动圈数
key string // 任务唯一标识
callback func() // 回调函数
repeat bool // 是否重复执行
interval time.Duration // 重复执行的时间间隔
}
var (
defaultTimeWheel *TimeWheel
once sync.Once
)
// 初始化默认时间轮1秒一格共60格
func initDefaultTimeWheel() {
once.Do(func() {
defaultTimeWheel = NewTimeWheel(time.Second, 60)
defaultTimeWheel.Start()
})
}
// 获取默认时间轮实例
func GetDefaultTimeWheel() *TimeWheel {
initDefaultTimeWheel()
return defaultTimeWheel
}
// 静态方法:添加一次性任务
func AddTask(key string, delay time.Duration, callback func()) {
initDefaultTimeWheel()
defaultTimeWheel.AddTask(key, delay, callback)
}
// 静态方法:添加循环任务
func AddCycleTask(key string, delay time.Duration, interval time.Duration, callback func()) {
initDefaultTimeWheel()
defaultTimeWheel.AddCycleTask(key, delay, interval, callback)
}
// 静态方法:删除任务
func RemoveTask(key string) {
initDefaultTimeWheel()
defaultTimeWheel.RemoveTask(key)
}
// 静态方法:停止时间轮
func StopTimeWheel() {
if defaultTimeWheel != nil {
defaultTimeWheel.Stop()
}
}
// 创建时间轮
func NewTimeWheel(interval time.Duration, slotNum int) *TimeWheel {
tw := &TimeWheel{
interval: interval,
slots: make([]*list.List, slotNum),
taskMap: make(map[string]*Task),
slotNum: slotNum,
stop: make(chan bool),
}
for i := 0; i < slotNum; i++ {
tw.slots[i] = list.New()
}
return tw
}
// 启动时间轮
func (tw *TimeWheel) Start() {
tw.ticker = time.NewTicker(tw.interval)
go tw.run()
}
// 停止时间轮
func (tw *TimeWheel) Stop() {
tw.ticker.Stop()
tw.stop <- true
}
// 添加普通任务
func (tw *TimeWheel) AddTask(key string, delay time.Duration, callback func()) {
tw.addTask(key, delay, callback, false, 0)
}
// 添加循环任务
func (tw *TimeWheel) AddCycleTask(key string, delay time.Duration, interval time.Duration, callback func()) {
tw.addTask(key, delay, callback, true, interval)
}
// 内部添加任务方法
func (tw *TimeWheel) addTask(key string, delay time.Duration, callback func(), repeat bool, interval time.Duration) {
tw.Lock()
defer tw.Unlock()
circle := int(delay / (tw.interval * time.Duration(tw.slotNum)))
pos := (tw.curSlot + int(delay/tw.interval)) % tw.slotNum
task := &Task{
delay: delay,
circle: circle,
key: key,
callback: callback,
repeat: repeat,
interval: interval,
}
tw.slots[pos].PushBack(task)
tw.taskMap[key] = task
}
// 删除任务
func (tw *TimeWheel) RemoveTask(key string) {
tw.Lock()
defer tw.Unlock()
delete(tw.taskMap, key)
}
// 运行时间轮
func (tw *TimeWheel) run() {
for {
select {
case <-tw.ticker.C:
tw.Lock()
l := tw.slots[tw.curSlot]
for e := l.Front(); e != nil; {
task := e.Value.(*Task)
if task.circle > 0 {
task.circle--
e = e.Next()
continue
}
go task.callback()
next := e.Next()
// 处理循环任务
if task.repeat {
// 重新计算下一次执行的位置和圈数
circle := int(task.interval / (tw.interval * time.Duration(tw.slotNum)))
pos := (tw.curSlot + int(task.interval/tw.interval)) % tw.slotNum
task.circle = circle
// 从当前槽中移除
l.Remove(e)
// 添加到新的槽位置
tw.slots[pos].PushBack(task)
} else {
// 一次性任务,直接删除
l.Remove(e)
delete(tw.taskMap, task.key)
}
e = next
}
tw.curSlot = (tw.curSlot + 1) % tw.slotNum
tw.Unlock()
case <-tw.stop:
return
}
}
}