利用定时器设计程序(高性能定时器策略之时间轮算法的实现)
© 内容版权所有,转载或复制需附源站地址 www.tanjp.com 谢谢合作。
变更记录
2020-04-25, tanjp, 整理之前实现定时器过程中的问题。
什么时候需要定时器?
我们都知道程序是能快速运算出结果,几乎在一瞬间就可以把结果算出来。但是这个前提是所有输入条件都拿到手的情况下,如果有些输入条件 A 你并不知道什么时候能符合,那怎么办?写一个whlie循环一直检查?这样无疑很浪费CPU,显然行不通。
有经验程序员可以已经想到办法,把这些等待输入条件 A, B, C, ...等等全部记录起来,在其他相关事件触发时,顺便检查一下这些输入条件是否满足?如果满足就执行某个函数,否则下次再检查一次。这种做法,一般的业务系统都是可实现的。但是会导致代码繁琐而且也不好维护,并且当检查条件多了会导致系统性能大幅下降。这时候,就需要使用定时器来定时检查。
还有一种情况是,假设某个游戏战斗逻辑,一个法术攻击使得某个区域内中毒,持续时间从[t1, t2]。也就是说,在未来确定的时间点会发生某些事情的时候,就要在未来的某个时间点添加定时器事件。
时间轮(Timing Wheel)算法原理
时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是固定的,可用wheelSize来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式 tickMs × wheelSize计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime是tickMs的整数倍。currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的链表的所有任务。
若时间轮的tickMs=1ms,wheelSize=20,那么可以计算得出interval为20ms。初始情况下表盘指针currentTime指向时间格0,此时有一个定时为2ms的任务插入进来会存放到时间格为2的链表中。随着时间的不断推移,指针currentTime不断向前推进,过了2ms之后,当到达时间格2时,就需要将时间格2所对应的链表中的任务做相应的到期操作。此时若又有一个定时为8ms的任务插入进来,则会存放到时间格10中,currentTime再过8ms后会指向时间格10。如果同时有一个定时为19ms的任务插入进来怎么办?如果此时有个定时为350ms的任务该如何处理?当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。
参考上图,复用之前的案例,第一层的时间轮tickMs=1ms, wheelSize=20, interval=20ms。第二层的时间轮的tickMs为第一层时间轮的interval,即为20ms。每一层时间轮的wheelSize是固定的,都是20,那么第二层的时间轮的总体时间跨度interval为400ms。以此类推,这个400ms也是第三层的tickMs的大小,第三层的时间轮的总体时间跨度为8000ms。
对于之前所说的350ms的定时任务,显然第一层时间轮不能满足条件,所以就升级到第二层时间轮中,最终被插入到第二层时间轮中时间格17所对应的链表中。如果此时又有一个定时为450ms的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入到第三层时间轮中时间格1的链表中。注意到在到期时间在[400ms,800ms)区间的多个任务(比如446ms、455ms以及473ms的定时任务)都会被放入到第三层时间轮的时间格1中,时间格1对应的链表的超时时间为400ms。随着时间的流逝,当次链表到期之时,原本定时为450ms的任务还剩下50ms的时间,还不能执行这个任务的到期操作。这里就有一个时间轮降级的操作,会将这个剩余时间为50ms的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为[40ms,60ms)的时间格中。再经历了40ms之后,此时这个任务又被“察觉”到,不过还剩余10ms,还是不能立即执行到期操作。所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为[10ms,11ms)的时间格中,之后再经历10ms后,此任务真正到期,最终执行相应的到期操作。
时间轮算法的C 实现
1) 基础节点类型和链表元素的添加和删除
struct TickNode
{
TickNode()
: absolute_tick(0)
, relative_tick(0)
, prev_node(0)
, next_node(0)
, receiver(0)
, tid()
, data(0)
, max_count(0)
, expired_count(0) { }
void reset()
{
absolute_tick = 0;
relative_tick = 0;
prev_node = 0;
next_node = 0;
receiver = 0;
tid = 0;
data = 0;
max_count = 0;
expired_count = 0;
}
uint64 absolute_tick; //加入时的当前绝对tick
uint64 relative_tick; //相对于加入时的tick
TickNode * prev_node; //链表上一节点
TickNode * next_node; //链表下一节点
uint32 receiver; //接收者
uint32 tid; //唯一标识
void * data; //数据体
uint32 max_count; //循环次数,0表示无限循环
uint32 expired_count; //已触发到期次数
};
namespace linked_list
{
bool add_node_after_head(TickNode * pp_head, TickNode * pp_node)
{
TickNode * zp_next = pp_head->next_node;
if (zp_next){ zp_next->prev_node = pp_node; }
pp_node->next_node = pp_head->next_node;
pp_node->prev_node = pp_head;
pp_head->next_node = pp_node;
return true;
}
};
2) 时间轮,一个轮子类型
class Wheel
{
typedef std::vector< TickNode* > SlotsType;
public:
explicit Wheel(uint32 pp_maxsize)
: kWheelSize(pp_maxsize > 1 ? pp_maxsize : 2)
, mn_base(1) //注意初始基数是1
, mn_tick(0)
, mp_prev_wheel(0)
, mp_next_wheel(0)
, mc_slots()
{
//每个槽上添加头节点
TickNode * zp_head_node = 0;
for (uint32 i = 0; i < kWheelSize; i)
{
zp_head_node = new TickNode();
mc_slots.push_back(zp_head_node);
}
}
~Wheel()
{
//只清除槽上的头节点,头节点下的链表节点没删除(由业务层删除)
TickNode * tn = 0;
for (auto it = mc_slots.begin(); it != mc_slots.end(); it)
{
tn = *it;
SAFE_DELETE(tn);
}
mc_slots.clear();
}
//设置前后轮关系
bool set_neighbours(Wheel * pp_prev_wheel, Wheel * pp_next_wheel)
{
if (mp_next_wheel || mp_prev_wheel)
{
return false; //不能重复调用
}
if (!pp_prev_wheel && !pp_next_wheel)
{
return false; //参数有误,不可能都为空
}
mp_prev_wheel = pp_prev_wheel;
mp_next_wheel = pp_next_wheel;
return true;
}
//根据之前轮计算基数(要先把所有轮先后关系确定后再调用)
uint64 calc_base()
{
Wheel * tmp = mp_prev_wheel;
while (tmp)
{
mn_base *= tmp->kWheelSize;
tmp = tmp->mp_prev_wheel;
}
return mn_base;
}
bool add(TickNode * pp_node, uint64 pn_relative_tick)
{
uint64 zn_round_index = pn_relative_tick / mn_base;
uint32 zn_left_ticks = kWheelSize - mn_tick; //未触发的数量
if (zn_round_index < uint64(zn_left_ticks))
{
//可以放入当前轮
uint32 zn_index = mn_tick uint32(zn_round_index);
TickNode* zp_head = mc_slots[zn_index];
linked_list::add_node_after_head(zp_head, pp_node);
return true;
}
else if (mp_next_wheel)
{ //放入下一轮
return mp_next_wheel->add(pp_node, pn_relative_tick - (zn_left_ticks * mn_base));
}
else
{ //没有下一轮,溢出了,添加失败
return false;
}
}
TickNode * tick(uint64 pn_now_absolute_tick)
{
if ((mn_tick >= kWheelSize) && mp_next_wheel)
{
//当前轮已满,把下一轮的tick槽移上来
mn_tick = 0; //清零,从头开始
TickNode * zp_next_wheel_data = mp_next_wheel->tick(pn_now_absolute_tick);
TickNode * zp_next_wheel_next = 0;
while (zp_next_wheel_data)
{
zp_next_wheel_next = zp_next_wheel_data->next_node;
uint64 zn_abs_tick = zp_next_wheel_data->absolute_tick zp_next_wheel_data->relative_tick;
uint64 zn_add_index = (zn_abs_tick - pn_now_absolute_tick) / mn_base;
if (zn_add_index >= kWheelSize)
{ // logic error, 下一轮的tick槽放不下?奇怪!不可能吧
}
else
{
TickNode * zp_head_node = mc_slots[zn_add_index];
linked_list::add_node_after_head(zp_head_node, zp_next_wheel_data);
}
zp_next_wheel_data = zp_next_wheel_next;
}
}
else if ((mn_tick >= kWheelSize) && !mp_next_wheel)
{
//logic error, 当前轮已满,找不到下一轮,设置有问题吧
}
TickNode * head_node = mc_slots[mn_tick];
TickNode * zp_tick_list = head_node->next_node;
head_node->next_node = 0;
mn_tick;
return zp_tick_list;
}
private:
const uint32 kWheelSize; //wheelSize 当前轮最大槽数
uint64 mn_base; //tickMs 当前轮的基数
uint32 mn_tick; //currentTime 当前轮的tick值,取值范围[0,mn_maxsize)
Wheel * mp_prev_wheel; //上一轮
Wheel * mp_next_wheel; //下一轮
SlotsType mc_slots; //槽容器
};
3) 多个轮子组合起来,轮组类型。
class WheelGroup
{
typedef std::vector< Wheel* > WheelsType;
public:
WheelGroup(uint32 pn_wheels_count, uint32 pn_size_of_per_wheel)
{
if (pn_wheels_count < 2) pn_wheels_count = 2;
if (pn_size_of_per_wheel < 2) pn_size_of_per_wheel = 2;
for (uint32 i = 0; i < pn_wheels_count; i)
{
Wheel* zp_wheel = new Wheel(pn_size_of_per_wheel);
mo_wheels.push_back(zp_wheel);
}
//设置前后轮关系
mo_wheels[0]->set_neighbours(0, mo_wheels[1]);
for (uint32 i = 1; i <= pn_wheels_count - 2; i)
{
mo_wheels[i]->set_neighbours(mo_wheels[i - 1], mo_wheels[i 1]);
}
mo_wheels[pn_wheels_count - 1]->set_neighbours(mo_wheels[pn_wheels_count - 2], 0);
//计算各个轮的基数
for (uint32 i = 0; i < pn_wheels_count; i)
{
mo_wheels[i]->calc_base();
}
}
~WheelGroup()
{
for (auto it = mo_wheels.begin(); it != mo_wheels.end(); it)
{
Wheel* zp_wheel = *it;
delete zp_wheel;
}
mo_wheels.clear();
}
bool add_tick_node(TickNode * pp_node, uint64 pn_relative_ticks)
{
pp_node->absolute_tick = mn_ticks;
pp_node->relative_tick = pn_relative_ticks;
return mo_wheels[0]->add(pp_node, pn_relative_ticks);
}
TickNode * tick()
{
TickNode* zp_lklist = mo_wheels[0]->tick(mn_ticks);
mn_ticks;
return zp_lklist;
}
uint64 absolute_ticks() const { return mn_ticks; }
void absolute_ticks(uint64 pn_absolute_ticks)
{
mn_ticks = pn_absolute_ticks;
}
private:
uint64 mn_ticks;
WheelsType mo_wheels;
};
4) 封装接口,方便易用
class TimingWheel
{
typedef std::unordered_map<uint32, TickNode*> TimeDataMap;
typedef ObjectCache<TickNode, NullMutex> TickNodeCache;
public:
// receiver tid ticks data
typedef std::tuple< uint32, uint32, uint64, void* > ExpiredResult;
typedef std::vector<ExpiredResult> ResultVector;
TimingWheel();
~TimingWheel();
// @pn_absolute_ticks, 启动时间轮并指定初始绝对时间点。
void start(uint64 pn_absolute_ticks);
// @pn_receiver, 接收者ID。
// @pn_tid, 待添加的定时事件ID。
// @pn_relative_ticks, 相对时间间隔,相对于当前的绝对时间。未来绝对时间 = 当前绝对时间 相对时间间隔。
// @pn_repeat_count, 定时事件重复次数,0表示无限次。
// @pp_data, 定时事件的相关数据,由业务层负责构造和销毁。
// @return 添加成功返回true, 否则返回false。
bool add(uint32 pn_receiver, uint32 pn_tid, uint64 pn_relative_ticks, uint32 pn_repeat_count = 0, void * pp_data = 0);
// 计算到期事件。
// @pn_times, 计算次数。
// @pc_expired_tids, 到期事件结果集。
// @pc_closed_tids, 已经关闭事件结果集。
// @return true表示有事件触发,false无事件触发。
bool calc(uint32 pn_times, ResultVector * pc_expired_tids, ResultVector * pc_closed_tids);
// @return 当前绝对时间点。
uint64 ticks() const;
// 关闭时间轮。
void close();
private:
bool zb_started;
bool zb_closed;
WheelGroup * mp_wheel;
TimeDataMap mc_datas;
TickNodeCache mo_cache;
};
TimingWheel::TimingWheel()
: zb_started(false)
, zb_closed(false)
, mp_wheel(new WheelGroup(10, 100))
, mo_cache(true, 1024)
{
mo_cache.init(64);
}
TimingWheel::~TimingWheel()
{
close();
SAFE_DELETE(mp_wheel);
}
void TimingWheel::start(uint64 pn_absolute_ticks)
{
if (zb_started) { return; }
zb_started = true;
mp_wheel->absolute_ticks(pn_absolute_ticks);
}
bool TimingWheel::add(uint32 pn_receiver, uint32 pn_tid, uint64 pn_relative_ticks, uint32 pn_repeat_count, void * pp_data)
{
if (!zb_started || zb_closed) { return false; }
auto it = mc_datas.find(pn_tid);
if (it != mc_datas.end()) { return false; }
TickNode * zp_node = mo_cache.apply();
zp_node->reset();
zp_node->receiver = pn_receiver;
zp_node->tid = pn_tid;
zp_node->data = pp_data;
zp_node->max_count = pn_repeat_count;
bool zb_ok = mp_wheel->add_tick_node(zp_node, pn_relative_ticks - 1);
if (!zb_ok)
{ //加入失败
mo_cache.revert(zp_node);
return false;
}
mc_datas.insert(std::make_pair(pn_tid, zp_node));
return true;
}
bool TimingWheel::calc(uint32 pn_times, ResultVector * pc_expired_tids, ResultVector * pc_closed_tids)
{
if (!zb_started || zb_closed) { return false; }
bool zb_expired = false;
uint64 zn_ticks = 0;
for (uint32 i = 0; i < pn_times; i)
{
TickNode * zp_node = mp_wheel->tick();
zn_ticks = ticks();
TickNode * zp_node_next = 0;
while (zp_node)
{
zp_node_next = zp_node->next_node;
pc_expired_tids->push_back(std::move(ExpiredResult(zp_node->receiver, zp_node->tid, zn_ticks, zp_node->data)));
zp_node->expired_count = 1;
zb_expired = true;
if ((zp_node->max_count > 0) && (zp_node->expired_count >= zp_node->max_count))
{ //已经到达触发次数上限,触发删除
pc_closed_tids->push_back(std::move(ExpiredResult(zp_node->receiver, zp_node->tid, zn_ticks, zp_node->data)));
mc_datas.erase(zp_node->tid);
mo_cache.revert(zp_node);
}
else
{ //重新设置定时器
bool zb_ok = mp_wheel->add_tick_node(zp_node, zp_node->relative_tick);
if (!zb_ok)
{ //重新加入失败
pc_closed_tids->push_back(std::move(ExpiredResult(zp_node->receiver, zp_node->tid, zn_ticks, zp_node->data)));
mc_datas.erase(zp_node->tid);
mo_cache.revert(zp_node);
}
}
zp_node = zp_node_next;
}
}
return zb_expired;
}
uint64 TimingWheel::ticks() const
{
return mp_wheel->absolute_ticks();
}
void TimingWheel::close()
{
if (zb_closed) { return; }
zb_closed = true;
TickNode * zp_node = 0;
for (auto it = mc_datas.begin(); it != mc_datas.end(); it)
{
zp_node = it->second;
mo_cache.revert(zp_node);
}
mc_datas.clear();
mo_cache.destroy();
mo_cache.force_release();
}
应用例子
测试代码
每 1000毫秒触发一次,也就是时间轮 1秒钟移动一格。定时器Id为11的 3秒触发一次,触发4次后结束。定时器Id为12的 1秒触发一次,触发5次后结束。定时器Id为20的 3秒触发一次,无限循环。
void test1()
{
std::cout << "begin" << std::endl;
TimingWheel tw;
uint64 zn_last_ticks = base::now_sys_sec(false);
uint64 zn_now_ticks = zn_last_ticks;
std::cout << "now=" << zn_now_ticks << "-----" << std::endl;
tw.start(zn_now_ticks);
if (!tw.add(1, 11, 3, 4))
{
std::cout << "11 add error !!!!!" << std::endl;
}
if (!tw.add(1, 12, 1, 5))
{
std::cout << "11 add error !!!!!" << std::endl;
}
if (!tw.add(2, 20, 3, 0))
{
std::cout << "20 add error !!!!!" << std::endl;
}
TimingWheel::ResultVector zc_expired_tids;
TimingWheel::ResultVector zc_closed_tids;
for (uint32 i = 0; i < 100; i)
{
zn_now_ticks = base::now_sys_sec(false);
uint32 zn_times = uint32(zn_now_ticks - zn_last_ticks);
zn_last_ticks = zn_now_ticks;
bool zb_had = tw.calc(zn_times, &zc_expired_tids, &zc_closed_tids);
if (zb_had)
{
std::cout << "expired size=" << zc_expired_tids.size();
for (auto it = zc_expired_tids.begin(); it != zc_expired_tids.end(); it)
{
std::cout << "| tid=" << std::get<1>(*it) << ", ticks=" << std::get<2>(*it);
}
std::cout << std::endl;
std::cout << "closed size=" << zc_closed_tids.size();
for (auto it = zc_closed_tids.begin(); it != zc_closed_tids.end(); it)
{
std::cout << "| tid=" << std::get<1>(*it) << ", ticks=" << std::get<2>(*it) << std::endl;
}
std::cout << std::endl;
std::cout << "----------------" << std::endl;
zc_expired_tids.clear();
zc_closed_tids.clear();
}
THIS_SLEEP_MILLISECONDS(1000);
}
tw.close();
std::cout << "end" << std::endl;
}
输出结果
begin
now=1587848614-----
expired size=1| tid=12, ticks=1587848615
closed size=0
----------------
expired size=1| tid=12, ticks=1587848616
closed size=0
----------------
expired size=3| tid=12, ticks=1587848617| tid=20, ticks=1587848617| tid=11, ticks=1587848617
closed size=0
----------------
expired size=1| tid=12, ticks=1587848618
closed size=0
----------------
expired size=1| tid=12, ticks=1587848619
closed size=1| tid=12, ticks=1587848619
----------------
expired size=2| tid=11, ticks=1587848620| tid=20, ticks=1587848620
closed size=0
----------------
expired size=2| tid=20, ticks=1587848623| tid=11, ticks=1587848623
closed size=0
----------------
expired size=2| tid=11, ticks=1587848626| tid=20, ticks=1587848626
closed size=1| tid=11, ticks=1587848626
----------------
expired size=1| tid=20, ticks=1587848629
closed size=0
----------------
定时器实现过程遇到的问题
在服务端的程序里,时间是一个很常用,几乎无处不在的变量。如果不正确使用时间,将会导致灾难性的数据错误。所以,正确使用时间非常重要!
怎样才算是正确使用时间?
首先,我们得知道在程序世界里面,时间有哪些值?
- 协调世界时,又称世界统一时间,世界标准时间,国际协调时间,简称UTC。世界默认以格林威治时间(GMT)为准。
- 世界各个地区时区时间。如北京时间GMT 8,就是格林威治时间加8小时。
- 以计算机CPU时钟结合程序启动的时间算出的时间戳。
- 时间轮算法每次tick算出的时间轮时间。
然后,如何算是正确使用时间?那得先想想,怎样是不正确的?
- 有一种唯一ID生成算法依赖于时间戳,如果时间改小了,就可能导致唯一ID重复,那可就是灾难性的逻辑错误。
- 有些依赖于系统日期时间的定时器检测,当系统时间由于某种原因导致时间回退到以前,那将会导致定时器卡死等待。甚至,std::this_thread:sleep_for 也是有BUG的,回退时间会导致该函数挂起等待,与期待的效果相违背。
- 有些时间校验的算法,如果就在那一瞬间,时间回退了,导致某个非法校验通过了?那也导致错误。
正确使用时间,应该保证时间戳在程序运行过程总是单调递增,这隐患两个意思,保持递增,不能停下,不能倒退。
最后,总结一下时间和定时器的问题。
- 把时间戳划分为三种:系统日期时间戳(system time),CPU时钟时间戳(cpu time),时间轮执行后的时间戳(tw time)。
- 底层逻辑相关的必须用CPU时钟时间戳(cpu time),如网络心跳,超时检测,唯一ID算法,等等。
- 业务逻辑层使用时间轮时间戳(tw time)的话,要有一套自动同步机制,保证与CPU时钟时间戳(cpu time)单调递增同步。
- 在进程启动前,尽可能保证各个机器的系统时间戳已同步。
- 在测试时间相关功能时,停掉3中所说的同步机制,通过指令加快时间轮时间戳(tw time),可达到改时间测试的效果。但不能把时间回退来测试。
踩过的坑
- std::this_thread:sleep_for 有BUG,改变系统日期时间会导致 std::this_thread:sleep_for 卡住。改用 boost::this_thread:sleep_for 替代。
- 定时器如果依赖于系统日期时间,很可能由于时间同步问题或者手动改时间导致定时器卡死。
- 唯一ID生成算法依赖于时间,所以要采用CPU时钟作为统计的依据,保证时间只会单调递增。
- boost::asio::deadline_timer 改变系统的日期时间,会导致定时器卡住,改用 boost::asio::steady_timer 替代。
© 内容版权所有,转载或复制需附源站地址 www.tanjp.com 谢谢合作。
,免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com