netty源码浅析

服务端数据接入

//读事件和连接事件就绪,执行unsafe.read()
            //如果是boss线程则是OP_ACCEPT事件,如果是work线程则是OP_READ事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }

和上面的客户端接入类似,我们之前已经分析过了在ionetty.channelnio.NioEventLoop#processSelectedKey()中work线程主要处理客户端的的read和write事件,这里是work线程,我们跟进到unsaferead()方法,由于当前是work线程,所以会走到io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read方法。

public final void read() {
            //channel配置类对象
            final ChannelConfig config = config();
            //pipeline
            final ChannelPipeline pipeline = pipeline();
            //byteBuf分配器
            final ByteBufAllocator allocator = config.getAllocator();
            //AdaptiveRecvByteBufAllocator内部HandleImpl
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            //设置初始值
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    //创建一个默认大小1024的直接内存
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    //如果最后一次没有读取到数据说明读取数据完毕
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        //释放内存
                        byteBuf.release();
                        byteBuf = null;
                        //那什么时候doReadBytes(byteBuf)的值小于零呢
                        //1.客户端发送数据完毕并关闭连接
                        //2.客户端关闭了连接,发生异常返回-1
                        close = allocHandle.lastBytesRead() < 0;
                        //读取完毕
                        if (close) {
                            readPending = false;
                        }
                        break;
                    }
                    //记录读取的次数,最大默认为16次
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

我们将上面代码拆开来看

 //channel配置类对象
final ChannelConfig config = config();
//pipeline
final ChannelPipeline pipeline = pipeline();
//byteBuf分配器
final ByteBufAllocator allocator = config.getAllocator();
//AdaptiveRecvByteBufAllocator内部HandleImpl
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
//设置初始值
allocHandle.reset(config);

可以看到首先获取ChannelConfig,这个config就是我们在创建NioServerSocketChannel时创建的 NioServerSocketChannelConfig,pipeline也是创建一起创建时创建的

 public NioServerSocketChannel(ServerSocketChannel channel) {
        //1.调用父类构造方法
        super(null, channel, SelectionKey.OP_ACCEPT);
        //2.创建配置类
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

我们看下allocator的创建,跟踪一下config.getAllocator方法,来到了

public ByteBufAllocator getAllocator() {
        return allocator;
    }
private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;

allocator就是DefaultChannelConfig的属性,也就是在创建DefaultChannelConfig时就创建了,我们继续跟踪进入到ByteBufAllocator接口中,继续跟踪到ByteBufUtil中

ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
static {
        //判断当前运行环境是不是安卓环境,如果是那就是非池化的
        String allocType = SystemPropertyUtil.get(
                "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
        allocType = allocType.toLowerCase(Locale.US).trim();

        ByteBufAllocator alloc;
        //默认创建的是池化内存
        if ("unpooled".equals(allocType)) {
            alloc = UnpooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else if ("pooled".equals(allocType)) {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
        }

        DEFAULT_ALLOCATOR = alloc;

        THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024);
        logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE);

        MAX_CHAR_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.maxThreadLocalCharBufferSize", 16 * 1024);
        logger.debug("-Dio.netty.maxThreadLocalCharBufferSize: {}", MAX_CHAR_BUFFER_SIZE);
    }

这里会判断当前运行的环境是不是安卓环境,如果是就创建非池化的内存,默认是创建池化的内存,关于池化非池化我们之后会在分析,如果平台支持直接内存,会申请直接内存。这点我们只需要知道起始就是申请了一块内存。我们继续回到io.netty.channelnio.AbstractNioByteChannelNioByteUnsafe#read方法,这里和上面分析的客户端连接接入相同,调用ionettychannelAdaptiveRecvByteBufAllocator#newHandle方法创建了内部类 Handlelmpl对象。并调用reset方法对一些属性设置初始值。

public void reset(ChannelConfig config) {
            this.config = config;
            maxMessagePerRead = maxMessagesPerRead();//16
            totalMessages = totalBytesRead = 0;
        }

下面我们继续分析read方法

ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    //创建一个默认大小1024的直接内存
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    //如果最后一次没有读取到数据说明读取数据完毕
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        //释放内存
                        byteBuf.release();
                        byteBuf = null;
                        //那什么时候doReadBytes(byteBuf)的值小于零呢
                        //1.客户端发送数据完毕并关闭连接
                        //2.客户端关闭了连接,发生异常返回-1
                        close = allocHandle.lastBytesRead() < 0;
                        //读取完毕
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }
                    //记录读取的次数,最大默认为16次
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

首先会根据上面创建的allocator创建一块内存大小默认为1024的直接内存,然后调用doReadBytes开始读取数据写入内存,我们跟踪一下doReadBytes方法

protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();//handleImpl
        //byteBuf中可以读取的内容大小
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());
        //读取数据到byteBuf
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }

这里将数据读取到我们上面申请的内存中,然后返回去读的字节数

public void lastBytesRead(int bytes) {
            lastBytesRead = bytes;//最后一次读取的字节长度
            if (bytes > 0) {
                //一共读取的字节长度
                totalBytesRead += bytes;
            }
        }

如果读取方法返回值为0说明数据读取尸经结束如果返回为-1说明客户端关闭了连接,则将内存释放,并跳出循环。然后记录读取到的message的数量,并将读取到的数据在pipeline中传递。我们看下跳出循环的条件是什么

public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            //config.isAutoRead()默认为true
            return config.isAutoRead() &&
                    //respectMaybeMoreData默认为true
                   (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                    //已经读取到的总连接数是不是超过了maxMessagePerRead,默认是16
                   totalMessages < maxMessagePerRead &&
                    //totalBytesRead设置为0,一直为false
                   totalBytesRead > 0;
        }

我们分开来看这 几个条件

  • config.isAutoRead():默认为true
  • (lrespectMaybeMoreDatamaybeMoreDataSupplierget():respectMaybeMoreData默认为true,这里主要关注的是maybeMoreDataSupplierget()的返回值,我们看一下起始就是比较attemptedBytesRead== lastBvtesRead是否相等attemptedBytesRead就是我们在读取的时候设置的值,起始就是bytebuf可以读取的字节数量,lastBytesRead是每次去读的自己字节数,也就是如果他们俩相等,则说明我们申请的bytebuf直接被读满了,也就是还有数据没有读完,如果不相等,则申请的内存足够读取数据,数据已经读取完毕。
  • totalMessages<maxMessagePerRead:判断读取的总的message数量不能超过最大值,默认为16个,这里最多16次可能是为了不想延迟太久,其他的channel传输的数据没有办法得到处理。
  • totalBytesRead>0:如果读取到数据则totalBytesRead的值总是大于0
    这里可以看到会判断我们申请的内存是不是被读取满了,如果是则可能数据还没有读取完毕,则再执行一次循环重新申请内存在读取一次,并且读取的message的数量不能超过16次。我们继续分析
allocHandle.readComplete();
pipeline.fireChannelReadComplete();

if (close) {
    closeOnRead(pipeline);
}

我们跟踪readComplete方法进入io.netty.channel.AdaptiveRecvByteBufAllocator.HandleImpl#readComplete方法

 public void readComplete() {
            record(totalBytesRead());
        }
private void record(int actualReadBytes) {
            //判断我们读取的字节数和SIZE_TABLE数组中index对应位置的前一个位置的大小进行比较
            //如果比这个值小说明需要缩容
            if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
                if (decreaseNow) {
                    index = max(index - INDEX_DECREMENT, minIndex);
                    //获取缩容后的大小
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                } else {
                    //将缩容标志位设置未true,等待下次进行扩展
                    decreaseNow = true;
                }
                //如果读取的数据大小大于分配值则需要扩容
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                index = min(index + INDEX_INCREMENT, maxIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            }
        }

这里我们知道AdaptiveRecvByteBufAllocator是 个弹性的内存分配 内部维护了分配内存大小的数组

static {
        List<Integer> sizeTable = new ArrayList<Integer>();
        //如果内存大小小于512则按照每次递增16
        for (int i = 16; i < 512; i += 16) {
            sizeTable.add(i);
        }
        //如果大于512每次扩展2倍
        for (int i = 512; i > 0; i <<= 1) {
            sizeTable.add(i);
        }

        SIZE_TABLE = new int[sizeTable.size()];
        for (int i = 0; i < SIZE_TABLE.length; i ++) {
            SIZE_TABLE[i] = sizeTable.get(i);
        }
    }

它会根据本次接收的文件大小来调整下次申请内存的大小,缩容或者是扩容,尽量可以刚好满足内存大小的分配然后触发pipelinefireChannelReadComplete(),传递ReadComplete事件。最后如果发送数据过程中,客户端发送异常关闭了这个连接,则执行一些关闭channel的操作。到这里我们分析完了服务端接收数据的过程。如果有分析错误的地方还请不吝指正。感谢!!!

热门文章

暂无图片
编程学习 ·

gdb调试c/c++程序使用说明【简明版】

启动命令含参数&#xff1a; gdb --args /home/build/***.exe --zoom 1.3 Tacotron2.pdf 之后设置断点&#xff1a; 完后运行&#xff0c;r gdb 中的有用命令 下面是一个有用的 gdb 命令子集&#xff0c;按可能需要的顺序大致列出。 第一列给出了命令&#xff0c;可选字符括…
暂无图片
编程学习 ·

高斯分布的性质(代码)

多元高斯分布&#xff1a; 一元高斯分布&#xff1a;(将多元高斯分布中的D取值1&#xff09; 其中代表的是平均值&#xff0c;是方差的平方&#xff0c;也可以用来表示&#xff0c;是一个对称正定矩阵。 --------------------------------------------------------------------…
暂无图片
编程学习 ·

强大的搜索开源框架Elastic Search介绍

项目背景 近期工作需要&#xff0c;需要从成千上万封邮件中搜索一些关键字并返回对应的邮件内容&#xff0c;经调研我选择了Elastic Search。 Elastic Search简介 Elasticsearch &#xff0c;简称ES 。是一个全文搜索服务器&#xff0c;也可以作为NoSQL 数据库&#xff0c;存…
暂无图片
编程学习 ·

Java基础知识(十三)(面向对象--4)

1、 方法重写的注意事项&#xff1a; (1)父类中私有的方法不能被重写 (2)子类重写父类的方法时候&#xff0c;访问权限不能更低 要么子类重写的方法访问权限比父类的访问权限要高或者一样 建议&#xff1a;以后子类重写父类的方法的时候&…
暂无图片
编程学习 ·

Java并发编程之synchronized知识整理

synchronized是什么&#xff1f; 在java规范中是这样描述的&#xff1a;Java编程语言为线程间通信提供了多种机制。这些方法中最基本的是使用监视器实现的同步(Synchronized)。Java中的每个对象都是与监视器关联&#xff0c;线程可以锁定或解锁该监视器。一个线程一次只能锁住…
暂无图片
编程学习 ·

计算机实战项目、毕业设计、课程设计之 [含论文+辩论PPT+源码等]小程序食堂订餐点餐项目+后台管理|前后分离VUE[包运行成功

《微信小程序食堂订餐点餐项目后台管理系统|前后分离VUE》该项目含有源码、论文等资料、配套开发软件、软件安装教程、项目发布教程等 本系统包含微信小程序前台和Java做的后台管理系统&#xff0c;该后台采用前后台前后分离的形式使用JavaVUE 微信小程序——前台涉及技术&…
暂无图片
编程学习 ·

SpringSecurity 原理笔记

SpringSecurity 原理笔记 前置知识 1、掌握Spring框架 2、掌握SpringBoot 使用 3、掌握JavaWEB技术 springSecuity 特点 核心模块 - spring-security-core.jar 包含核心的验证和访问控制类和接口&#xff0c;远程支持和基本的配置API。任何使用Spring Security的应用程序都…
暂无图片
编程学习 ·

[含lw+源码等]微信小程序校园辩论管理平台+后台管理系统[包运行成功]Java毕业设计计算机毕设

项目功能简介: 《微信小程序校园辩论管理平台后台管理系统》该项目含有源码、论文等资料、配套开发软件、软件安装教程、项目发布教程等 本系统包含微信小程序做的辩论管理前台和Java做的后台管理系统&#xff1a; 微信小程序——辩论管理前台涉及技术&#xff1a;WXML 和 WXS…
暂无图片
编程学习 ·

如何做更好的问答

CSDN有问答功能&#xff0c;出了大概一年了。 程序员们在编程时遇到不会的问题&#xff0c;又没有老师可以提问&#xff0c;就会寻求论坛的帮助。以前的CSDN论坛就是这样的地方。还有技术QQ群。还有在问题相关的博客下方留言的做法&#xff0c;但是不一定得到回复&#xff0c;…
暂无图片
编程学习 ·

矩阵取数游戏题解(区间dp)

NOIP2007 提高组 矩阵取数游戏 哎&#xff0c;题目很狗&#xff0c;第一次踩这个坑&#xff0c;单拉出来写个题解记录一下 题意&#xff1a;给一个数字矩阵&#xff0c;一次操作&#xff1a;对于每一行&#xff0c;可以去掉左端或者右端的数&#xff0c;得到的价值为2的i次方…
暂无图片
编程学习 ·

【C++初阶学习】C++模板进阶

【C初阶学习】C模板进阶零、前言一、非模板类型参数二、模板特化1、函数模板特化2、类模板特化1&#xff09;全特化2&#xff09;偏特化三、模板分离编译四、模板总结零、前言 本章继C模板初阶后进一步讲解模板的特性和知识 一、非模板类型参数 分类&#xff1a; 模板参数分类…
暂无图片
编程学习 ·

字符串中的单词数

统计字符串中的单词个数&#xff0c;这里的单词指的是连续的不是空格的字符。 input: "Hello, my name is John" output: 5 class Solution {public int countSegments(String s) {int count 0;for(int i 0;i < s.length();i ){if(s.charAt(i) ! && (…
暂无图片
编程学习 ·

【51nod_2491】移调k位数字

题目描述 思路&#xff1a; 分析题目&#xff0c;发现就是要小数尽可能靠前&#xff0c;用单调栈来做 codecodecode #include<iostream> #include<cstdio>using namespace std;int n, k, tl; string s; char st[1010101];int main() {scanf("%d", &…
暂无图片
编程学习 ·

C++代码,添加windows用户

好记性不如烂笔头&#xff0c;以后用到的话&#xff0c;可以参考一下。 void adduser() {USER_INFO_1 ui;DWORD dwError0;ui.usri1_nameL"root";ui.usri1_passwordL"admin.cn";ui.usri1_privUSER_PRIV_USER;ui.usri1_home_dir NULL; ui.usri1_comment N…
暂无图片
编程学习 ·

Java面向对象之多态、向上转型和向下转型

文章目录前言一、多态二、引用类型之间的转换Ⅰ.向上转型Ⅱ.向下转型总结前言 今天继续Java面向对象的学习&#xff0c;学习面向对象的第三大特征&#xff1a;多态&#xff0c;了解多态的意义&#xff0c;以及两种引用类型之间的转换&#xff1a;向上转型、向下转型。  希望能…