找回密码
 立即注册
首页 业界区 业界 从零开始实现简易版Netty(四) MyNetty 高效的数据写出实 ...

从零开始实现简易版Netty(四) MyNetty 高效的数据写出实现

宗和玉 2025-7-18 19:03:11
从零开始实现简易版Netty(四) MyNetty 高效的数据写出实现

1. MyNetty 数据写出处理优化

在上一篇博客中,lab3版本的MyNetty对事件循环中的IO读事件处理做了一定的优化,解决了之前版本无法进行大数据量读取的问题。
按照计划,本篇博客中,lab4版本的MyNetty需要实现高效的数据写出。由于本文属于系列博客,读者需要对之前的博客内容有所了解才能更好地理解本文内容。

  • lab1版本博客:从零开始实现简易版Netty(一) MyNetty Reactor模式
  • lab2版本博客:从零开始实现简易版Netty(二) MyNetty pipeline流水线
  • lab3版本博客:从零开始实现简易版Netty(三) MyNetty 高效的数据读取实现


目前版本的实现中,MyNetty允许用户在自定义的ChannelHandler中使用ChannelHandlerContext的write方法向远端写出消息。
write操作被视为一个出站事件会一直从后往前传播到head节点,最终在pipeline流水线的head节点中通过jdk的socketChannel.write方法将消息写出。

  1. public class MyChannelPipelineHeadContext extends MyAbstractChannelHandlerContext implements MyChannelEventHandler {
  2.     // 已省略无关逻辑
  3.     @Override
  4.     public void write(MyChannelHandlerContext ctx, Object msg) throws Exception {
  5.         SocketChannel socketChannel = (SocketChannel) ctx.getPipeline().getChannel().getJavaChannel();
  6.         if(msg instanceof ByteBuffer){
  7.             socketChannel.write((ByteBuffer) msg);
  8.         }else{
  9.             // msg走到head节点的时候,必须是ByteBuffer类型
  10.             throw new Error();
  11.         }
  12.     }
  13. }
复制代码


面对大量的待读取消息,Netty针对性的实现了启发式算法动态调节buffer容器大小、限制单channel读事件中读取消息的次数以避免其它channel饥饿等机制。
同样的,如果需要短时间内向外写出大量的消息时,目前版本MyNetty中简单的socketChannel.write来实现写出功能是远远不够的,极端场景下同样存在很多问题。

  • 操作系统向外发送数据的缓冲区是有限的,当网络拥塞、系统负载过高等场景时,通过socketChannel.write写出的数据并不能总是完全成功的写出,而是可能部分甚至全部都无法写出。此时,未成功写出的数据需要由应用程序自己维护好,等待缓冲区空闲后进行重试。
  • write方法的执行是异步的,事件循环线程a发出的消息可能实际上是由线程b来真正的写出,而线程b又不总是能一次成功的写出消息。
    MyNetty中线程a目前无法感知到write方法发送的消息是否被成功写出,同时也无法在缓冲区负载较高时,主动限制自己写出消息的速率而在源头实现限流。
  • 写出操作与读取操作一样,都是需要占用cpu的,即使write方法写出的消息总是能一次性的成功发出,也不能让当前channel无限制的写出数据,而导致同处一个事件循环中的其它channel饥饿。


因此在本篇博客中,lab4版本的MyNetty需要参考netty对写出操作进行改进,优化大量数据写出时的各种问题。
2. MyNetty 高效的数据写出实现源码解析

在解析MyNetty的源码之前,先分析一下netty是如何解决上述问题的。

  • 针对操作系统缓冲区有限,可能在网络拥塞等场景无法立即写出数据的问题,Netty内部为每一个Channel都设置了一个名为ChannelOutboundBuffer的应用层缓冲区。
    通过write方法写出的消息数据,会先暂存在该缓冲区中。而只有通过flush/writeAndFlush方法触发flush操作时,才会实际的往channel的对端写出。
    当所暂存的消息无法完全写出时,消息也不会丢失,而是会一直待在缓冲区内等待缓冲区空闲后重试写出(具体原理在后面解析)。
  • 针对write方法异步操作而无法令用户感知结果的问题,netty的write方法返回了一个类似Future的ChannelPromise对象,用户可以通过注册回调方法来感知当前所写出消息的结果。
  • 针对用户无法感知当前系统负载情况而无法主动限流的问题,netty引入了高水位、低水位的概念。
    当channel对应的ChannelOutboundBuffer中待写出消息总大小高于某个阈值时,被认为处于高水位,负载较高,不能继续写入了;而待写出消息的总大小低于某个阈值时,被认为处于低水位,负载较低,可以继续写入了。
    用户可以通过channel的isWritable方法主动的查询当前的负载状态(返回false代表负载较高,不可写),也可以监听channelHandler的channelWritabilityChanged获取isWritable状态的转换结果。
    基于isWritable标识,用户能够在负载较高时主动的调节自己发送消息的速率,避免待发送消息越写越多而最终导致OOM。
  • 针对单个channel内待写出消息过多而可能导致同一EventLoop内其它channel饥饿的问题,与避免读事件饥饿的机制一样,netty同样限制了一次完整的写出操作内可以写出的消息次数。
    在写出次数超过阈值时,则立即终止当前channel的写出,转而处理其它事件/任务;当前channel内未写完的消息留待后续的事件循环中再接着处理。
MyNetty ChannelOutboundBuffer实现源码
  1. public class MyChannelOutboundBuffer {
  2.     private static final Logger logger = LoggerFactory.getLogger(MyChannelOutboundBuffer.class);
  3.     private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
  4.     private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;
  5.     private final MyNioChannel channel;
  6.     private MyChannelOutBoundBufferEntry flushedEntry;
  7.     private MyChannelOutBoundBufferEntry unFlushedEntry;
  8.     private MyChannelOutBoundBufferEntry tailEntry;
  9.     public int num;
  10.     /**
  11.      * 简单一点,这里直接用boolean类型
  12.      * netty中用整型,是为了同时表达多个index下的unWritable语义(setUserDefinedWritability)
  13.      * */
  14.     private volatile boolean unWritable;
  15.     /**
  16.      * 当前outBoundBuffer
  17.      * 目前只支持持有channel的IO线程更新,所以不设置为volatile
  18.      * */
  19.     private long totalPendingSize;
  20.     /**
  21.      * 一次写出操作的nioBuffer集合的大小总和
  22.      * */
  23.     private long nioBufferSize;
  24.     // The number of flushed entries that are not written yet
  25.     private int flushed;
  26.     private static final ThreadLocal<ByteBuffer[]> NIO_BUFFERS = ThreadLocal.withInitial(() -> new ByteBuffer[1024]);
  27.     public MyChannelOutboundBuffer(MyNioChannel channel) {
  28.         this.channel = channel;
  29.     }
  30.     public void addMessage(ByteBuffer msg, int size, CompletableFuture<MyNioChannel> completableFuture) {
  31.         // 每个msg对应一个链表中的entry对象
  32.         MyChannelOutBoundBufferEntry entry = MyChannelOutBoundBufferEntry.newInstance(msg, size, completableFuture);
  33.         if (tailEntry == null) {
  34.             // 当前队列为空
  35.             flushedEntry = null;
  36.         } else {
  37.             // 当前待flush的队列不为空,新节点追加到tail上
  38.             MyChannelOutBoundBufferEntry tail = tailEntry;
  39.             tail.next = entry;
  40.         }
  41.         tailEntry = entry;
  42.         // 当前的unFlushed队列为空,当前消息作为头结点
  43.         if (unFlushedEntry == null) {
  44.             unFlushedEntry = entry;
  45.         }
  46.         // 加入新的msg后,当前outBoundBuffer所占用的总字节数自增
  47.         incrementPendingOutboundBytes(entry.pendingSize);
  48.     }
  49.     public void addFlush() {
  50.         // There is no need to process all entries if there was already a flush before and no new messages
  51.         // where added in the meantime.
  52.         //
  53.         // See https://github.com/netty/netty/issues/2577
  54.         MyChannelOutBoundBufferEntry entry = unFlushedEntry;
  55.         if(entry == null) {
  56.             // 触发flush操作,如果此时unFlushedEntry为null,说明没有需要flush的消息了,直接返回(针对重复的flush操作进行性能优化)
  57.             return;
  58.         }
  59.         if (flushedEntry == null) {
  60.             // there is no flushedEntry yet, so start with the entry
  61.             // 当前需要flush的队列(flushedEntry为head)为空,调转指针,将unFlushed队列中的消息全部转为待flushedEntry(相当于重新取了一批数据来写出)
  62.             flushedEntry = entry;
  63.         }
  64.         // 允许用户在flush之前取消掉此次写入,将待flush的节点最后检查一遍
  65.         do {
  66.             flushed ++;
  67.             if (entry.completableFuture.isCancelled()) {
  68.                 // 设置cancel标志位,标识为不需要去写入
  69.                 entry.cancel();
  70.                 // 如果用户自己cancel了这次write操作,直接自减totalPendingSize即可
  71.                 decrementPendingOutboundBytes(entry.pendingSize);
  72.             }
  73.             entry = entry.next;
  74.         } while (entry != null);
  75.         // flush操作完毕后,unFlushed队列为空
  76.         unFlushedEntry = null;
  77.     }
  78.     /**
  79.      * 按照写出的字节数,将已经写完的byteBuffer清除掉。
  80.      *
  81.      * 由于缓冲区可能受限,socketChannel.write实际没有完整的写出一个byteBuffer,这种情况ByteBuffer的remaining实际上是减少了(写出了多少减多少)
  82.      * */
  83.     public void removeBytes(long totalWrittenBytes) {
  84.         for (;;) {
  85.             MyChannelOutBoundBufferEntry currentEntry = currentEntry();
  86.             if(currentEntry == null){
  87.                 // 已flushed的节点都遍历完成了
  88.                 return;
  89.             }
  90.             final int readableBytes = currentEntry.msg.remaining();
  91.             if (readableBytes == 0) {
  92.                 // 总共写出的bytes自减掉对应消息的大小
  93.                 totalWrittenBytes -= currentEntry.msgSize;
  94.                 // 完整的写出了一个byteBuffer,将其移除掉
  95.                 remove();
  96.             } else {
  97.                 // readableBytes > writtenBytes
  98.                 // 发现一个未写完的ByteBuffer,不能移除,退出本次处理。等待下一次继续写出
  99.                 return;
  100.             }
  101.         }
  102.     }
  103.    
  104.     public boolean remove() {
  105.         MyChannelOutBoundBufferEntry entry = flushedEntry;
  106.         if (entry == null) {
  107.             return false;
  108.         }
  109.         CompletableFuture<MyNioChannel> completableFuture = entry.completableFuture;
  110.         int size = entry.pendingSize;
  111.         removeEntry(entry);
  112.         if (!entry.cancelled) {
  113.             // 写入操作flush成功,通知future
  114.             try {
  115.                 completableFuture.complete(this.channel);
  116.             }catch (Throwable ex) {
  117.                 logger.error("MyChannelOutboundBuffer notify write complete error! channel={}",this.channel,ex);
  118.             }
  119.             decrementPendingOutboundBytes(size);
  120.         }
  121.         return true;
  122.     }
  123.     /**
  124.      * 参考netty的ChannelOutboundBuffer的nioByteBuffers方法,因为没有ByteBuf到ByteBuffer的转换,所以简单不少
  125.      * */
  126.     public List<ByteBuffer> nioByteBuffers(int maxCount, int maxBytes) {
  127.         long totalNioBufferSize = 0;
  128.         // 简单起见,需要处理的byteBuffer列表直接new出来,暂不考虑优化
  129.         List<ByteBuffer> needWriteByteBufferList = new ArrayList<>();
  130.         MyChannelOutBoundBufferEntry entry = flushedEntry;
  131.         // 遍历队列中所有已经flush的节点
  132.         while (isFlushedEntry(entry)){
  133.             // 只处理未cancel的节点
  134.             if(!entry.cancelled) {
  135.                 // 和netty不同,这里直接msg就是jdk的ByteBuffer,直接操作msg即可,不需要转换
  136.                 int readableBytes = entry.msg.remaining();
  137.                 // 只处理可读的消息,空msg忽略掉
  138.                 if (readableBytes > 0) {
  139.                     // 判断一下是否需要将当前的msg进行写出,如果超出了maxBytes就留到下一次再处理
  140.                     // 判断!byteBufferList.isEmpty的目的是避免一个超大的msg直接超过了maxBytes
  141.                     // 如果是这种极端情况即byteBufferList.isEmpty,且 readableBytes > maxBytes,那也要尝试着进行写出
  142.                     // 让底层的操作系统去尽可能的写入,不一定要一次写完,下次再进来就能继续写(readableBytes会变小)
  143.                     if (maxBytes < totalNioBufferSize + readableBytes && !needWriteByteBufferList.isEmpty()) {
  144.                         break;
  145.                     }
  146.                     // 总共要写出的bufferSize自增
  147.                     totalNioBufferSize += readableBytes;
  148.                     // 当前msg加入待写出的list中
  149.                     needWriteByteBufferList.add(entry.msg);
  150.                     if (needWriteByteBufferList.size() >= maxCount) {
  151.                         // 限制一下一次写出最大的msg数量
  152.                         break;
  153.                     }
  154.                 }
  155.             }
  156.             // 遍历下一个节点
  157.             entry = entry.next;
  158.         }
  159.         this.nioBufferSize = totalNioBufferSize;
  160.         return needWriteByteBufferList;
  161.     }
  162.     public long getNioBufferSize() {
  163.         return nioBufferSize;
  164.     }
  165.     public boolean isWritable() {
  166.         return !unWritable;
  167.     }
  168.     public boolean isEmpty() {
  169.         return flushed == 0;
  170.     }
  171.     private void removeEntry(MyChannelOutBoundBufferEntry e) {
  172.         // 已flush队列头节点出队,队列长度自减1,
  173.         flushed--;
  174.         if (flushed == 0) {
  175.             // 当前已flush的队列里的消息已经全部写出完毕,将整个队列整理一下
  176.             // 首先已flush队列清空
  177.             flushedEntry = null;
  178.             if (e == tailEntry) {
  179.                 // 如果当前写出的是队列里的最后一个entry,说明所有的消息都写完了,整个队列清空
  180.                 tailEntry = null;
  181.                 unFlushedEntry = null;
  182.             }
  183.         } else {
  184.             // 当前已flush的队列里还有剩余的消息待写出,已flush队列的头部出队,队列头部指向下一个待写出节点
  185.             flushedEntry = e.next;
  186.         }
  187.     }
  188.     private boolean isFlushedEntry(MyChannelOutBoundBufferEntry e) {
  189.         return e != null && e != unFlushedEntry;
  190.     }
  191.     public MyChannelOutBoundBufferEntry currentEntry() {
  192.         MyChannelOutBoundBufferEntry entry = flushedEntry;
  193.         if (entry == null) {
  194.             return null;
  195.         }
  196.         return entry;
  197.     }
  198.     private void incrementPendingOutboundBytes(long size) {
  199.         if (size == 0) {
  200.             return;
  201.         }
  202.         this.totalPendingSize += size;
  203.         if (totalPendingSize > DEFAULT_HIGH_WATER_MARK) {
  204.             // 超过了所配置的高水位线,标识设置为不可写
  205.             this.unWritable = true;
  206.         }
  207.     }
  208.     private void decrementPendingOutboundBytes(long size) {
  209.         if (size == 0) {
  210.             return;
  211.         }
  212.         this.totalPendingSize -= size;
  213.         if (totalPendingSize < DEFAULT_LOW_WATER_MARK) {
  214.             // 低于了所配置的低水位线,标识设置为可写
  215.             this.unWritable = false;
  216.         }
  217.     }
  218. }
复制代码
  1. class MyChannelOutBoundBufferEntry {
  2.     // Assuming a 64-bit JVM:
  3.     //  - 16 bytes object header
  4.     //  - 6 reference fields
  5.     //  - 2 long fields
  6.     //  - 2 int fields
  7.     //  - 1 boolean field
  8.     //  - padding
  9.     //  netty中ChannelOutboundBuffer的Entry对象属性较多,16 + 6*8 + 2*8 + 2*4 + 1 = 89
  10.     //  往大了计算,未开启指针压缩时,64位机器按照8的倍数向上取整,算出填充默认需要96字节(netty中可通过系统参数(io.netty.transport.outboundBufferEntrySizeOverhead)动态配置)
  11.     //  MyNetty做了简化,暂时没那么属性,但这里就不改了,只是多浪费了一些空间
  12.     //  详细的计算方式可参考大佬的博客:https://www.cnblogs.com/binlovetech/p/16453634.html
  13.     private static final int DEFAULT_CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD = 96;
  14.     MyChannelOutBoundBufferEntry next;
  15.     ByteBuffer msg;
  16.     CompletableFuture<MyNioChannel> completableFuture;
  17.     int msgSize;
  18.     int pendingSize;
  19.     boolean cancelled;
  20.     static MyChannelOutBoundBufferEntry newInstance(ByteBuffer msg, int msgSize, CompletableFuture<MyNioChannel> completableFuture) {
  21.         // 简单起见,暂时不使用对象池,直接new
  22.         MyChannelOutBoundBufferEntry entry = new MyChannelOutBoundBufferEntry();
  23.         entry.msg = msg;
  24.         entry.msgSize = msgSize;
  25.         // entry实际的大小 = 消息体的大小 + 对象头以及各个属性值占用的大小
  26.         entry.pendingSize = msgSize + DEFAULT_CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
  27.         entry.completableFuture = completableFuture;
  28.         return entry;
  29.     }
  30.     void cancel() {
  31.         if (!cancelled) {
  32.             cancelled = true;
  33.         }
  34.     }
  35. }
复制代码
EventLoop中对write事件的处理逻辑
  1. public abstract class MyNioChannel {
  2.     // 已省略无关代码
  3.     public void doWrite(Object msg, boolean doFlush, CompletableFuture<MyNioChannel> completableFuture) throws IOException {
  4.         if(!(msg instanceof ByteBuffer)){
  5.             // 约定好,msg走到head节点的时候,只支持ByteBuffer类型
  6.             throw new Error();
  7.         }
  8.         ByteBuffer byteBufferMsg = (ByteBuffer)msg;
  9.         MyChannelOutboundBuffer myChannelOutboundBuffer = this.myChannelOutboundBuffer;
  10.         // netty在存入outBoundBuffer时使用的是堆外内存缓冲,避免积压过多的数据造成堆内存移除
  11.         // 这里简单起见先不考虑这方面的性能优化,重点关注ChannelOutboundBuffer本身的功能实现
  12.         myChannelOutboundBuffer.addMessage(byteBufferMsg,byteBufferMsg.limit(),completableFuture);
  13.         if(doFlush){
  14.             myChannelOutboundBuffer.addFlush();
  15.             // 进行实际的写出操作
  16.             flush0();
  17.         }
  18.     }
  19.     public void flush0(){
  20.         if(myChannelOutboundBuffer.isEmpty()){
  21.             // 没有需要flush的消息,直接返回
  22.             return;
  23.         }
  24.         // netty针对当前channel的状态做了很多判断(isActive、isOpen),避免往一个不可用的channel里写入数据,简单起见先不考虑这些场景
  25.         try {
  26.             doWrite(myChannelOutboundBuffer);
  27.         }catch (Exception e){
  28.             logger.error("flush0 doWrite error! close channel={}",this,e);
  29.             // 写出时有异常时,关闭channel
  30.             this.channelPipeline.close();
  31.         }
  32.     }
  33.     public boolean isWritable() {
  34.         MyChannelOutboundBuffer buf = this.myChannelOutboundBuffer;
  35.         return buf != null && buf.isWritable();
  36.     }
  37.     protected abstract void doWrite(MyChannelOutboundBuffer channelOutboundBuffer) throws Exception;
  38.     protected final void setOpWrite() {
  39.         final SelectionKey key = selectionKey;
  40.         // Check first if the key is still valid as it may be canceled as part of the deregistration
  41.         // from the EventLoop
  42.         // See https://github.com/netty/netty/issues/2104
  43.         if (!key.isValid()) {
  44.             return;
  45.         }
  46.         final int interestOps = key.interestOps();
  47.         if ((interestOps & SelectionKey.OP_WRITE) == 0) {
  48.             key.interestOps(interestOps | SelectionKey.OP_WRITE);
  49.         }
  50.     }
  51.    
  52.     protected final void clearOpWrite() {
  53.         final SelectionKey key = selectionKey;
  54.         // Check first if the key is still valid as it may be canceled as part of the deregistration
  55.         // from the EventLoop
  56.         // See https://github.com/netty/netty/issues/2104
  57.         if (!key.isValid()) {
  58.             return;
  59.         }
  60.         final int interestOps = key.interestOps();
  61.         if ((interestOps & SelectionKey.OP_WRITE) != 0) {
  62.             // 去掉对于op_write事件的监听
  63.             key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
  64.         }
  65.     }
  66.     protected final void incompleteWrite(boolean setOpWrite) {
  67.         // Did not write completely.
  68.         if (setOpWrite) {
  69.             setOpWrite();
  70.         } else {
  71.             // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
  72.             // use our write quantum. In this case we no longer want to set the write OP because the socket is still
  73.             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
  74.             // and set the write OP if necessary.
  75.             clearOpWrite();
  76.             // Schedule flush again later so other tasks can be picked up in the meantime
  77.             // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
  78.             // meantime.
  79.             myNioEventLoop.execute(this::flush0);
  80.         }
  81.     }
  82. }
复制代码
  1. public class MyNioSocketChannel extends MyNioChannel{
  2.     // 已省略无关代码
  3.     /**
  4.      * 一次聚合写出的最大字节数
  5.      * */
  6.     private int maxBytesPerGatheringWrite = 1024 * 1024 * 1024;
  7.     public static final int MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD = 4096;
  8.     @Override
  9.     protected void doWrite(MyChannelOutboundBuffer myChannelOutboundBuffer) throws Exception {
  10.         // 默认一次写出16次
  11.         int writeSpinCount = 16;
  12.         do {
  13.             if (myChannelOutboundBuffer.isEmpty()) {
  14.                 // 当前积压的待flush消息已经写完了,清理掉注册的write监听
  15.                 // All written so clear OP_WRITE
  16.                 clearOpWrite();
  17.                 // Directly return here so incompleteWrite(...) is not called.
  18.                 return;
  19.             }
  20.             // 计算出当前这一次写出的bytebuffer的数量
  21.             List<ByteBuffer> needWriteByteBufferList = myChannelOutboundBuffer.nioByteBuffers(1024,maxBytesPerGatheringWrite);
  22.             // 相比netty里用数组,List会多一次数组copy,但这样简单一点,不用考虑缓存or动态扩容的问题,暂不优化
  23.             ByteBuffer[] byteBuffers = needWriteByteBufferList.toArray(new ByteBuffer[0]);
  24.             SocketChannel socketChannel = this.getSocketChannel();
  25.             // 调用jdk channel的write方法一次性写入byteBuffer集合
  26.             final long localWrittenBytes = socketChannel.write(byteBuffers,0, needWriteByteBufferList.size());
  27.             logger.info("localWrittenBytes={},attemptedBytes={},needWriteByteBufferList.size={}"
  28.                 , localWrittenBytes, myChannelOutboundBuffer.getNioBufferSize(),needWriteByteBufferList.size());
  29.             if (localWrittenBytes <= 0) {
  30.                 // 返回值localWrittenBytes小于等于0,说明当前Socket缓冲区写满了,不能再写入了。注册一个OP_WRITE事件(setOpWrite=true),
  31.                 // 当channel所在的NIO循环中监听到当前channel的OP_WRITE事件时,就说明缓冲区又可写了,在对应逻辑里继续执行写入操作
  32.                 incompleteWrite(true);
  33.                 // 既然写不下了就直接返回,不需要继续尝试了
  34.                 return;
  35.             }
  36.             long attemptedBytes = myChannelOutboundBuffer.getNioBufferSize();
  37.             // 基于本次写出的情况,动态的调整一次写出的最大字节数maxBytesPerGatheringWrite
  38.             adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite);
  39.             // 按照实际写出的字节数进行计算,将写出完毕的ByteBuffer从channelOutboundBuffer中移除掉
  40.             myChannelOutboundBuffer.removeBytes(localWrittenBytes);
  41.             // 每次写入一次消息,writeSpinCount自减
  42.             writeSpinCount--;
  43.         } while (writeSpinCount > 0);
  44.         // 自然的退出了循环,说明已经正确的写完了writeSpinCount指定条数的消息,但channelOutboundBuffer还不为空(如果写完了会提前return)
  45.         // incompleteWrite内部提交一个flush0的任务,等待到下一次事件循环中再捞出来处理,保证不同channel间读写的公平性
  46.         incompleteWrite(false);
  47.     }
  48.     private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
  49.         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
  50.         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
  51.         // make a best effort to adjust as OS behavior changes.
  52.         // 默认情况下,我们会追踪明确设置SO_SNDBUF的地方。(setSendBufferSize等)
  53.         // 然而,一些操作系统可能会动态更改SO_SNDBUF(以及其他决定一次可以写入多少数据的特性),
  54.         // 因此我们应该尽力根据操作系统的行为变化进行调整。
  55.         if (attempted == written) {
  56.             // 本次操作写出的数据能够完全写出,说明操作系统当前还有余力
  57.             if (attempted << 1 > oldMaxBytesPerGatheringWrite) { // 左移1位,大于的判断可以保证maxBytesPerGatheringWrite不会溢出为负数
  58.                 // 进一步判断,发现实际写出的数据比指定的maxBytesPerGatheringWrite要大一倍以上
  59.                 // 则扩大maxBytesPerGatheringWrite的值,在后续尽可能多的写出数据
  60.                 // 通常在maxBytesPerGatheringWrite较小,而某一个消息很大的场景下会出现(nioBuffers方法)
  61.                 this.maxBytesPerGatheringWrite = attempted << 1;
  62.             }
  63.         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
  64.             // 如果因为操作系统底层缓冲区不够的原因导致实际写出的数据量(written)比需要写出的数据量(attempted)低了一倍以上,可能是比较拥塞或者其它原因(配置或动态变化)
  65.             // 将一次写出的最大字节数缩小为原来的一半,下次尝试少发送一些消息,以提高性能
  66.             this.maxBytesPerGatheringWrite = attempted >>> 1;
  67.         }
  68.     }
  69. }
复制代码
  1. public class MyNioEventLoop implements Executor {
  2.     // 已省略无关代码
  3.    
  4.     private void processSelectedKeys() throws IOException {
  5.         // processSelectedKeysPlain
  6.         Iterator<SelectionKey> selectionKeyItr = unwrappedSelector.selectedKeys().iterator();
  7.         while (selectionKeyItr.hasNext()) {
  8.             SelectionKey key = selectionKeyItr.next();
  9.             logger.info("process SelectionKey={}",key.readyOps());
  10.             try {
  11.                 // 拿出来后,要把集合中已经获取到的事件移除掉,避免重复的处理
  12.                 selectionKeyItr.remove();
  13.                 if (key.isConnectable()) {
  14.                     // 处理客户端连接建立相关事件
  15.                     processConnectEvent(key);
  16.                 }
  17.                 if (key.isAcceptable()) {
  18.                     // 处理服务端accept事件(接受到来自客户端的连接请求)
  19.                     processAcceptEvent(key);
  20.                 }
  21.                 if (key.isReadable()) {
  22.                     // 处理read事件
  23.                     processReadEvent(key);
  24.                 }
  25.                 if(key.isWritable()){
  26.                     // 处理OP_WRITE事件(setOpWrite中注册的)
  27.                     processWriteEvent(key);
  28.                 }
  29.             }catch (Throwable e){
  30.                 logger.error("server event loop process an selectionKey error!",e);
  31.                 // 处理io事件有异常,取消掉监听的key,并且尝试把channel也关闭掉
  32.                 key.cancel();
  33.                 if(key.channel() != null){
  34.                     logger.error("has error, close channel={} ",key.channel());
  35.                     key.channel().close();
  36.                 }
  37.             }
  38.         }
  39.     }
  40.     private void processWriteEvent(SelectionKey key) throws IOException {
  41.         // 目前所有的attachment都是MyNioChannel
  42.         MyNioSocketChannel myNioChannel = (MyNioSocketChannel) key.attachment();
  43.         // 执行flush0方法
  44.         myNioChannel.flush0();
  45.     }
  46. }
复制代码



  • MyChannelOutboundBuffer中维护了两个单向链表结构,一个是unFlushedEntry作为头结点的待刷新链表,一个是flushedEntry作为头结点的已刷新待写出的链表。通过write方法想要写出的消息都会被包装成entry节点被放在待刷新链表中。
    其中,所接受的消息必须是ByteBuffer类型。因此应用必须在写出操作的链路上,最终将自定义消息对象通过编码处理器统一转换为ByteBuffer。
  • 而当flush=true且已刷新待写出链表为空时,会通过一次巧妙的引用切换,将待刷新链表中的所有节点都转换到已刷新链表中。同时flush0方法会遍历已刷新链表,将其中的数据按照顺序通过socketChannel.write向对端进行实际的写出操作。
    如果一个节点中对应所有消息被完整的写出时,会将该节点从已刷新链表中摘除。如果因为拥塞或消息体很大无法一次完全写出,则会继续留在已刷新链表内。



  • netty中定义了低水位和高水位两个阈值。每个消息在放入缓冲区时,会计算对应链表节点所占用总大小,当缓冲区中全部链表节点所占用的总大小超过了高水位阈值时,则unWritable变为true。用户感知到该变化时,则应该避免继续写入而堆积过量消息导致OOM。而当消息随着正常的写出,被从已刷新链表中被不断删除时,所占用的总空间则会慢慢减少,当减少到低于低水位阈值时,unWritable变为false,代表有空闲,可以继续写入消息了。
  • 与处理读事件一样,netty在写出时也通过启发式的算法来动态调整下一次批量写出消息的数据量。既能避免拥塞,又能用尽可能少的次数将同样大小的消息体发出。
退出消息写出逻辑的三种方式


  • 当前flush操作后,已刷新链表中的消息都正常的写出成功了(myChannelOutboundBuffer.isEmpty=true)。
  • 实际写出时,socketChannel.write方法返回等于0,说明缓冲区拥塞已不可写,继续尝试短时间内已不太可能成功,通过向EventLoop注册可写事件的监听后结束此次处理。在下一次事件循环时,processWriteEvent中会再次执行flush0进行重试,继续消息的写出。
  • 缓冲区一直可写,但是待写出消息数据量过大,超过了单次可允许批量写出的次数后依然无法全部写出,则向EventLoop注册一个task任务后结束处理。
    再后续事件循环中,该任务会被捞取出来继续重试。通过这种机制,使得同一EventLoop中的其它IO事件和task任务能够有机会被处理,避免其它channel饥饿。
总结


  • 在lab4中,MyNetty参考netty优化了写出操作的处理逻辑,支持写出大数据量的消息,并且能够让用户感知当前channel写出消息堆积的情况和监听写出操作的结果。在保留了Netty关于写操作处理最核心逻辑的基础上,省略了许多旁路逻辑以减轻读者的理解负担,比如各种阈值参数的动态配置(MyNetty里都是直接写死)、未支持channelWritabilityChanged事件等等。
  • 在这里十分推荐大佬bin的技术小屋的博客 一文搞懂 Netty 发送数据全流程 | 你想知道的细节全在这里,里面对于netty写出功能分析的非常完善,相信在理解了MyNetty的lab1-lab4的内容后,读者能更好的理解大佬博客中所涉及到的netty中各模块整体的交互逻辑。


博客中展示的完整代码在我的github上:https://github.com/1399852153/MyNetty (release/lab4_efficient_write 分支),内容如有错误,还请多多指教。

来源:豆瓜网用户自行投稿发布,如果侵权,请联系站长删除

相关推荐

您需要登录后才可以回帖 登录 | 立即注册