timewheel简介
- 时间轮是一个环形队列,底层实现就是一个固定长度的数组,数组中的每个元素存储一个双向列表,这个列表存放着该时间内需要执行的所有任务
例子
- 抽象点来说时钟表盘就是1秒为一个时间刻度,一共(一天)会有86400个刻度的时间轮,当指针走到那个刻度的时候就可以把对应的任务全部取出执行
- 也就是说这里我们定义了一个可以延迟86400秒的时候轮,不过当我们定一个86401秒后执行的任务怎么办
- 方案一 多级时间轮 (这里不展开描述)
- 方案二 定义circle参数 (本文), 一轮是86400秒的话,86401= 86400 + 1 也就是 1circle(一轮)加上一个刻度就可以取出该任务执行
结构体
type TimeWheel struct {
interval time.Duration
slots []*list.List
slotsNum int64
currentSlots int64
ticker *time.Ticker
mt sync.Mutex
isRun bool
tasks sync.Map
addTaskCh chan *Task
removeTaskCh chan string
closeCh chan struct{}
}
type Task struct {
ID string
createTime time.Time
delay time.Duration
slots int64
circle int64 // 多少圈
job Job
times int64 //执行多少次 -1 一直执行
}
启动时间轮
//定义定时器驱动时间轮
t.ticker = time.NewTicker(t.interval)
func (t *TimeWheel) run() {
for {
select {
case _ = <-t.ticker.C:
t.runTask()
case task := <-t.addTaskCh:
t.addTask(task, true)
case id := <-t.removeTaskCh:
t.delTask(id)
case _ = <-t.closeCh:
t.ticker.Stop()
break
}
}
}
向时间轮添加任务
func (t *TimeWheel) AddTask(ID string, job Job, delay time.Duration, times ...int64) error {
if ID == "" {
return errors.New("ID is empty")
}
if delay < t.interval {
return errors.New("the delay time must be greater than the interval time")
}
var timesInt64 int64 = 1
if len(times) > 0 {
timesInt64 = times[0]
}
_, ok := t.tasks.Load(ID)
if ok {
return errors.New("ID already exists")
}
task := &Task{
ID: ID,
createTime: time.Now(),
job: job,
delay: delay,
times: timesInt64,
}
t.addTaskCh <- task
return nil
}
func (t *TimeWheel) addTask(task *Task, first bool) {
task.circle, task.slots = t.getCircleAndSlots(task.delay, first)
ele := t.slots[task.slots].PushBack(task)
t.tasks.Store(task.ID, ele)
}
func (t *TimeWheel) getCircleAndSlots(delay time.Duration, first bool) (circle, slots int64) {
delaySed := int64(delay.Seconds())
intervalSed := int64(t.interval.Seconds())
circle = delaySed / intervalSed / t.slotsNum
slots = delaySed - (t.slotsNum * intervalSed * circle) + t.currentSlots
if slots == t.currentSlots && circle > 0 {
circle--
}
//第一次加入时 当前秒(currentSlots)还未执行,比如当前是第一秒的slot(0) 延迟5秒计算得出为5 (0~5有6格所有需要-1)
//第二次加入时 当前秒(currentSlots)已经执行,就不需要-1
if slots > 0 && first {
slots--
}
return
}
时间轮删除任务
func (t *TimeWheel) RemoveTask(ID string) error {
_, ok := t.tasks.Load(ID)
if !ok {
return errors.New("ID does not exist")
}
t.removeTaskCh <- ID
return nil
}
func (t *TimeWheel) delTask(id string) {
if val, ok := t.tasks.Load(id); ok {
task := val.(*list.Element).Value.(*Task)
t.slots[task.slots].Remove(val.(*list.Element))
t.tasks.Delete(task.ID)
}
}