怎么设置netty缓冲区大小(NettyChannelOutboundBuffer源码分析)
上一篇文章对RecvByteBufAllocator的源码进行了分析,介绍了Netty是如何接收对端发送过来的数据。以及Netty是如何通过AdaptiveRecvByteBufAllocator来自适应调整ByteBuf的动态分配,解决Java ByteBuffer分配过大浪费内存,分配过小又需要频繁扩容的问题。
本篇文章会分析,Netty是如何将数据发送出去的。
前置知识Netty支持的数据传输类型
首先你需要知道,通过Netty来发送数据,它只支持两种数据类型:ByteBuf和FileRegion。前者可以看作是ByteBuffer,普通的字节数据传输。而后者是文件传输,Netty通过FileRegion来实现文件传输的零拷贝。
write和flush
write()并不会发送数据,只是简单的将数据暂存到ChannelOutboundBuffer。
flush()才是真正的将数据通过Socket传输给对端。
writeAndFlush()只是简单的执行以上两个方法而已。
Channel高低水位线
当程序write了大量数据,或者虽然调用了flush(),但是由于对端来不及接收数据,再或者由于网络原因等等情况,导致TCP缓冲区被写满,大量的消息积压在ChannelOutboundBuffer,导致内存溢出。为了保护你的程序,Netty给Channel设置了「高低水位线」,当积压的消息超过了高水位,Netty会将Channel设为「不可写」状态并触发channelWritabilityChanged回调,你可以通过Channel.isWritable()判断是否要继续写数据。
通过ChannelConfig.setWriteBufferHighWaterMark()和ChannelConfig.setWriteBufferLowWaterMark()设置Channel的高低水位线。
订阅OP_WRITE事件
既然write()操作是用户自己发起的,为啥还要订阅Channel的OP_WRITE事件呢?
因为TCP缓冲区可能被写满,此时你就应该订阅OP_WRITE事件,暂时放弃写操作,等待Selector通知你Channel可写时,你再继续写。
ByteBuf可以转换成ByteBuffer
Java原生的SocketChannel只支持写入ByteBuffer,当你通过Netty写入ByteBuf时,它会将ByteBuf转换成ByteBuffer再写入,方法是ByteBuf.internalNioBuffer()。
清楚Java对象在JVM中的内存布局
write(msg)时,会将msg包装成Entry节点加入到链尾,其中一个属性pendingSize记录着消息占用的内存空间,这个空间大小除了msg数据本身占用的空间外,还包含Entry对象占用的空间,因此默认会额外再加上96。为啥是96后面会说明,首先你应该知道对象的对象头最大占用16字节,对象引用最少占用4字节,最多占用8字节,一个long类型占用8字节,int类型占用4字节,boolean类型占用1字节。另外JVM要求Java对象占用的空间必须是8字节的整数倍,因此还会有padding填充字节。
ChannelhandlerContext.writeAndFlush()分析如下,分别是发送一个ByteBuf和FileRegion的简单示例:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 发送一个 hello
ctx.writeAndFlush(Unpooled.wrappedBuffer("hello".getBytes()));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 传输一个 a.txt 文件
RandomAccessFile accessFile = new RandomAccessFile("/disk/a.txt", "r");
DefaultFileRegion region = new DefaultFileRegion(accessFile.getChannel(), 0, accessFile.length());
ctx.writeAndFlush(region);
}
我先说一下writeAndFlush的整体流程,实际的发送细节下一节会解释。
调用ctx.writeAndFlush(),会从当前Handler往前找能处理write事件的Handler,如果调用的是ctx.channel().writeAndFlush(),则会从Pipeline的TailContext开始向前找能处理write事件的Handler,事件传播的路径稍微有点区别。默认情况下,会找到HeadContext来处理,源码如下:
private void write(Object msg, boolean flush, ChannelPromise promise) {
// 确保发送的消息不为空
ObjectUtil.checkNotNull(msg, "msg");
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
// 往后找能处理 write事件的Channel,默认会找到HeadContext。
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
// 如果是EventLoop线程,则直接执行,否则提交一个任务串行化执行。
if (executor.inEventLoop()) {
if (flush) {
// 调用的是writeAndFlush(),所有flush为true,这里会调用HeadContext.invokeWriteAndFlush()
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
task.cancel();
}
}
}
找到HeadContext后,调用其invokeWriteAndFlush()方法,其实就是将write和flush放在一个方法里调用了:
void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
// 先通过handler调用write()
invokeWrite0(msg, promise);
// 再通过handler调用flush()
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
先看invokeWrite0(),它会调用HeadContext.write(),由于write操作需要和JDK的底层API交互,于是操作又会被转交给Channel.Unsafe执行:
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// 需要和JDK底层API交互,转交给Unsafe执行。
unsafe.write(msg, promise);
}
接下来会调用AbstractChannel.AbstractUnsafe.write()方法,它首先会对发送的数据做过滤,只支持ByteBuf和FileRegion两种类型。然后会计算发送的数据占用的内存大小,因为前面说过积压的消息一旦超过Channel的高水位线会将Channel设为「不可写」状态,防止内存溢出。这两步做完以后,会把消息添加到输出缓冲区ChannelOutboundBuffer中。
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {// outboundBuffer会随着Channel一同被创建,一般不会为null,这里做了校验。
try {
ReferenceCountUtil.release(msg);
} finally {
safeSetFailure(promise,
newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
}
return;
}
int size;
try {
// 过滤写出消息,确保是ByteBuf或FileRegion,其他对象不支持写出。
msg = filterOutboundMessage(msg);
/*
估算消息占用的内存,作用:
因为write()不会把消息写出到Socket,会暂存在内存里,直到flush()。
Netty为了防止消息堆积,会设置高低水位,消息暂存的总量达到最高水位会将Channel设置不可写状态,
以保护你的程序,避免内存溢出。
详见:io.netty.channel.DefaultMessageSizeEstimator.HandleImpl.size()
对于FileRegion,会直接返回0,因为使用了零拷贝技术,不需要把文件读取到JVM进程。
*/
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
try {
ReferenceCountUtil.release(msg);
} finally {
safeSetFailure(promise, t);
}
return;
}
// write()只会把消息暂存在outboundBuffer,不会真正发送。
outboundBuffer.addMessage(msg, size, promise);
}
关注一下filterOutboundMessage(),它除了过滤消息,还会试图将HeapByteBuf转换成DirectByteBuf。Netty为了提升数据发送的效率,和Socket直接读写的数据会使用直接内存,避免IO操作再发生内存拷贝。
// 过滤出站消息,只支持写出ByteBuf和FileRegion。
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
// 为了避免内存复制,Socket直接读写的数据都使用堆外内存
return newDirectBuffer(buf);
}
// 文件传输
if (msg instanceof FileRegion) {
return msg;
}
// 不支持的数据类型,抛异常
throw new UnsupportedOperationException(
"unsupported message type: " StringUtil.simpleClassName(msg) EXPECTED_TYPES);
}
newDirectBuffer()并不保证一定转换成功,如果使用的ByteBufAllocator是未池化的,且没有开启io.netty.ThreadLocalDirectBufferSize,那么就意味着Netty需要申请一个没有被池化的DirectByteBuf,这个操作是非常昂贵的,Netty会放弃转换:
// 试图将HeapByteBuf转换成DirectByteBuf,如果转换的开销很大会放弃。
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
// 可读字节数为0,直接释放并返回共享空对象。
ReferenceCountUtil.safeRelease(buf);
return Unpooled.EMPTY_BUFFER;
}
// 获取Channel绑定的ByteBufAllocator
final ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {// 分配器是否是池化的,且能分配直接内存?
// 创建一个指定大小的直接内存ByteBuf,将数据写入,原buf释放
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
/*
如果设置了io.netty.threadLocalDirectBufferSize,Netty会在线程的FastThreadLocal中通过Stack实现一个轻量级的
ByteBuf对象池,ByteBuf写出到Socket后,会自动释放,这里会将它再push到线程绑定的Stack中进行重用。
*/
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
if (directBuf != null) {
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
// 申请一个未池化的直接内存ByteBuf开销是很大的,测试过,比堆内存的申请慢10倍都不止,这里会直接放弃。
return buf;
}
如果设置了io.netty.threadLocalDirectBufferSize,Netty会为每个线程创建指定数量的ByteBuf对象缓存,这些ByteBuf是可以被重用的。实现逻辑是Netty会在FastThreadLocal存放一个Stack,需要时pop()一个出来,用完时push()归还。
再来关注一下MessageSizeEstimator,它负责计算待发送数据占用的内存,逻辑很简单,对于FileRegion会返回0,因为FileRegion传输文件时使用了零拷贝技术,直接使用mmap内存映射,而不需要将文件加载到JVM进程,实现直接看io.netty.channel.DefaultMessageSizeEstimator.HandleImpl.size():
// 估算消息的内存占用,逻辑还是很简单的。
@Override
public int size(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
// FileRegion实现了零拷贝,并不需要将文件加载到JVM,因此占用的内存为0,不影响Channel水位线。
if (msg instanceof FileRegion) {
return 0;
}
return unknownSize;
}
关于ChannelOutboundBuffer代码下节会详细分析,这里只需要知道write()只会将数据暂存到ChannelOutboundBuffer,而不会真正发送就行了。
消息存入ChannelOutboundBuffer,write操作就算完成了。紧接着会调用invokeFlush0(),它依然会转交给Unsafe执行,调用AbstractChannel.AbstractUnsafe.flush()。
它会做两件事:先把ChannelOutboundBuffer中待发送的Entry标记为flushed,然后将要发送的Entry数据转换成Java的ByteBuffer,使用SocketChannel进行真正的数据发送。
@Override
public final void flush() {
assertEventLoop();
// 得到SocketChannel绑定的ChannelOutboundBuffer
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
// 先将unflushed节点标记为flushed
outboundBuffer.addFlush();
// 开始发送数据
flush0();
}
flush0()会开始发送数据,它首先会检测Channel是否活跃,如果是非活跃状态,此次flush()操作将失败,Entry会被移除。如果Channel正常,会调用doWrite()进行数据发送。
protected void flush0() {
if (inFlush0) {// 避免上一次flush0()还没执行完时,又触发了
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {// 非空校验
return;
}
inFlush0 = true;
// 如果连接已经失活了。
if (!isActive()) {
try {
if (!outboundBuffer.isEmpty()) {
if (isOpen()) {
/*
通道是打开的,稍后可能会被激活。
1.释放msg
2.触发失败通知
3.回收Entry
4.递减消息挂起的字节数
*/
outboundBuffer.failFlushed(new NotYetConnectedException(), true);
} else {
/*
道都被关闭了,和上面的处理流程类似,只是不用通过触发channelWritabilityChanged()回调了。
*/
outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
}
}
} finally {
inFlush0 = false;
}
return;
}
try {
// 连接正常,执行真正的write()操作
doWrite(outboundBuffer);
} catch (Throwable t) {
handleWriteError(t);
} finally {
inFlush0 = false;
}
}
doWrite()是数据发送的核心,由子类实现,这里直接看NioSocketChannel.doWrite()。它会获取Java原生的SocketChannel,将队列中待发送的ByteBuf转换成ByteBuffer,然后循环发送数据。
单次循环发送的数据量受以下两个条件限制:
- ByteBuffer的数量限制。
- TCP参数设置的缓冲区的大小限制(ChannelOption.SO_SNDBUF)。
如果ChannelOutboundBuffer积压了大量的数据,单次可能无法发送完,因此会默认循环发16次。循环次数过多可能会阻塞IO线程,导致其他Channel的事件得不到处理。
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
// 循环写的次数,默认16次。可能有大量消息积压在输出缓冲区,同时为了避免阻塞IO线程,做了次数限制。
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {// 没有数据要写出了
// 取消监听 OP_WRITE事件
clearOpWrite();
return;
}
// 发送缓冲区的最大值,由 TCP参数: ChannelOption.SO_SNDBUF 设置。
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
/*
将要flush的Entry转换成Java原生的ByteBuffer数组,因为做了总数和总字节数的限制,所以一次可能无法send所有数据。
注意,这里只会处理ByteBuf,不会处理FileRegion。
*/
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
// 其实就是 nioBuffers.length,上一步方法中会进行设置
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
// ByteBuf处理完了,但是可能还有FileRegion需要处理。
writeSpinCount -= doWrite0(in);
break;
case 1: {
// 只有单个ByteBuf需要发送的情况
ByteBuffer buffer = nioBuffers[0];
// 尝试发送的字节数
int attemptedBytes = buffer.remaining();
// Java原生的SocketChannel.wrote(ByteBuffer)发送数据
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {// TCP缓冲区满,订阅OP_WRITE事件,等待可写时再继续处理
incompleteWrite(true);
return;
}
// 动态调整 发送缓冲区大小
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
// 删除已经发送的Entry节点
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
// 发送缓冲区有多个ByteBuf待发送
// 尝试发送的字节总数
long attemptedBytes = in.nioBufferSize();
// 调用Java原生的SocketChannel.write()进行数据的发送,返回值是实际发送的字节数
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
// 写入字节为0,可能是TCP缓冲区满了,订阅OP_WRITE事件,等待TCP可写时再执行。
incompleteWrite(true);
return;
}
// 根据本次实际写入的字节数,动态调整发送缓冲区:ChannelOption.SO_SNDBUF
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
/*
删除已经发送的数据,是根据实际写入的字节数去删除的,而不是根据ByteBuf的数量。
从flushedEntry开始,计算每个ByteBuf的大小,按个删除。
可能存在某个ByteBuf发送部分数据的情况,会调整它的readerIndex。
*/
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
/*
只有在 nioBufferCnt处理完了,调用doWrite0(in)处理FileRegion,且没处理完毕时,才可能走到这里。
如果FileRegion没有处理完,writeSpinCount会小于0,这里会继续订阅OP_WRITE事件,等待Channel可写时继续处理。
*/
incompleteWrite(writeSpinCount < 0);
}
此外,NioSocketChannel.doWrite()只会发送ByteBuf,FileRegion的发送需要调用父类的AbstractNioByteChannel.doWrite0()处理。
/*
NioSocketChannel只负责发送ByteBuf,
FileRegion的发送这边会处理。
*/
protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
Object msg = in.current();
if (msg == null) {
// Directly return here so incompleteWrite(...) is not called.
return 0;
}
// 数据发送
return doWriteInternal(in, in.current());
}
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.isReadable()) {
// 没有数据可读,直接删除节点
in.remove();
return 0;
}
// Java底层write()
final int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (!buf.isReadable()) {
in.remove();
}
return 1;
}
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
// 已经传输的字节数 >= 字节总数,代表文件已经传输完毕,删除节点。
if (region.transferred() >= region.count()) {
in.remove();
return 0;
}
// 调用region.transferTo(javaChannel(), position)文件传输
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount > 0) {// 实际发送的字节数
in.progress(localFlushedAmount);
if (region.transferred() >= region.count()) {//FileRegion发送完毕,移除节点
in.remove();
}
return 1;
}
} else {
// Should not reach here.
throw new Error();
}
/*
一般不会走到这里,可能是数据没有处理完毕,返回一个Integer.MAX_VALUE,
让writeSpinCount小于0,这样它就会订阅OP_WRITE事件,等待Channel可写时继续处理。
*/
return WRITE_STATUS_SNDBUF_FULL;
}
需要注意的是,flush()操作可能存在两种情况:
- 数据正常发送完毕。
- 数据没有发完,就已经超过了最大循环次数,为了不阻塞IO线程,下次再处理。
- TCP缓冲区满,数据无法发送。
对于后面两种情况,都属于「不完整写入」,因此会调用incompleteWrite(setOpWrite)稍后继续处理。
针对第三种情况,Netty需要订阅OP_WRITE事件,等待Selector通知Channel可写时继续发送数据。setOpWrite参数代表是否要监听OP_WRITE事件:
/**
* 不完整写入
* @param setOpWrite 是否要订阅OP_WRITE事件
*/
protected final void incompleteWrite(boolean setOpWrite) {
// setOpWrite为true,一般都是TCP缓冲区满了,此时需要订阅OP_WRITE事件,等待Channel可写时再继续处理。
if (setOpWrite) {
// 订阅OP_WRITE事件
setOpWrite();
} else {
// 取消订阅OP_WRITE事件
clearOpWrite();
// 提交一个flush任务,稍后执行,避免阻塞IO线程。
eventLoop().execute(flushTask);
}
}
至此,writeAndFlush()的整个流程就处理完了,对于ChannelOutboundBuffer本节没有进行分析,看下节。
ChannelOutboundBuffer源码分析ChannelOutboundBuffer是Netty的数据发送缓冲区,它跟随SocketChannel一同被创建。
先看属性:
/*
将ByteBuf包装成Entry时,额外占用的字节大小,因为除了ByteBuf本身的数据外,Entry对象也占用空间。
为啥是96?为啥还支持修改??
1.96是Netty根据64位的JVM计算的最大值。
2.如果你的程序运行在32位的JVM中,或者对象引用开启了压缩,你可以根据实际情况修改这个值。
分析为啥最多会占用96字节:
在64位的JVM中,一个Entry对象占用以下空间:
- 16字节的对象头空间:8字节Mark Word,最大8字节Klass Pointer
- 6个对象引用属性,最小4*6=24字节,最大8*6=48字节
- 2个long属性,2*8=16字节
- 2个int属性,2*4=8字节
- 1个boolean属性,1字节
- padding对齐填充,JVM要求对象占用的内存为8字节的整数倍,这里是7字节
合计最多为96字节.
*/
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
/*
发送数据时,需要将待发送的ByteBuf转换成ByteBuffer,考虑到write是个很频繁的操作,
为了避免频繁创建数组,这里进行了复用,每个线程会复用自己的ByteBuffer[]。
*/
private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
@Override
protected ByteBuffer[] initialValue() throws Exception {
// 默认大小为1024,后面有必要还会扩容
return new ByteBuffer[1024];
}
};
// 绑定的SocketChannel
private final Channel channel;
// 已经flush,等待发送的头节点。
private Entry flushedEntry;
// 已经write但是没flush的头节点,flush()时会通过它一直往后找
private Entry unflushedEntry;
// 链尾节点
private Entry tailEntry;
// flush的节点数量,发送数据时会从flushedEntry开始往后找flushed个节点。
private int flushed;
// 单次循环写的Nio Buffer数量
private int nioBufferCount;
// 单次循环写的Nio Buffer总大小
private long nioBufferSize;
// flush是否失败
private boolean inFail;
// 计算totalPendingSize属性的偏移量,通过CAS的方式来做修改。
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
// 输出缓冲区暂存的消息占用的总内存,通过该值判断是否达到高低水位,以修改Channel的可写状态。
@SuppressWarnings("UnusedDeclaration")
private volatile long totalPendingSize;
// unwritable属性的偏移量,通过CAS的方式来修改。
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
// Channel是否可写的状态,0可写,1不可写。输出缓冲区内存达到高低水位线时修改。
@SuppressWarnings("UnusedDeclaration")
private volatile int unwritable;
// Channel可写状态变更时触发的任务,消息的积压达到高低水位线时触发
private volatile Runnable fireChannelWritabilityChangedTask;
它本身是一个单向链表,由一系列Entry节点组成。它有三个节点指针:
- flushedEntry:已经flush,等待被发送的起始节点指针。
- unflushedEntry:已经write,等待flush的起始节点指针。
- tailEntry:链尾指针。
笔者花了一个简图,来表示它是如何工作的:
上节说过,执行flush(msg)操作时,只是把数据暂存到ChannelOutboundBuffer,核心方法是addMessage(),它主要做了两件事:
- 将msg封装成Entry节点,加入到链尾。
- 统计输出缓冲区的消息总字节数是否达到高水位线,如果达到则将Channel设为「不可写」状态,且触发ChannelWritabilityChanged回调。
/**
* 将消息暂存到ChannelOutboundBuffer,暂存成功promise就会收到通知。
* @param msg 待发送的数据:ByteBuf/FileRegion
* @param size 数据占用的内存大小
* @param promise write成功会收到通知
*/
public void addMessage(Object msg, int size, ChannelPromise promise) {
// 将msg包装成一个Entry,并加入到链尾。
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
// tail不为空,则添加到它的next
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;// tailEntry指向新添加的节点
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// 统计消息挂起的字节数,如果超过高水位线,需要修改Channel的可写状态并触发回调
incrementPendingOutboundBytes(entry.pendingSize, false);
}
先看Entry.newInstance(),它会将msg封装成Entry节点,加入到链尾。
Entry有个属性pendingSize用来记录消息占用的内存空间,需要注意的是,它除了msg本身的数据空间,还会加上Entry对象占用的空间,一个Java对象占用多少空间是在编译期就确定下来的,除了属性占用的空间外,读者还需要了解Java对象的内存布局。
/**
* 创建一个Entry节点,从对象池中取一个
* @param msg 消息本身
* @param size 由MessageSizeEstimator估算出消息占用的内存大小
* @param total 消息本身的大小(区别是FileRegion的处理)
* @param promise write()完成会收到通知
* @return
*/
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
/*
每次write()都需要一个Entry,考虑到write()是一个非常频繁的操作,
为了避免Entry的频繁创建和销毁,这里做了对象池的重用处理。
*/
Entry entry = RECYCLER.get();
entry.msg = msg;
/*
占用的内存为什么还要加上CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD???
除了ByteBuf占用的空间外,Entry本身也占用空间啊。
在64位的JVM中:
- 16字节的对象头空间:8字节Mark Word,最大8字节Klass Pointer
- 6个对象引用属性,最小4*6=24字节,最大8*6=48字节
- 2个long属性,2*8=16字节
- 2个int属性,2*4=8字节
- 1个boolean属性,1字节
- padding对齐填充,JVM要求对象占用的内存为8字节的整数倍,这里是7字节
合计最多为96字节,因此CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD的默认值即使96。
*/
entry.pendingSize = size CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
entry.total = total;
entry.promise = promise;
return entry;
}
Entry加入链表后,incrementPendingOutboundBytes()会累加字节总数,判断是否超过高水位线:
/**
* 统计消息挂起的字节数,如果超过高水位线,需要修改Channel的可写状态并触发回调
* @param size 消息占用内存的字节数
* @param invokeLater 是否稍后触发再回调
*/
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
// 统计消息暂存的内存大小,CAS的方式进行累加
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
// 暂存消息达到高水位线,消息积压了
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
// 修改可写状态,触发回调
setUnwritable(invokeLater);
}
}
setUnwritable()会在数据总字节数超过高水位线时触发,它会通过自旋 CAS的方式将unwritable从0改为1,然后触发回调:
// 将Channel设为不可写,CAS执行
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0) {
// CAS操作成功,触发回调
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
ChannelOutboundBuffer为write操作所做的工作就这么多了,下面看flush。
执行flush操作时,首先会调用outboundBuffer.addFlush()将unflushed节点标记为flushed,其实就是移动flushedEntry和unflushedEntry指针,这个过程会检查Entry节点是否被取消,如果取消了会跳过节点,同时会递减该Entry占用的内存空间。
// 只是将节点标记为flushed,并没有真正发送数据,会跳过已经被取消的节点。
public void addFlush() {
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
flushedEntry = entry;
}
do {
flushed ;
// 将entry的promise设为 不可取消 状态
if (!entry.promise.setUncancellable()) {
// 设置失败,说明promise已经取消,需要释放消息,并递减挂起的字节数
int pending = entry.cancel();
// 递减缓冲区的消息字节总数,如果达到低水位线,则将Channel重新设为「可写」状态,并触发回调
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);// 不断往后找 待flush的节点
// 所有的节点都flush了,置为空
unflushedEntry = null;
}
}
节点状态标记完成后,会调用doWrite()开始写数据。首先它需要ChannelOutboundBuffer将flushed节点转换成Java原生的ByteBuffer,方法是nioBuffers()。因为OS对SocketChannel.write()单次发送的字节数有限制,一般是Integer.MAX_VALUE,所以单次转换需要提供两个参数:
- maxCount:转换ByteBuffer的最大数量,默认是1024。
- maxBytes:最大字节数,默认是设置的TCP发送缓冲区大小(ChannelOption.SO_SNDBUF)。
/**
* 将要flush的Entry转换成Java原生的ByteBuffer数组,因为做了总数和总字节数的限制,所以一次可能无法send所有数据。
* @param maxCount 单次发送的ByteBuffer最大数量
* @param maxBytes 发送缓冲区的最大值,由 TCP参数: ChannelOption.SO_SNDBUF 设置。
* @return
*/
public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
assert maxCount > 0;
assert maxBytes > 0;
long nioBufferSize = 0;
int nioBufferCount = 0;
// 由于write操作很频繁,避免ByteBuffer[]频繁创建和销毁,这里进行了复用,每个线程都有一个ByteBuffer[1024]
final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
Entry entry = flushedEntry;
// 确保Entry节点是flushed,且msg是ByteBuf类型
while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
// 确保节点没有被取消,如果取消了,则跳过它。
if (!entry.cancelled) {
ByteBuf buf = (ByteBuf) entry.msg;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
// 可读字节数就是要写出的字节数,确保大于0
if (readableBytes > 0) {
if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
// 发送的数据超过了maxBytes,退出循环
break;
}
nioBufferSize = readableBytes;
int count = entry.count;
if (count == -1) {
// -1代表没有设置ByteBuf的nioBufferCount,ByteBuf中ByteBuffer的数量
entry.count = count = buf.nioBufferCount();
}
// 是否需要更多的空间
int neededSpace = min(maxCount, nioBufferCount count);
// 如果ByteBuffer的数量超过了默认值1024,就去申请更多的空间
if (neededSpace > nioBuffers.length) {
// 成倍扩容,直到数组长度足够。并塞回到FastThreadLocal。
nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
NIO_BUFFERS.set(threadLocalMap, nioBuffers);
}
if (count == 1) {
ByteBuffer nioBuf = entry.buf;
if (nioBuf == null) {
// 将ByteBuf转ByteBuffer,且缓存到Entry中
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
// 设置ByteBuffer
nioBuffers[nioBufferCount ] = nioBuf;
} else {
// 一个ByteBuf包含多个ByteBuffer的情况处理,循环遍历设置
nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
}
if (nioBufferCount >= maxCount) {
// ByteBuffer的数量超过了maxCount,退出循环
break;
}
}
}
entry = entry.next;
}
this.nioBufferCount = nioBufferCount;
this.nioBufferSize = nioBufferSize;
return nioBuffers;
}
这里关注一下NIO_BUFFERS属性,它是一个FastThreadLocal,每个线程都有自己的ByteBuffer[]缓存,默认长度1024,可以被复用。
这里为什么要复用呢?因为作为一个网络IO框架,flush肯定是一个非常频繁的操作,为了避免每次都创建ByteBuffer[],复用可以提升系统性能,减轻GC的压力。
如果一个ByteBuf由很多个ByteBuffer组成,默认的1024个ByteBuffer可能不够用,此时会调用expandNioBufferArray()进行扩容:
// 对array进行扩容
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
int newCapacity = array.length;
do {
// 成倍扩容
newCapacity <<= 1;
if (newCapacity < 0) {// int溢出
throw new IllegalStateException();
}
} while (neededSpace > newCapacity);
ByteBuffer[] newArray = new ByteBuffer[newCapacity];
// 元素迁移
System.arraycopy(array, 0, newArray, 0, size);
return newArray;
}
将待发送的ByteBuf转换成ByteBuffer后,NioSocketChannel会调用JDK底层的SocketChannel.write()进行真正的数据发送。
数据发送完毕后,需要移除ChannelOutboundBuffer中的节点。节点的添加是从链尾开始,移除则是从链头开始的。
ChannelOutboundBuffer是根据实际发送的字节数来移除节点的,因此会存在某个ByteBuf只发送了部分数据的情况,如果某个ByteBuf数据没有发送完,那么该节点并不会被移除,只会调整它的readerIndex索引,下次继续发送剩余数据。
/**
* 根据写入到TCP缓冲区的字节数来移除ByteBuf。
* @param writtenBytes
*/
public void removeBytes(long writtenBytes) {
for (;;) {
// 从flushedEntry开始计算
Object msg = current();
if (!(msg instanceof ByteBuf)) {
assert writtenBytes == 0;
break;
}
final ByteBuf buf = (ByteBuf) msg;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
// 如果单个ByteBuf的数据 <= writtenBytes,则直接移除Entry节点
if (readableBytes <= writtenBytes) {
if (writtenBytes != 0) {
progress(readableBytes);
writtenBytes -= readableBytes;
}
remove();
} else { // readableBytes > writtenBytes
// 存在某个ByteBuf发送部分数据的情况,调整它的readerIndex,下次继续发送。
if (writtenBytes != 0) {
buf.readerIndex(readerIndex (int) writtenBytes);
progress(writtenBytes);
}
break;
}
}
// 重置 NIO_BUFFERS
clearNioBuffers();
}
至此,Netty的数据发送核心流程全部分析结束。
总结为了避免每次write都将数据写入TCP缓冲区,Netty的Channel提供了两种操作:write和flush,这需要依赖一个核心类ChannelOutboundBuffer。write只是将数据暂存到缓冲区,flush才是发送数据。同时为了避免消息积压的太多导致OOM,Netty提供了高低水位线,当暂存的消息到达高水位时,Netty会将Channel设为「不可写」,同时触发回调,用户可以根据该状态判断是否需要继续写入消息。
ChannelOutboundBuffer本身是个单向链表,负责管理暂存的消息,当需要发送数据时,它还会负责将ByteBuf转换成ByteBuffer,因为JDK底层的SocketChannel只支持写入ByteBuffer。
数据发送完毕后,ChannelOutboundBuffer还要负责根据实际发送的字节数来移除Entry节点,因为存在某个ByteBuf只发送了部分数据的情况,针对这种特殊情况,ChannelOutboundBuffer不会将节点移除,而是调整它的readerIndex索引,下次继续发送剩余数据。
,免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com