利用定时器设计程序(高性能定时器策略之时间轮算法的实现)

© 内容版权所有,转载或复制需附源站地址 www.tanjp.com 谢谢合作。

变更记录

2020-04-25, tanjp, 整理之前实现定时器过程中的问题。

什么时候需要定时器?

我们都知道程序是能快速运算出结果,几乎在一瞬间就可以把结果算出来。但是这个前提是所有输入条件都拿到手的情况下,如果有些输入条件 A 你并不知道什么时候能符合,那怎么办?写一个whlie循环一直检查?这样无疑很浪费CPU,显然行不通。

有经验程序员可以已经想到办法,把这些等待输入条件 A, B, C, ...等等全部记录起来,在其他相关事件触发时,顺便检查一下这些输入条件是否满足?如果满足就执行某个函数,否则下次再检查一次。这种做法,一般的业务系统都是可实现的。但是会导致代码繁琐而且也不好维护,并且当检查条件多了会导致系统性能大幅下降。这时候,就需要使用定时器来定时检查。

还有一种情况是,假设某个游戏战斗逻辑,一个法术攻击使得某个区域内中毒,持续时间从[t1, t2]。也就是说,在未来确定的时间点会发生某些事情的时候,就要在未来的某个时间点添加定时器事件。

时间轮(Timing Wheel)算法原理

时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是固定的,可用wheelSize来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式 tickMs × wheelSize计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime是tickMs的整数倍。currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的链表的所有任务。

若时间轮的tickMs=1ms,wheelSize=20,那么可以计算得出interval为20ms。初始情况下表盘指针currentTime指向时间格0,此时有一个定时为2ms的任务插入进来会存放到时间格为2的链表中。随着时间的不断推移,指针currentTime不断向前推进,过了2ms之后,当到达时间格2时,就需要将时间格2所对应的链表中的任务做相应的到期操作。此时若又有一个定时为8ms的任务插入进来,则会存放到时间格10中,currentTime再过8ms后会指向时间格10。如果同时有一个定时为19ms的任务插入进来怎么办?如果此时有个定时为350ms的任务该如何处理?当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。

利用定时器设计程序(高性能定时器策略之时间轮算法的实现)(1)

参考上图,复用之前的案例,第一层的时间轮tickMs=1ms, wheelSize=20, interval=20ms。第二层的时间轮的tickMs为第一层时间轮的interval,即为20ms。每一层时间轮的wheelSize是固定的,都是20,那么第二层的时间轮的总体时间跨度interval为400ms。以此类推,这个400ms也是第三层的tickMs的大小,第三层的时间轮的总体时间跨度为8000ms。

对于之前所说的350ms的定时任务,显然第一层时间轮不能满足条件,所以就升级到第二层时间轮中,最终被插入到第二层时间轮中时间格17所对应的链表中。如果此时又有一个定时为450ms的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入到第三层时间轮中时间格1的链表中。注意到在到期时间在[400ms,800ms)区间的多个任务(比如446ms、455ms以及473ms的定时任务)都会被放入到第三层时间轮的时间格1中,时间格1对应的链表的超时时间为400ms。随着时间的流逝,当次链表到期之时,原本定时为450ms的任务还剩下50ms的时间,还不能执行这个任务的到期操作。这里就有一个时间轮降级的操作,会将这个剩余时间为50ms的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为[40ms,60ms)的时间格中。再经历了40ms之后,此时这个任务又被“察觉”到,不过还剩余10ms,还是不能立即执行到期操作。所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为[10ms,11ms)的时间格中,之后再经历10ms后,此任务真正到期,最终执行相应的到期操作。

时间轮算法的C 实现

1) 基础节点类型和链表元素的添加和删除

struct TickNode { TickNode() : absolute_tick(0) , relative_tick(0) , prev_node(0) , next_node(0) , receiver(0) , tid() , data(0) , max_count(0) , expired_count(0) { } void reset() { absolute_tick = 0; relative_tick = 0; prev_node = 0; next_node = 0; receiver = 0; tid = 0; data = 0; max_count = 0; expired_count = 0; } uint64 absolute_tick; //加入时的当前绝对tick uint64 relative_tick; //相对于加入时的tick TickNode * prev_node; //链表上一节点 TickNode * next_node; //链表下一节点 uint32 receiver; //接收者 uint32 tid; //唯一标识 void * data; //数据体 uint32 max_count; //循环次数,0表示无限循环 uint32 expired_count; //已触发到期次数 }; namespace linked_list { bool add_node_after_head(TickNode * pp_head, TickNode * pp_node) { TickNode * zp_next = pp_head->next_node; if (zp_next){ zp_next->prev_node = pp_node; } pp_node->next_node = pp_head->next_node; pp_node->prev_node = pp_head; pp_head->next_node = pp_node; return true; } };

2) 时间轮,一个轮子类型

class Wheel { typedef std::vector< TickNode* > SlotsType; public: explicit Wheel(uint32 pp_maxsize) : kWheelSize(pp_maxsize > 1 ? pp_maxsize : 2) , mn_base(1) //注意初始基数是1 , mn_tick(0) , mp_prev_wheel(0) , mp_next_wheel(0) , mc_slots() { //每个槽上添加头节点 TickNode * zp_head_node = 0; for (uint32 i = 0; i < kWheelSize; i) { zp_head_node = new TickNode(); mc_slots.push_back(zp_head_node); } } ~Wheel() { //只清除槽上的头节点,头节点下的链表节点没删除(由业务层删除) TickNode * tn = 0; for (auto it = mc_slots.begin(); it != mc_slots.end(); it) { tn = *it; SAFE_DELETE(tn); } mc_slots.clear(); } //设置前后轮关系 bool set_neighbours(Wheel * pp_prev_wheel, Wheel * pp_next_wheel) { if (mp_next_wheel || mp_prev_wheel) { return false; //不能重复调用 } if (!pp_prev_wheel && !pp_next_wheel) { return false; //参数有误,不可能都为空 } mp_prev_wheel = pp_prev_wheel; mp_next_wheel = pp_next_wheel; return true; } //根据之前轮计算基数(要先把所有轮先后关系确定后再调用) uint64 calc_base() { Wheel * tmp = mp_prev_wheel; while (tmp) { mn_base *= tmp->kWheelSize; tmp = tmp->mp_prev_wheel; } return mn_base; } bool add(TickNode * pp_node, uint64 pn_relative_tick) { uint64 zn_round_index = pn_relative_tick / mn_base; uint32 zn_left_ticks = kWheelSize - mn_tick; //未触发的数量 if (zn_round_index < uint64(zn_left_ticks)) { //可以放入当前轮 uint32 zn_index = mn_tick uint32(zn_round_index); TickNode* zp_head = mc_slots[zn_index]; linked_list::add_node_after_head(zp_head, pp_node); return true; } else if (mp_next_wheel) { //放入下一轮 return mp_next_wheel->add(pp_node, pn_relative_tick - (zn_left_ticks * mn_base)); } else { //没有下一轮,溢出了,添加失败 return false; } } TickNode * tick(uint64 pn_now_absolute_tick) { if ((mn_tick >= kWheelSize) && mp_next_wheel) { //当前轮已满,把下一轮的tick槽移上来 mn_tick = 0; //清零,从头开始 TickNode * zp_next_wheel_data = mp_next_wheel->tick(pn_now_absolute_tick); TickNode * zp_next_wheel_next = 0; while (zp_next_wheel_data) { zp_next_wheel_next = zp_next_wheel_data->next_node; uint64 zn_abs_tick = zp_next_wheel_data->absolute_tick zp_next_wheel_data->relative_tick; uint64 zn_add_index = (zn_abs_tick - pn_now_absolute_tick) / mn_base; if (zn_add_index >= kWheelSize) { // logic error, 下一轮的tick槽放不下?奇怪!不可能吧 } else { TickNode * zp_head_node = mc_slots[zn_add_index]; linked_list::add_node_after_head(zp_head_node, zp_next_wheel_data); } zp_next_wheel_data = zp_next_wheel_next; } } else if ((mn_tick >= kWheelSize) && !mp_next_wheel) { //logic error, 当前轮已满,找不到下一轮,设置有问题吧 } TickNode * head_node = mc_slots[mn_tick]; TickNode * zp_tick_list = head_node->next_node; head_node->next_node = 0; mn_tick; return zp_tick_list; } private: const uint32 kWheelSize; //wheelSize 当前轮最大槽数 uint64 mn_base; //tickMs 当前轮的基数 uint32 mn_tick; //currentTime 当前轮的tick值,取值范围[0,mn_maxsize) Wheel * mp_prev_wheel; //上一轮 Wheel * mp_next_wheel; //下一轮 SlotsType mc_slots; //槽容器 };

3) 多个轮子组合起来,轮组类型。

class WheelGroup { typedef std::vector< Wheel* > WheelsType; public: WheelGroup(uint32 pn_wheels_count, uint32 pn_size_of_per_wheel) { if (pn_wheels_count < 2) pn_wheels_count = 2; if (pn_size_of_per_wheel < 2) pn_size_of_per_wheel = 2; for (uint32 i = 0; i < pn_wheels_count; i) { Wheel* zp_wheel = new Wheel(pn_size_of_per_wheel); mo_wheels.push_back(zp_wheel); } //设置前后轮关系 mo_wheels[0]->set_neighbours(0, mo_wheels[1]); for (uint32 i = 1; i <= pn_wheels_count - 2; i) { mo_wheels[i]->set_neighbours(mo_wheels[i - 1], mo_wheels[i 1]); } mo_wheels[pn_wheels_count - 1]->set_neighbours(mo_wheels[pn_wheels_count - 2], 0); //计算各个轮的基数 for (uint32 i = 0; i < pn_wheels_count; i) { mo_wheels[i]->calc_base(); } } ~WheelGroup() { for (auto it = mo_wheels.begin(); it != mo_wheels.end(); it) { Wheel* zp_wheel = *it; delete zp_wheel; } mo_wheels.clear(); } bool add_tick_node(TickNode * pp_node, uint64 pn_relative_ticks) { pp_node->absolute_tick = mn_ticks; pp_node->relative_tick = pn_relative_ticks; return mo_wheels[0]->add(pp_node, pn_relative_ticks); } TickNode * tick() { TickNode* zp_lklist = mo_wheels[0]->tick(mn_ticks); mn_ticks; return zp_lklist; } uint64 absolute_ticks() const { return mn_ticks; } void absolute_ticks(uint64 pn_absolute_ticks) { mn_ticks = pn_absolute_ticks; } private: uint64 mn_ticks; WheelsType mo_wheels; };

4) 封装接口,方便易用

class TimingWheel { typedef std::unordered_map<uint32, TickNode*> TimeDataMap; typedef ObjectCache<TickNode, NullMutex> TickNodeCache; public: // receiver tid ticks data typedef std::tuple< uint32, uint32, uint64, void* > ExpiredResult; typedef std::vector<ExpiredResult> ResultVector; TimingWheel(); ~TimingWheel(); // @pn_absolute_ticks, 启动时间轮并指定初始绝对时间点。 void start(uint64 pn_absolute_ticks); // @pn_receiver, 接收者ID。 // @pn_tid, 待添加的定时事件ID。 // @pn_relative_ticks, 相对时间间隔,相对于当前的绝对时间。未来绝对时间 = 当前绝对时间 相对时间间隔。 // @pn_repeat_count, 定时事件重复次数,0表示无限次。 // @pp_data, 定时事件的相关数据,由业务层负责构造和销毁。 // @return 添加成功返回true, 否则返回false。 bool add(uint32 pn_receiver, uint32 pn_tid, uint64 pn_relative_ticks, uint32 pn_repeat_count = 0, void * pp_data = 0); // 计算到期事件。 // @pn_times, 计算次数。 // @pc_expired_tids, 到期事件结果集。 // @pc_closed_tids, 已经关闭事件结果集。 // @return true表示有事件触发,false无事件触发。 bool calc(uint32 pn_times, ResultVector * pc_expired_tids, ResultVector * pc_closed_tids); // @return 当前绝对时间点。 uint64 ticks() const; // 关闭时间轮。 void close(); private: bool zb_started; bool zb_closed; WheelGroup * mp_wheel; TimeDataMap mc_datas; TickNodeCache mo_cache; }; TimingWheel::TimingWheel() : zb_started(false) , zb_closed(false) , mp_wheel(new WheelGroup(10, 100)) , mo_cache(true, 1024) { mo_cache.init(64); } TimingWheel::~TimingWheel() { close(); SAFE_DELETE(mp_wheel); } void TimingWheel::start(uint64 pn_absolute_ticks) { if (zb_started) { return; } zb_started = true; mp_wheel->absolute_ticks(pn_absolute_ticks); } bool TimingWheel::add(uint32 pn_receiver, uint32 pn_tid, uint64 pn_relative_ticks, uint32 pn_repeat_count, void * pp_data) { if (!zb_started || zb_closed) { return false; } auto it = mc_datas.find(pn_tid); if (it != mc_datas.end()) { return false; } TickNode * zp_node = mo_cache.apply(); zp_node->reset(); zp_node->receiver = pn_receiver; zp_node->tid = pn_tid; zp_node->data = pp_data; zp_node->max_count = pn_repeat_count; bool zb_ok = mp_wheel->add_tick_node(zp_node, pn_relative_ticks - 1); if (!zb_ok) { //加入失败 mo_cache.revert(zp_node); return false; } mc_datas.insert(std::make_pair(pn_tid, zp_node)); return true; } bool TimingWheel::calc(uint32 pn_times, ResultVector * pc_expired_tids, ResultVector * pc_closed_tids) { if (!zb_started || zb_closed) { return false; } bool zb_expired = false; uint64 zn_ticks = 0; for (uint32 i = 0; i < pn_times; i) { TickNode * zp_node = mp_wheel->tick(); zn_ticks = ticks(); TickNode * zp_node_next = 0; while (zp_node) { zp_node_next = zp_node->next_node; pc_expired_tids->push_back(std::move(ExpiredResult(zp_node->receiver, zp_node->tid, zn_ticks, zp_node->data))); zp_node->expired_count = 1; zb_expired = true; if ((zp_node->max_count > 0) && (zp_node->expired_count >= zp_node->max_count)) { //已经到达触发次数上限,触发删除 pc_closed_tids->push_back(std::move(ExpiredResult(zp_node->receiver, zp_node->tid, zn_ticks, zp_node->data))); mc_datas.erase(zp_node->tid); mo_cache.revert(zp_node); } else { //重新设置定时器 bool zb_ok = mp_wheel->add_tick_node(zp_node, zp_node->relative_tick); if (!zb_ok) { //重新加入失败 pc_closed_tids->push_back(std::move(ExpiredResult(zp_node->receiver, zp_node->tid, zn_ticks, zp_node->data))); mc_datas.erase(zp_node->tid); mo_cache.revert(zp_node); } } zp_node = zp_node_next; } } return zb_expired; } uint64 TimingWheel::ticks() const { return mp_wheel->absolute_ticks(); } void TimingWheel::close() { if (zb_closed) { return; } zb_closed = true; TickNode * zp_node = 0; for (auto it = mc_datas.begin(); it != mc_datas.end(); it) { zp_node = it->second; mo_cache.revert(zp_node); } mc_datas.clear(); mo_cache.destroy(); mo_cache.force_release(); }

应用例子

测试代码

每 1000毫秒触发一次,也就是时间轮 1秒钟移动一格。定时器Id为11的 3秒触发一次,触发4次后结束。定时器Id为12的 1秒触发一次,触发5次后结束。定时器Id为20的 3秒触发一次,无限循环。

void test1() { std::cout << "begin" << std::endl; TimingWheel tw; uint64 zn_last_ticks = base::now_sys_sec(false); uint64 zn_now_ticks = zn_last_ticks; std::cout << "now=" << zn_now_ticks << "-----" << std::endl; tw.start(zn_now_ticks); if (!tw.add(1, 11, 3, 4)) { std::cout << "11 add error !!!!!" << std::endl; } if (!tw.add(1, 12, 1, 5)) { std::cout << "11 add error !!!!!" << std::endl; } if (!tw.add(2, 20, 3, 0)) { std::cout << "20 add error !!!!!" << std::endl; } TimingWheel::ResultVector zc_expired_tids; TimingWheel::ResultVector zc_closed_tids; for (uint32 i = 0; i < 100; i) { zn_now_ticks = base::now_sys_sec(false); uint32 zn_times = uint32(zn_now_ticks - zn_last_ticks); zn_last_ticks = zn_now_ticks; bool zb_had = tw.calc(zn_times, &zc_expired_tids, &zc_closed_tids); if (zb_had) { std::cout << "expired size=" << zc_expired_tids.size(); for (auto it = zc_expired_tids.begin(); it != zc_expired_tids.end(); it) { std::cout << "| tid=" << std::get<1>(*it) << ", ticks=" << std::get<2>(*it); } std::cout << std::endl; std::cout << "closed size=" << zc_closed_tids.size(); for (auto it = zc_closed_tids.begin(); it != zc_closed_tids.end(); it) { std::cout << "| tid=" << std::get<1>(*it) << ", ticks=" << std::get<2>(*it) << std::endl; } std::cout << std::endl; std::cout << "----------------" << std::endl; zc_expired_tids.clear(); zc_closed_tids.clear(); } THIS_SLEEP_MILLISECONDS(1000); } tw.close(); std::cout << "end" << std::endl; }

输出结果

begin now=1587848614----- expired size=1| tid=12, ticks=1587848615 closed size=0 ---------------- expired size=1| tid=12, ticks=1587848616 closed size=0 ---------------- expired size=3| tid=12, ticks=1587848617| tid=20, ticks=1587848617| tid=11, ticks=1587848617 closed size=0 ---------------- expired size=1| tid=12, ticks=1587848618 closed size=0 ---------------- expired size=1| tid=12, ticks=1587848619 closed size=1| tid=12, ticks=1587848619 ---------------- expired size=2| tid=11, ticks=1587848620| tid=20, ticks=1587848620 closed size=0 ---------------- expired size=2| tid=20, ticks=1587848623| tid=11, ticks=1587848623 closed size=0 ---------------- expired size=2| tid=11, ticks=1587848626| tid=20, ticks=1587848626 closed size=1| tid=11, ticks=1587848626 ---------------- expired size=1| tid=20, ticks=1587848629 closed size=0 ----------------

定时器实现过程遇到的问题

在服务端的程序里,时间是一个很常用,几乎无处不在的变量。如果不正确使用时间,将会导致灾难性的数据错误。所以,正确使用时间非常重要!

怎样才算是正确使用时间?

首先,我们得知道在程序世界里面,时间有哪些值?

  1. 协调世界时,又称世界统一时间,世界标准时间,国际协调时间,简称UTC。世界默认以格林威治时间(GMT)为准。
  2. 世界各个地区时区时间。如北京时间GMT 8,就是格林威治时间加8小时。
  3. 以计算机CPU时钟结合程序启动的时间算出的时间戳。
  4. 时间轮算法每次tick算出的时间轮时间

然后,如何算是正确使用时间?那得先想想,怎样是不正确的?

  1. 有一种唯一ID生成算法依赖于时间戳,如果时间改小了,就可能导致唯一ID重复,那可就是灾难性的逻辑错误。
  2. 有些依赖于系统日期时间的定时器检测,当系统时间由于某种原因导致时间回退到以前,那将会导致定时器卡死等待。甚至,std::this_thread:sleep_for 也是有BUG的,回退时间会导致该函数挂起等待,与期待的效果相违背。
  3. 有些时间校验的算法,如果就在那一瞬间,时间回退了,导致某个非法校验通过了?那也导致错误。

正确使用时间,应该保证时间戳在程序运行过程总是单调递增,这隐患两个意思,保持递增,不能停下,不能倒退。

最后,总结一下时间和定时器的问题。

  1. 把时间戳划分为三种:系统日期时间戳(system time)CPU时钟时间戳(cpu time)时间轮执行后的时间戳(tw time)
  2. 底层逻辑相关的必须用CPU时钟时间戳(cpu time),如网络心跳,超时检测,唯一ID算法,等等。
  3. 业务逻辑层使用时间轮时间戳(tw time)的话,要有一套自动同步机制,保证与CPU时钟时间戳(cpu time)单调递增同步。
  4. 在进程启动前,尽可能保证各个机器的系统时间戳已同步。
  5. 在测试时间相关功能时,停掉3中所说的同步机制,通过指令加快时间轮时间戳(tw time),可达到改时间测试的效果。但不能把时间回退来测试。

踩过的坑

  1. std::this_thread:sleep_for 有BUG,改变系统日期时间会导致 std::this_thread:sleep_for 卡住。改用 boost::this_thread:sleep_for 替代。
  2. 定时器如果依赖于系统日期时间,很可能由于时间同步问题或者手动改时间导致定时器卡死。
  3. 唯一ID生成算法依赖于时间,所以要采用CPU时钟作为统计的依据,保证时间只会单调递增。
  4. boost::asio::deadline_timer 改变系统的日期时间,会导致定时器卡住,改用 boost::asio::steady_timer 替代。

© 内容版权所有,转载或复制需附源站地址 www.tanjp.com 谢谢合作。

,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页