rocketmq 延时队列源码(RocketMQ源码分析之心跳检测机制IdleStateHandler及关闭链接分析)
#创作挑战赛#
一、前言RocketMQ使用了IdleStateHandler来进行心跳检测,客户端和服务端保持长连接需要通过一个检测机制来确保链接的有效性,在链接处于空闲状态或者一方宕机又或者网络延迟,在这种情况下就要确认链接是否有效,无效链接就需要客户端和服务端都关闭当前链路,释放文件句柄资源。
二、RocketMQ使用IdleStateHandler场景读者可能看到这里有可能会比较懵,我们前文有解析的,我再带一下;
在NettyRemotingServer网络通信服务器的start方法中会去构建一个正在的netty server组件ServerBootstrap,在这里里面就直接构建了一个IdleStateHandler用来监听端口accept事件和客户端read/write事件;
ChannelInitializer是一个抽象类,不能直接使用,需要重写initChannel方法。
ChannelInitializer的使用场景:
- 在ServerBootstrap初始化时,为监听端口accept事件的Channel添加ServerBootstrapAcceptor;
- 在有新链接进入时,为监听客户端read/write事件的Channel添加用户自定义的ChannelHandler;
// 网络连接最大的空闲时间,120s,超过2分钟没有进行通信,就直接断开一个长连接
private int serverChannelMaxIdleTimeSeconds = 120;
// 真正的在这里创建netty server
ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServersocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog()) // tcp三次握手的accept队列长度
.option(ChannelOption.SO_REUSEADDR, true) // server socket channel unbind端口监听了以后,还处于延迟unbind状态,重新启动允许我们可以立马监听这个端口
.option(ChannelOption.SO_KEEPALIVE, false) // 是否自动发送探测包探测网络连接是否还存活
.childOption(ChannelOption.TCP_NODELAY, true) // nodelay,禁止打包传输,避免通信有延迟
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder, // 编码器
new NettyDecoder(), // 解码器
// 空闲探测
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
规定在120秒之内,没有发生read或write事件的时候,就会触发fireUserEventTriggered,触发用户自定义事件的执行(关闭链接)。
三、源码分析导读- 分析IdleStateHandler构造函数,有那些参数;
- 内部是如何进行定时调度的;
- 如果120秒之内没有read或write事件,如何通知RouteInfoManager路由管理组件关闭链接;
- IdleStateHandler如何在发生read或write事件的时候如何更新最近一次读或写时间;
public IdleStateHandler(boolean observeOutput,
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
if (unit == null) {
throw new NullPointerException("unit");
}
this.observeOutput = observeOutput;
//读空闲时间
if (readerIdleTime <= 0) {
readerIdleTimeNanos = 0;
} else {
readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
}
//写空闲时间
if (writerIdleTime <= 0) {
writerIdleTimeNanos = 0;
} else {
writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
}
//读或写空闲时间
if (allIdleTime <= 0) {
allIdleTimeNanos = 0;
} else {
allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
}
}
- readerIdleTime:定义读空闲时间,在这个时间间隔内如果链路没有发生读事件,会触发定时任务的执行。
- writerIdleTime:定义写空闲时间,在这个时间间隔内如果链路没有发生写事件,也会触发定时任务的执行。
- allIdleTime:定义读或写空闲时间,在这个时间间隔内如果链路既没有发生读事件也没有发生写事件,会触发定时任务的执行。
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
// channelActive() event has been fired already, which means this.channelActive() will
// not be invoked. We have to initialize here instead.
initialize(ctx);
} else {
// channelActive() event has not been fired yet. this.channelActive() will be invoked
// and initialization will occur there.
}
}
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/Netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
//当前channel所在线程池也是一个周期性任务线程池
return ctx.executor().schedule(task, delay, unit);
}
IdleStateHandler也是被作为一个Handle添加到ChannelPipeline中,当其添加成功的时候就会触发hadnlerAdded事件:他会判断当前channel链路是否已生效并且已经注册到selector监听器上,此时就会执行initialize函数。
- state:作为IdleStateHandler的状态字段,0 - none, 1 - initialized, 2 - destroyed,已初始化或被销毁就不再执行初始化方法;
- lastReaTime:当前时间作为最近一次读或写时间;
- 如果定义了读或写或者读写空闲时间的时候,就开启对应的IdleTimeoutTask周期性超时任务;
- ReaderIdleTimeoutTask是以lastReadTime最近一次读请求来确保链接有效性;
- WriterIdleTimeoutTask是以lastWriteTime最近一次写请求来确保链接有效性;
- AllIdleTimeoutTask是取lastReadTime和lastWriteTime最大的一个时间来确保链接有效性;Math.max(lastReadTime, lastWriteTime);
这里拿ReaderIdleTimeoutTask来作为例子进行分析,其它的类似:
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
//构造IdleStateHandler时定义的读空闲时间间隔
long nextDelay = readerIdleTimeNanos;
//链路是否处于读事件中,有读事件的时候该值为true
if (!reading) {
//下一次周期任务等待时间 = 读空闲定义时间 - (当前时间-最近一次读事件时间)
nextDelay -= ticksInNanos() - lastReadTime;
}
//nextDelay小于0,说明周期性定时任务该出发执行了
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
//这里生成一个读空闲事件
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
//触发用户自定义事件
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
//IdleStateHandler监听到读事件的时候只是会把reading置为true,然后将读事件向下传播
reading = true;
firstReaderIdleEvent = firstAllIdleEvent = true;
}
ctx.fireChannelRead(msg);
}
在给定的时间内链路一直处于读空闲状态的时候,定时任务检测到了之后会触发fireUserEventTriggered事件,这个时候可以通过自定义一个Handler来实现fireUserEventTriggered方法,来对链路读空闲的处理,服务端可以选择关闭链路,客户端可以选择发送一个ping报文给服务端。
3、通知RouteInfoManager路由管理组件关闭链接,释放资源io.netty.channel.AbstractChannelHandlerContext#fireUserEventTriggered
public ChannelHandlerContext fireUserEventTriggered(final Object event) {
invokeUserEventTriggered(findContextInbound(MASK_USER_EVENT_TRIGGERED), event);
return this;
}
static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
ObjectUtil.checkNotNull(event, "event");
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeUserEventTriggered(event);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeUserEventTriggered(event);
}
});
}
}
这里就调用NettyRemotingServer中的网络连接管理组件NettyConnectManageHandler的invokeUserEventTriggered方法进行处理
private void invokeUserEventTriggered(Object event) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).userEventTriggered(this, event);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireUserEventTriggered(event);
}
}
这个释放里面会去调用NettyRemotingServer的putNettyEvent方法
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
RemotingUtil.closeChannel(ctx.channel());
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(
new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel())
);
}
}
}
ctx.fireUserEventTriggered(evt);
}
NettyRemotingServer直接放入内部的线程中进行处理
public void putNettyEvent(final NettyEvent event) {
this.nettyEventExecutor.putNettyEvent(event);
}
因为之前传递过来的是一个IDLE空置事件,所以会走onChannelIdle方法;
class NettyEventExecutor extends ServiceThread {
private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
private final int maxSize = 10000;
// 线程在运行的过程中,如果发生了跟broker网络连接有事件,put netty event放到队列里来
public void putNettyEvent(final NettyEvent event) {
int currentSize = this.eventQueue.size();
if (currentSize <= maxSize) {
this.eventQueue.add(event);
} else {
log.warn("event queue size [{}] over the limit [{}], so drop this event {}", currentSize, maxSize, event.toString());
}
}
@Override
public void run() {
log.info(this.getServiceName() " service started");
final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
while (!this.isStopped()) {
try {
NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
if (event != null && listener != null) {
switch (event.getType()) {
case IDLE: // 空置
listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
break;
case CLOSE:
listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
break;
case CONNECT:
listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
break;
case EXCEPTION:
listener.onChannelException(event.getRemoteAddr(), event.getChannel());
break;
default:
break;
}
}
} catch (Exception e) {
log.warn(this.getServiceName() " service has exception. ", e);
}
}
log.info(this.getServiceName() " service end");
}
@Override
public String getServiceName() {
return NettyEventExecutor.class.getSimpleName();
}
}
org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService#onChannelIdle
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
直接调用到RouteInfoManager的onChannelDestroy方法,从路由表中将其移除;
public void onChannelDestroy(String remoteAddr, Channel channel) {
String brokerAddrFound = null;
if (channel != null) {
try {
try {
// 获取读锁
this.lock.readLock().lockInterruptibly();
Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
this.brokerLiveTable.entrySet().iterator();
// 遍历brokerLiveTable
while (itBrokerLiveTable.hasNext()) {
Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
// 获取使用该channel的brokerAddr
if (entry.getValue().getChannel() == channel) {
brokerAddrFound = entry.getKey();
break;
}
}
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
// channel为空或者没有使用该channel的Broker
if (null == brokerAddrFound) {
brokerAddrFound = remoteAddr;
} else {
log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
}
if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
try {
try {
// 申请写锁
this.lock.writeLock().lockInterruptibly();
// 根据brokerAddr从brokerLiveTable、filterServerTable中移除Broker相关的信息
this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);
String brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
this.brokerAddrTable.entrySet().iterator();
// 遍历 brokerAddrTable
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
BrokerData brokerData = itBrokerAddrTable.next().getValue();
Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> entry = it.next();
Long brokerId = entry.getKey();
String brokerAddr = entry.getValue();
// 移除该 brokerAddr的信息
if (brokerAddr.equals(brokerAddrFound)) {
brokerNameFound = brokerData.getBrokerName();
it.remove();
log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
brokerId, brokerAddr);
break;
}
}
if (brokerData.getBrokerAddrs().isEmpty()) {
removeBrokerName = true;
itBrokerAddrTable.remove();
log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
brokerData.getBrokerName());
}
}
if (brokerNameFound != null && removeBrokerName) {
Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
// 遍历 clusterAddrTable
while (it.hasNext()) {
Entry<String, Set<String>> entry = it.next();
String clusterName = entry.getKey();
Set<String> brokerNames = entry.getValue();
boolean removed = brokerNames.remove(brokerNameFound);
if (removed) {
log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
brokerNameFound, clusterName);
// 成功移除Broker之后,集群不包含任何Broker,则将该集群从clusterAddrTable中移除
if (brokerNames.isEmpty()) {
log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
clusterName);
it.remove();
}
break;
}
}
}
if (removeBrokerName) {
// 遍历 topicQueueTable
Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
this.topicQueueTable.entrySet().iterator();
while (itTopicQueueTable.hasNext()) {
Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
String topic = entry.getKey();
List<QueueData> queueDataList = entry.getValue();
Iterator<QueueData> itQueueData = queueDataList.iterator();
while (itQueueData.hasNext()) {
QueueData queueData = itQueueData.next();
if (queueData.getBrokerName().equals(brokerNameFound)) {
itQueueData.remove();
log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
topic, queueData);
}
}
// 如果队列已经为空,移除该Topic
if (queueDataList.isEmpty()) {
itTopicQueueTable.remove();
log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
topic);
}
}
}
} finally {
// 释放写锁
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
}
1、读请求发生更新最近一次读时间;
- 发送读请求时调用channelRead方法将reading设置为true,说明当前有读请求发生;
- 读请求完成调用channelReadComplete方法将lastReadTime最近一次读时间更新,将reading设置为false;
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
reading = true;
firstReaderIdleEvent = firstAllIdleEvent = true;
}
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
lastReadTime = ticksInNanos();
reading = false;
}
ctx.fireChannelReadComplete();
}
2、写请求发生更新最近一次写时间;
对其添加一个监听器,写请求完成时回调该监听器完成最近一次写时间更新;
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// Allow writing with void promise if handler is only configured for read timeout events.
if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
ctx.write(msg, promise.unvoid()).addListener(writeListener);
} else {
ctx.write(msg, promise);
}
}
private final ChannelFutureListener writeListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
lastWriteTime = ticksInNanos();
firstWriterIdleEvent = firstAllIdleEvent = true;
}
};
RocketMQ心跳机制通过IdleStateHandler空闲探测实现,如果超过120秒则通过NettyConnectManageHandler连接管理组件调用RouteInfoManager的onChannelDestroy方法,从路由表中将其移除;
,免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com