怎么设置netty缓冲区大小(NettyChannelOutboundBuffer源码分析)

上一篇文章对RecvByteBufAllocator的源码进行了分析,介绍了Netty是如何接收对端发送过来的数据。以及Netty是如何通过AdaptiveRecvByteBufAllocator来自适应调整ByteBuf的动态分配,解决Java ByteBuffer分配过大浪费内存,分配过小又需要频繁扩容的问题。

本篇文章会分析,Netty是如何将数据发送出去的。

前置知识

Netty支持的数据传输类型

首先你需要知道,通过Netty来发送数据,它只支持两种数据类型:ByteBuf和FileRegion。前者可以看作是ByteBuffer,普通的字节数据传输。而后者是文件传输,Netty通过FileRegion来实现文件传输的零拷贝。

write和flush

write()并不会发送数据,只是简单的将数据暂存到ChannelOutboundBuffer。

flush()才是真正的将数据通过Socket传输给对端。

writeAndFlush()只是简单的执行以上两个方法而已。

Channel高低水位线

当程序write了大量数据,或者虽然调用了flush(),但是由于对端来不及接收数据,再或者由于网络原因等等情况,导致TCP缓冲区被写满,大量的消息积压在ChannelOutboundBuffer,导致内存溢出。为了保护你的程序,Netty给Channel设置了「高低水位线」,当积压的消息超过了高水位,Netty会将Channel设为「不可写」状态并触发channelWritabilityChanged回调,你可以通过Channel.isWritable()判断是否要继续写数据。

通过ChannelConfig.setWriteBufferHighWaterMark()和ChannelConfig.setWriteBufferLowWaterMark()设置Channel的高低水位线。

订阅OP_WRITE事件

既然write()操作是用户自己发起的,为啥还要订阅Channel的OP_WRITE事件呢?

因为TCP缓冲区可能被写满,此时你就应该订阅OP_WRITE事件,暂时放弃写操作,等待Selector通知你Channel可写时,你再继续写。

ByteBuf可以转换成ByteBuffer

Java原生的SocketChannel只支持写入ByteBuffer,当你通过Netty写入ByteBuf时,它会将ByteBuf转换成ByteBuffer再写入,方法是ByteBuf.internalNioBuffer()。

清楚Java对象在JVM中的内存布局

write(msg)时,会将msg包装成Entry节点加入到链尾,其中一个属性pendingSize记录着消息占用的内存空间,这个空间大小除了msg数据本身占用的空间外,还包含Entry对象占用的空间,因此默认会额外再加上96。为啥是96后面会说明,首先你应该知道对象的对象头最大占用16字节,对象引用最少占用4字节,最多占用8字节,一个long类型占用8字节,int类型占用4字节,boolean类型占用1字节。另外JVM要求Java对象占用的空间必须是8字节的整数倍,因此还会有padding填充字节。

ChannelhandlerContext.writeAndFlush()分析

如下,分别是发送一个ByteBuf和FileRegion的简单示例:

@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 发送一个 hello ctx.writeAndFlush(Unpooled.wrappedBuffer("hello".getBytes())); }

@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 传输一个 a.txt 文件 RandomAccessFile accessFile = new RandomAccessFile("/disk/a.txt", "r"); DefaultFileRegion region = new DefaultFileRegion(accessFile.getChannel(), 0, accessFile.length()); ctx.writeAndFlush(region); }

我先说一下writeAndFlush的整体流程,实际的发送细节下一节会解释。

调用ctx.writeAndFlush(),会从当前Handler往前找能处理write事件的Handler,如果调用的是ctx.channel().writeAndFlush(),则会从Pipeline的TailContext开始向前找能处理write事件的Handler,事件传播的路径稍微有点区别。默认情况下,会找到HeadContext来处理,源码如下:

private void write(Object msg, boolean flush, ChannelPromise promise) { // 确保发送的消息不为空 ObjectUtil.checkNotNull(msg, "msg"); try { if (isNotValidPromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled return; } } catch (RuntimeException e) { ReferenceCountUtil.release(msg); throw e; } // 往后找能处理 write事件的Channel,默认会找到HeadContext。 final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); // 如果是EventLoop线程,则直接执行,否则提交一个任务串行化执行。 if (executor.inEventLoop()) { if (flush) { // 调用的是writeAndFlush(),所有flush为true,这里会调用HeadContext.invokeWriteAndFlush() next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { final WriteTask task = WriteTask.newInstance(next, m, promise, flush); if (!safeExecute(executor, task, promise, m, !flush)) { task.cancel(); } } }

找到HeadContext后,调用其invokeWriteAndFlush()方法,其实就是将write和flush放在一个方法里调用了:

void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { // 先通过handler调用write() invokeWrite0(msg, promise); // 再通过handler调用flush() invokeFlush0(); } else { writeAndFlush(msg, promise); } }

先看invokeWrite0(),它会调用HeadContext.write(),由于write操作需要和JDK的底层API交互,于是操作又会被转交给Channel.Unsafe执行:

@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { // 需要和JDK底层API交互,转交给Unsafe执行。 unsafe.write(msg, promise); }

接下来会调用AbstractChannel.AbstractUnsafe.write()方法,它首先会对发送的数据做过滤,只支持ByteBuf和FileRegion两种类型。然后会计算发送的数据占用的内存大小,因为前面说过积压的消息一旦超过Channel的高水位线会将Channel设为「不可写」状态,防止内存溢出。这两步做完以后,会把消息添加到输出缓冲区ChannelOutboundBuffer中。

@Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) {// outboundBuffer会随着Channel一同被创建,一般不会为null,这里做了校验。 try { ReferenceCountUtil.release(msg); } finally { safeSetFailure(promise, newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)")); } return; } int size; try { // 过滤写出消息,确保是ByteBuf或FileRegion,其他对象不支持写出。 msg = filterOutboundMessage(msg); /* 估算消息占用的内存,作用: 因为write()不会把消息写出到Socket,会暂存在内存里,直到flush()。 Netty为了防止消息堆积,会设置高低水位,消息暂存的总量达到最高水位会将Channel设置不可写状态, 以保护你的程序,避免内存溢出。 详见:io.netty.channel.DefaultMessageSizeEstimator.HandleImpl.size() 对于FileRegion,会直接返回0,因为使用了零拷贝技术,不需要把文件读取到JVM进程。 */ size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { try { ReferenceCountUtil.release(msg); } finally { safeSetFailure(promise, t); } return; } // write()只会把消息暂存在outboundBuffer,不会真正发送。 outboundBuffer.addMessage(msg, size, promise); }

关注一下filterOutboundMessage(),它除了过滤消息,还会试图将HeapByteBuf转换成DirectByteBuf。Netty为了提升数据发送的效率,和Socket直接读写的数据会使用直接内存,避免IO操作再发生内存拷贝。

// 过滤出站消息,只支持写出ByteBuf和FileRegion。 @Override protected final Object filterOutboundMessage(Object msg) { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (buf.isDirect()) { return msg; } // 为了避免内存复制,Socket直接读写的数据都使用堆外内存 return newDirectBuffer(buf); } // 文件传输 if (msg instanceof FileRegion) { return msg; } // 不支持的数据类型,抛异常 throw new UnsupportedOperationException( "unsupported message type: " StringUtil.simpleClassName(msg) EXPECTED_TYPES); }

newDirectBuffer()并不保证一定转换成功,如果使用的ByteBufAllocator是未池化的,且没有开启io.netty.ThreadLocalDirectBufferSize,那么就意味着Netty需要申请一个没有被池化的DirectByteBuf,这个操作是非常昂贵的,Netty会放弃转换:

// 试图将HeapByteBuf转换成DirectByteBuf,如果转换的开销很大会放弃。 protected final ByteBuf newDirectBuffer(ByteBuf buf) { final int readableBytes = buf.readableBytes(); if (readableBytes == 0) { // 可读字节数为0,直接释放并返回共享空对象。 ReferenceCountUtil.safeRelease(buf); return Unpooled.EMPTY_BUFFER; } // 获取Channel绑定的ByteBufAllocator final ByteBufAllocator alloc = alloc(); if (alloc.isDirectBufferPooled()) {// 分配器是否是池化的,且能分配直接内存? // 创建一个指定大小的直接内存ByteBuf,将数据写入,原buf释放 ByteBuf directBuf = alloc.directBuffer(readableBytes); directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); ReferenceCountUtil.safeRelease(buf); return directBuf; } /* 如果设置了io.netty.threadLocalDirectBufferSize,Netty会在线程的FastThreadLocal中通过Stack实现一个轻量级的 ByteBuf对象池,ByteBuf写出到Socket后,会自动释放,这里会将它再push到线程绑定的Stack中进行重用。 */ final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer(); if (directBuf != null) { directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); ReferenceCountUtil.safeRelease(buf); return directBuf; } // 申请一个未池化的直接内存ByteBuf开销是很大的,测试过,比堆内存的申请慢10倍都不止,这里会直接放弃。 return buf; }

如果设置了io.netty.threadLocalDirectBufferSize,Netty会为每个线程创建指定数量的ByteBuf对象缓存,这些ByteBuf是可以被重用的。实现逻辑是Netty会在FastThreadLocal存放一个Stack,需要时pop()一个出来,用完时push()归还。

再来关注一下MessageSizeEstimator,它负责计算待发送数据占用的内存,逻辑很简单,对于FileRegion会返回0,因为FileRegion传输文件时使用了零拷贝技术,直接使用mmap内存映射,而不需要将文件加载到JVM进程,实现直接看io.netty.channel.DefaultMessageSizeEstimator.HandleImpl.size():

// 估算消息的内存占用,逻辑还是很简单的。 @Override public int size(Object msg) { if (msg instanceof ByteBuf) { return ((ByteBuf) msg).readableBytes(); } if (msg instanceof ByteBufHolder) { return ((ByteBufHolder) msg).content().readableBytes(); } // FileRegion实现了零拷贝,并不需要将文件加载到JVM,因此占用的内存为0,不影响Channel水位线。 if (msg instanceof FileRegion) { return 0; } return unknownSize; }

关于ChannelOutboundBuffer代码下节会详细分析,这里只需要知道write()只会将数据暂存到ChannelOutboundBuffer,而不会真正发送就行了。

消息存入ChannelOutboundBuffer,write操作就算完成了。紧接着会调用invokeFlush0(),它依然会转交给Unsafe执行,调用AbstractChannel.AbstractUnsafe.flush()。

它会做两件事:先把ChannelOutboundBuffer中待发送的Entry标记为flushed,然后将要发送的Entry数据转换成Java的ByteBuffer,使用SocketChannel进行真正的数据发送。

@Override public final void flush() { assertEventLoop(); // 得到SocketChannel绑定的ChannelOutboundBuffer ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } // 先将unflushed节点标记为flushed outboundBuffer.addFlush(); // 开始发送数据 flush0(); }

flush0()会开始发送数据,它首先会检测Channel是否活跃,如果是非活跃状态,此次flush()操作将失败,Entry会被移除。如果Channel正常,会调用doWrite()进行数据发送。

protected void flush0() { if (inFlush0) {// 避免上一次flush0()还没执行完时,又触发了 return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) {// 非空校验 return; } inFlush0 = true; // 如果连接已经失活了。 if (!isActive()) { try { if (!outboundBuffer.isEmpty()) { if (isOpen()) { /* 通道是打开的,稍后可能会被激活。 1.释放msg 2.触发失败通知 3.回收Entry 4.递减消息挂起的字节数 */ outboundBuffer.failFlushed(new NotYetConnectedException(), true); } else { /* 道都被关闭了,和上面的处理流程类似,只是不用通过触发channelWritabilityChanged()回调了。 */ outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false); } } } finally { inFlush0 = false; } return; } try { // 连接正常,执行真正的write()操作 doWrite(outboundBuffer); } catch (Throwable t) { handleWriteError(t); } finally { inFlush0 = false; } }

doWrite()是数据发送的核心,由子类实现,这里直接看NioSocketChannel.doWrite()。它会获取Java原生的SocketChannel,将队列中待发送的ByteBuf转换成ByteBuffer,然后循环发送数据。

单次循环发送的数据量受以下两个条件限制:

  1. ByteBuffer的数量限制。
  2. TCP参数设置的缓冲区的大小限制(ChannelOption.SO_SNDBUF)。

如果ChannelOutboundBuffer积压了大量的数据,单次可能无法发送完,因此会默认循环发16次。循环次数过多可能会阻塞IO线程,导致其他Channel的事件得不到处理。

@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { SocketChannel ch = javaChannel(); // 循环写的次数,默认16次。可能有大量消息积压在输出缓冲区,同时为了避免阻塞IO线程,做了次数限制。 int writeSpinCount = config().getWriteSpinCount(); do { if (in.isEmpty()) {// 没有数据要写出了 // 取消监听 OP_WRITE事件 clearOpWrite(); return; } // 发送缓冲区的最大值,由 TCP参数: ChannelOption.SO_SNDBUF 设置。 int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); /* 将要flush的Entry转换成Java原生的ByteBuffer数组,因为做了总数和总字节数的限制,所以一次可能无法send所有数据。 注意,这里只会处理ByteBuf,不会处理FileRegion。 */ ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); // 其实就是 nioBuffers.length,上一步方法中会进行设置 int nioBufferCnt = in.nioBufferCount(); switch (nioBufferCnt) { case 0: // ByteBuf处理完了,但是可能还有FileRegion需要处理。 writeSpinCount -= doWrite0(in); break; case 1: { // 只有单个ByteBuf需要发送的情况 ByteBuffer buffer = nioBuffers[0]; // 尝试发送的字节数 int attemptedBytes = buffer.remaining(); // Java原生的SocketChannel.wrote(ByteBuffer)发送数据 final int localWrittenBytes = ch.write(buffer); if (localWrittenBytes <= 0) {// TCP缓冲区满,订阅OP_WRITE事件,等待可写时再继续处理 incompleteWrite(true); return; } // 动态调整 发送缓冲区大小 adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); // 删除已经发送的Entry节点 in.removeBytes(localWrittenBytes); --writeSpinCount; break; } default: { // 发送缓冲区有多个ByteBuf待发送 // 尝试发送的字节总数 long attemptedBytes = in.nioBufferSize(); // 调用Java原生的SocketChannel.write()进行数据的发送,返回值是实际发送的字节数 final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes <= 0) { // 写入字节为0,可能是TCP缓冲区满了,订阅OP_WRITE事件,等待TCP可写时再执行。 incompleteWrite(true); return; } // 根据本次实际写入的字节数,动态调整发送缓冲区:ChannelOption.SO_SNDBUF adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite); /* 删除已经发送的数据,是根据实际写入的字节数去删除的,而不是根据ByteBuf的数量。 从flushedEntry开始,计算每个ByteBuf的大小,按个删除。 可能存在某个ByteBuf发送部分数据的情况,会调整它的readerIndex。 */ in.removeBytes(localWrittenBytes); --writeSpinCount; break; } } } while (writeSpinCount > 0); /* 只有在 nioBufferCnt处理完了,调用doWrite0(in)处理FileRegion,且没处理完毕时,才可能走到这里。 如果FileRegion没有处理完,writeSpinCount会小于0,这里会继续订阅OP_WRITE事件,等待Channel可写时继续处理。 */ incompleteWrite(writeSpinCount < 0); }

此外,NioSocketChannel.doWrite()只会发送ByteBuf,FileRegion的发送需要调用父类的AbstractNioByteChannel.doWrite0()处理。

/* NioSocketChannel只负责发送ByteBuf, FileRegion的发送这边会处理。 */ protected final int doWrite0(ChannelOutboundBuffer in) throws Exception { Object msg = in.current(); if (msg == null) { // Directly return here so incompleteWrite(...) is not called. return 0; } // 数据发送 return doWriteInternal(in, in.current()); } private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (!buf.isReadable()) { // 没有数据可读,直接删除节点 in.remove(); return 0; } // Java底层write() final int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount > 0) { in.progress(localFlushedAmount); if (!buf.isReadable()) { in.remove(); } return 1; } } else if (msg instanceof FileRegion) { FileRegion region = (FileRegion) msg; // 已经传输的字节数 >= 字节总数,代表文件已经传输完毕,删除节点。 if (region.transferred() >= region.count()) { in.remove(); return 0; } // 调用region.transferTo(javaChannel(), position)文件传输 long localFlushedAmount = doWriteFileRegion(region); if (localFlushedAmount > 0) {// 实际发送的字节数 in.progress(localFlushedAmount); if (region.transferred() >= region.count()) {//FileRegion发送完毕,移除节点 in.remove(); } return 1; } } else { // Should not reach here. throw new Error(); } /* 一般不会走到这里,可能是数据没有处理完毕,返回一个Integer.MAX_VALUE, 让writeSpinCount小于0,这样它就会订阅OP_WRITE事件,等待Channel可写时继续处理。 */ return WRITE_STATUS_SNDBUF_FULL; }

需要注意的是,flush()操作可能存在两种情况:

  1. 数据正常发送完毕。
  2. 数据没有发完,就已经超过了最大循环次数,为了不阻塞IO线程,下次再处理。
  1. TCP缓冲区满,数据无法发送。

对于后面两种情况,都属于「不完整写入」,因此会调用incompleteWrite(setOpWrite)稍后继续处理。

针对第三种情况,Netty需要订阅OP_WRITE事件,等待Selector通知Channel可写时继续发送数据。setOpWrite参数代表是否要监听OP_WRITE事件:

/** * 不完整写入 * @param setOpWrite 是否要订阅OP_WRITE事件 */ protected final void incompleteWrite(boolean setOpWrite) { // setOpWrite为true,一般都是TCP缓冲区满了,此时需要订阅OP_WRITE事件,等待Channel可写时再继续处理。 if (setOpWrite) { // 订阅OP_WRITE事件 setOpWrite(); } else { // 取消订阅OP_WRITE事件 clearOpWrite(); // 提交一个flush任务,稍后执行,避免阻塞IO线程。 eventLoop().execute(flushTask); } }

至此,writeAndFlush()的整个流程就处理完了,对于ChannelOutboundBuffer本节没有进行分析,看下节。

ChannelOutboundBuffer源码分析

ChannelOutboundBuffer是Netty的数据发送缓冲区,它跟随SocketChannel一同被创建。

先看属性:

/* 将ByteBuf包装成Entry时,额外占用的字节大小,因为除了ByteBuf本身的数据外,Entry对象也占用空间。 为啥是96?为啥还支持修改?? 1.96是Netty根据64位的JVM计算的最大值。 2.如果你的程序运行在32位的JVM中,或者对象引用开启了压缩,你可以根据实际情况修改这个值。 分析为啥最多会占用96字节: 在64位的JVM中,一个Entry对象占用以下空间: - 16字节的对象头空间:8字节Mark Word,最大8字节Klass Pointer - 6个对象引用属性,最小4*6=24字节,最大8*6=48字节 - 2个long属性,2*8=16字节 - 2个int属性,2*4=8字节 - 1个boolean属性,1字节 - padding对齐填充,JVM要求对象占用的内存为8字节的整数倍,这里是7字节 合计最多为96字节. */ static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD = SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96); /* 发送数据时,需要将待发送的ByteBuf转换成ByteBuffer,考虑到write是个很频繁的操作, 为了避免频繁创建数组,这里进行了复用,每个线程会复用自己的ByteBuffer[]。 */ private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() { @Override protected ByteBuffer[] initialValue() throws Exception { // 默认大小为1024,后面有必要还会扩容 return new ByteBuffer[1024]; } }; // 绑定的SocketChannel private final Channel channel; // 已经flush,等待发送的头节点。 private Entry flushedEntry; // 已经write但是没flush的头节点,flush()时会通过它一直往后找 private Entry unflushedEntry; // 链尾节点 private Entry tailEntry; // flush的节点数量,发送数据时会从flushedEntry开始往后找flushed个节点。 private int flushed; // 单次循环写的Nio Buffer数量 private int nioBufferCount; // 单次循环写的Nio Buffer总大小 private long nioBufferSize; // flush是否失败 private boolean inFail; // 计算totalPendingSize属性的偏移量,通过CAS的方式来做修改。 private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize"); // 输出缓冲区暂存的消息占用的总内存,通过该值判断是否达到高低水位,以修改Channel的可写状态。 @SuppressWarnings("UnusedDeclaration") private volatile long totalPendingSize; // unwritable属性的偏移量,通过CAS的方式来修改。 private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable"); // Channel是否可写的状态,0可写,1不可写。输出缓冲区内存达到高低水位线时修改。 @SuppressWarnings("UnusedDeclaration") private volatile int unwritable; // Channel可写状态变更时触发的任务,消息的积压达到高低水位线时触发 private volatile Runnable fireChannelWritabilityChangedTask;

它本身是一个单向链表,由一系列Entry节点组成。它有三个节点指针:

  • flushedEntry:已经flush,等待被发送的起始节点指针。
  • unflushedEntry:已经write,等待flush的起始节点指针。
  • tailEntry:链尾指针。

笔者花了一个简图,来表示它是如何工作的:

怎么设置netty缓冲区大小(NettyChannelOutboundBuffer源码分析)(1)

上节说过,执行flush(msg)操作时,只是把数据暂存到ChannelOutboundBuffer,核心方法是addMessage(),它主要做了两件事:

  1. 将msg封装成Entry节点,加入到链尾。
  2. 统计输出缓冲区的消息总字节数是否达到高水位线,如果达到则将Channel设为「不可写」状态,且触发ChannelWritabilityChanged回调。

/** * 将消息暂存到ChannelOutboundBuffer,暂存成功promise就会收到通知。 * @param msg 待发送的数据:ByteBuf/FileRegion * @param size 数据占用的内存大小 * @param promise write成功会收到通知 */ public void addMessage(Object msg, int size, ChannelPromise promise) { // 将msg包装成一个Entry,并加入到链尾。 Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; } else { // tail不为空,则添加到它的next Entry tail = tailEntry; tail.next = entry; } tailEntry = entry;// tailEntry指向新添加的节点 if (unflushedEntry == null) { unflushedEntry = entry; } // 统计消息挂起的字节数,如果超过高水位线,需要修改Channel的可写状态并触发回调 incrementPendingOutboundBytes(entry.pendingSize, false); }

先看Entry.newInstance(),它会将msg封装成Entry节点,加入到链尾。

Entry有个属性pendingSize用来记录消息占用的内存空间,需要注意的是,它除了msg本身的数据空间,还会加上Entry对象占用的空间,一个Java对象占用多少空间是在编译期就确定下来的,除了属性占用的空间外,读者还需要了解Java对象的内存布局。

/** * 创建一个Entry节点,从对象池中取一个 * @param msg 消息本身 * @param size 由MessageSizeEstimator估算出消息占用的内存大小 * @param total 消息本身的大小(区别是FileRegion的处理) * @param promise write()完成会收到通知 * @return */ static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { /* 每次write()都需要一个Entry,考虑到write()是一个非常频繁的操作, 为了避免Entry的频繁创建和销毁,这里做了对象池的重用处理。 */ Entry entry = RECYCLER.get(); entry.msg = msg; /* 占用的内存为什么还要加上CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD??? 除了ByteBuf占用的空间外,Entry本身也占用空间啊。 在64位的JVM中: - 16字节的对象头空间:8字节Mark Word,最大8字节Klass Pointer - 6个对象引用属性,最小4*6=24字节,最大8*6=48字节 - 2个long属性,2*8=16字节 - 2个int属性,2*4=8字节 - 1个boolean属性,1字节 - padding对齐填充,JVM要求对象占用的内存为8字节的整数倍,这里是7字节 合计最多为96字节,因此CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD的默认值即使96。 */ entry.pendingSize = size CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD; entry.total = total; entry.promise = promise; return entry; }

Entry加入链表后,incrementPendingOutboundBytes()会累加字节总数,判断是否超过高水位线:

/** * 统计消息挂起的字节数,如果超过高水位线,需要修改Channel的可写状态并触发回调 * @param size 消息占用内存的字节数 * @param invokeLater 是否稍后触发再回调 */ private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size == 0) { return; } // 统计消息暂存的内存大小,CAS的方式进行累加 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); // 暂存消息达到高水位线,消息积压了 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { // 修改可写状态,触发回调 setUnwritable(invokeLater); } }

setUnwritable()会在数据总字节数超过高水位线时触发,它会通过自旋 CAS的方式将unwritable从0改为1,然后触发回调:

// 将Channel设为不可写,CAS执行 private void setUnwritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue | 1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue == 0) { // CAS操作成功,触发回调 fireChannelWritabilityChanged(invokeLater); } break; } } }

ChannelOutboundBuffer为write操作所做的工作就这么多了,下面看flush。

执行flush操作时,首先会调用outboundBuffer.addFlush()将unflushed节点标记为flushed,其实就是移动flushedEntry和unflushedEntry指针,这个过程会检查Entry节点是否被取消,如果取消了会跳过节点,同时会递减该Entry占用的内存空间。

// 只是将节点标记为flushed,并没有真正发送数据,会跳过已经被取消的节点。 public void addFlush() { Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { flushedEntry = entry; } do { flushed ; // 将entry的promise设为 不可取消 状态 if (!entry.promise.setUncancellable()) { // 设置失败,说明promise已经取消,需要释放消息,并递减挂起的字节数 int pending = entry.cancel(); // 递减缓冲区的消息字节总数,如果达到低水位线,则将Channel重新设为「可写」状态,并触发回调 decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null);// 不断往后找 待flush的节点 // 所有的节点都flush了,置为空 unflushedEntry = null; } }

节点状态标记完成后,会调用doWrite()开始写数据。首先它需要ChannelOutboundBuffer将flushed节点转换成Java原生的ByteBuffer,方法是nioBuffers()。因为OS对SocketChannel.write()单次发送的字节数有限制,一般是Integer.MAX_VALUE,所以单次转换需要提供两个参数:

  • maxCount:转换ByteBuffer的最大数量,默认是1024。
  • maxBytes:最大字节数,默认是设置的TCP发送缓冲区大小(ChannelOption.SO_SNDBUF)。

/** * 将要flush的Entry转换成Java原生的ByteBuffer数组,因为做了总数和总字节数的限制,所以一次可能无法send所有数据。 * @param maxCount 单次发送的ByteBuffer最大数量 * @param maxBytes 发送缓冲区的最大值,由 TCP参数: ChannelOption.SO_SNDBUF 设置。 * @return */ public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) { assert maxCount > 0; assert maxBytes > 0; long nioBufferSize = 0; int nioBufferCount = 0; // 由于write操作很频繁,避免ByteBuffer[]频繁创建和销毁,这里进行了复用,每个线程都有一个ByteBuffer[1024] final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap); Entry entry = flushedEntry; // 确保Entry节点是flushed,且msg是ByteBuf类型 while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) { // 确保节点没有被取消,如果取消了,则跳过它。 if (!entry.cancelled) { ByteBuf buf = (ByteBuf) entry.msg; final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex; // 可读字节数就是要写出的字节数,确保大于0 if (readableBytes > 0) { if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) { // 发送的数据超过了maxBytes,退出循环 break; } nioBufferSize = readableBytes; int count = entry.count; if (count == -1) { // -1代表没有设置ByteBuf的nioBufferCount,ByteBuf中ByteBuffer的数量 entry.count = count = buf.nioBufferCount(); } // 是否需要更多的空间 int neededSpace = min(maxCount, nioBufferCount count); // 如果ByteBuffer的数量超过了默认值1024,就去申请更多的空间 if (neededSpace > nioBuffers.length) { // 成倍扩容,直到数组长度足够。并塞回到FastThreadLocal。 nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); NIO_BUFFERS.set(threadLocalMap, nioBuffers); } if (count == 1) { ByteBuffer nioBuf = entry.buf; if (nioBuf == null) { // 将ByteBuf转ByteBuffer,且缓存到Entry中 entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); } // 设置ByteBuffer nioBuffers[nioBufferCount ] = nioBuf; } else { // 一个ByteBuf包含多个ByteBuffer的情况处理,循环遍历设置 nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount); } if (nioBufferCount >= maxCount) { // ByteBuffer的数量超过了maxCount,退出循环 break; } } } entry = entry.next; } this.nioBufferCount = nioBufferCount; this.nioBufferSize = nioBufferSize; return nioBuffers; }

这里关注一下NIO_BUFFERS属性,它是一个FastThreadLocal,每个线程都有自己的ByteBuffer[]缓存,默认长度1024,可以被复用。

这里为什么要复用呢?因为作为一个网络IO框架,flush肯定是一个非常频繁的操作,为了避免每次都创建ByteBuffer[],复用可以提升系统性能,减轻GC的压力。

如果一个ByteBuf由很多个ByteBuffer组成,默认的1024个ByteBuffer可能不够用,此时会调用expandNioBufferArray()进行扩容:

// 对array进行扩容 private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) { int newCapacity = array.length; do { // 成倍扩容 newCapacity <<= 1; if (newCapacity < 0) {// int溢出 throw new IllegalStateException(); } } while (neededSpace > newCapacity); ByteBuffer[] newArray = new ByteBuffer[newCapacity]; // 元素迁移 System.arraycopy(array, 0, newArray, 0, size); return newArray; }

将待发送的ByteBuf转换成ByteBuffer后,NioSocketChannel会调用JDK底层的SocketChannel.write()进行真正的数据发送。

数据发送完毕后,需要移除ChannelOutboundBuffer中的节点。节点的添加是从链尾开始,移除则是从链头开始的。

ChannelOutboundBuffer是根据实际发送的字节数来移除节点的,因此会存在某个ByteBuf只发送了部分数据的情况,如果某个ByteBuf数据没有发送完,那么该节点并不会被移除,只会调整它的readerIndex索引,下次继续发送剩余数据。

/** * 根据写入到TCP缓冲区的字节数来移除ByteBuf。 * @param writtenBytes */ public void removeBytes(long writtenBytes) { for (;;) { // 从flushedEntry开始计算 Object msg = current(); if (!(msg instanceof ByteBuf)) { assert writtenBytes == 0; break; } final ByteBuf buf = (ByteBuf) msg; final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex; // 如果单个ByteBuf的数据 <= writtenBytes,则直接移除Entry节点 if (readableBytes <= writtenBytes) { if (writtenBytes != 0) { progress(readableBytes); writtenBytes -= readableBytes; } remove(); } else { // readableBytes > writtenBytes // 存在某个ByteBuf发送部分数据的情况,调整它的readerIndex,下次继续发送。 if (writtenBytes != 0) { buf.readerIndex(readerIndex (int) writtenBytes); progress(writtenBytes); } break; } } // 重置 NIO_BUFFERS clearNioBuffers(); }

至此,Netty的数据发送核心流程全部分析结束。

总结

为了避免每次write都将数据写入TCP缓冲区,Netty的Channel提供了两种操作:write和flush,这需要依赖一个核心类ChannelOutboundBuffer。write只是将数据暂存到缓冲区,flush才是发送数据。同时为了避免消息积压的太多导致OOM,Netty提供了高低水位线,当暂存的消息到达高水位时,Netty会将Channel设为「不可写」,同时触发回调,用户可以根据该状态判断是否需要继续写入消息。

ChannelOutboundBuffer本身是个单向链表,负责管理暂存的消息,当需要发送数据时,它还会负责将ByteBuf转换成ByteBuffer,因为JDK底层的SocketChannel只支持写入ByteBuffer。

数据发送完毕后,ChannelOutboundBuffer还要负责根据实际发送的字节数来移除Entry节点,因为存在某个ByteBuf只发送了部分数据的情况,针对这种特殊情况,ChannelOutboundBuffer不会将节点移除,而是调整它的readerIndex索引,下次继续发送剩余数据。

,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页