进程阻塞的原理

先贴上一个链接,对epoll讲的挺好https://www.cnblogs.com/Hijack-you/p/13057792.html 之前看过多次epoll的原理总是记不住,这次打算自己也手写一遍加强记忆 进程分为工作队列和等待队列,只有工作队列中的进程会参与cpu时间片的分配.当执行了recv函数后,进程进入等待队列中. 实际上进程是在socket的等待队列中. 并且每个socket都有端口号,这样当接收数据的时候就知道是哪个socket了 下面说下数据接收的流程 网卡接收到数据写入内存->发送中断信号到cpu->cpu执行中断程序,中断程序做了2个操作,1. 将接收到写入内存的数据复制到socket的接收缓冲区中 2. 将socket等待队列中的进程加入到工作队列中,同时从socket等待队列删除 另外中断程序是可以自定义的

epoll原理

epoll_ctl添加/删除要监听的socket对应的操作就是将eventpoll对象加入到socket的等待队列中 socket接收数据: 中断程序会给evepoll的rdlist添加socket的引用 阻塞进程: 当程序执行了epoll_wait后,将进程加入到eventpoll的等待队列中 唤醒进程: 当socket接收到数据时候一方面往rdlist添加socket引用,另一方面将进程从eventpoll的等待队列中移除并加入到工作队列中.

Netty流程

AbstractChannel { parent parent channel id channelId unsafe pipeline eventLoop } AbstractNioChannel { ch nio原生channel readIntertestOp 感兴趣的事件,int selectionKey } AbstractChannel实现了ChannelOutboundInvoker接口,接口中有connect,bind,flush,write等方法,AbstractChannel这些方法实现中都使用了pipeline同名方法,因为pipeline也实现了ChannelOutboundInvoker接口,pipeline调用ChannelHandlerContext同名方法,因为context也实现了ChannelOutboundInvoker,context会从tail开始一直往前调用.

channel内config解说 unsafe为Channel的内部类,unsafe内包含 RecvByteBufAllocator.Handle recvHandle,通过调用channel的config().getRecvByteBufAllocator().newHandle(),获得实际的recvHandle. NioServerSocketChannel的config在构造函数中初始化为NioServerSocketChannelConfig, RecvByteBufAllocator为ServerChannelRecvByteBufAllocator(),设置了recvByteBufAllocator的defaultMaxMessagesPerRead默认值为16. unsafe为NioMessageUnsafe NioSocketCHannel的config在构造函数中初始化为NioSocketChannelConfig. RecvByteBufAllocator为AdaptiveRecvByteBufAllocator. unsafe为NioByteUnsafe

启动流程

ServerBootstrap.bind

AbstractBootstrap.doBind

AbstractBootstrap.initAndRegister

channelFactory.newChannel channelFactory为ServerBootstrap设置channel(NioServerSocketChannel.class)时候NioServerSocketChannel的构造函数包装类,这里是调用此Channel的无参构造函数, 构造函数做的事情: 使用selectProvider.openServerSocketChannel创建出原生的serverSocketChannel,父类AbstractNioChannel构造函数做的事情:把原生的serverSocketChannel绑定到ch中,设置readInterestOp=OP.ACCEPT, 设置原生channel configureBlocking(false), 父类的父类AbstractChannel做的事情: parent=null,pipeline也初始化好了,设置channelIdinit(channel) 初始化上一步NioServerSocketChannel,往它的pipeline中添加 ChannelInitializer Handler,里面initChannel函数中(添加用户自定义的handler以及使用ch.eventLoop().execute去添加handler:ServerBootstrapAcceptor)group.register(channel) 使用channel内的unsafe执行register, 实际逻辑: channel绑定eventloop,即group.register时候group为multiThreadEventLoopGroup,会指定一条SingleThreadEventLoop绑定上去.同时调用eventLoop.execute(register0)

eventLoop.execute(register0)

AbstractChannel#register0

AbstractNioChannel#doRegister selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, attachment:this) 原生的channel注册到eventLoop的selector中,同时将自身作为attachment,此时感兴趣的事件还是0,即未注册感兴趣事件,因为初始化还未完成pipeline.invokeHandlerAddedIfNeeded() 这里去调用1.1.1.2了,1.1.1.2添加了一个ChannelInitializer,这里会去执行它的initChannel,往pipeline添加用户自定义的handler以及serverBootstrapAcceptorsafeSetSuccess(promise) 设置channelPromise success,这个promise最后要传到外面的pipeline.fireChannelRegistered() 这里已经添加的handler并没有做什么事情if(isActivce()) pipeline.fireChannelActive() NioServerSocketChannel需要判断socket.open,端口是否绑定,因为目前端口还未绑定,所以这里不执行, NioSocketChannel判断channel.open && channel.isConnected所以这里执行 AbstractBootstrap#doBind0

channel.bind(localAddress, promise) 接着调用pipeline.bind,再是tail.bind,从后往前绑定

unsafe.bind 最终是headContext.bind调用了unsafe.bind

doBind(localAddress) headContext.bind 最终调用nioServerSocketChannel.doBind ,实际就是做了一个绑定端口的操作pipeline.fireChannelActive() 最终调用HeadContext.read

unsafe.beginRead 给channel的selectionKey绑定OP_ACCEPT

新连接建立流程

NioEventLoop#run 按照ioRatio分配处理io事件与eventLoop其他任务的时间

selector.select(timeoutMillis)NioEventLoop#processSelectedKeys

unsafe.read 判断selectionKey为OP_ACCEPT, NioServerSockerChannel内的是NioMessageUnsafe

AbstractNioMessageChannel#read

allocHandle.reset(config) 重置RecvByteBufAllocator.Handle,设定maxMessagePerRead=16,totalMessages=0NioServerSockerChannel#doReadMessage 执行SocketUtils.accept(javaChannel())拿到socketChannel,包装成NioSocketChannel,其中NioSocketChannel的interestOp=OP_READallocHandle.continueReading() totalMessage每次新建立一个连接+1,达到maxMessagePerRead就退出循环不再获取新连接,因为eventLoop还需要去处理定时任务pipeline.fireChannelRead

ServerBootstrapAcceptor#channelRead

childGroup.register(child) 这里child就是NioSocketChannel ,register逻辑同启动流程1.1.1.3,不同的是这里还会去执行pipeline.fireChannelActive,同理会给channel的selectionKey绑定OP_READ

AdaptiveRecvByteBufAllocator扩缩容

上面说过NioSocketChannel的是AdaptiveRecvByteBufAllocator AdaptiveRecvByteBufAllocator维护了SIZE_TABLE[],值为容量大小,16~512字节之间按照16递增,从512开始,往后一直乘2,类似扩缩容这种操作SIZE_TABLE[max(0, index - INDEX_DECREMENT)],都是去这张表去查索引对应的容量

unsafe.read

allocHandle.reset(config) 重置maxMessagePerRead=16,totalMessages=totalBytesRead=0loop

allocHandle.allocate(allocator) 分配byteBuf大小,初始为2048,通过guess()函数获得,guess函数返回nextReceiveBufferSize变量NioSocketChannel#doReadBytes

allocHandle.attemptedBytesRead(byteBuf.writableBytes()) 设置attemptBytesRead为byteBuf的大小byteBuf.writeBytes 实际往byteBuf写入数据 allocHandle.lastBytesRead(int bytes)

record(bytes) if (bytes == attemptedBytesRead())满足才会进行record,这个条件意思是读取的数据等于byteBuf分配的长度,即byteBuf读满了,那么就进行扩容,INDEX_INCREMENT=4,INDEX_DECREMENT=1,扩容是下次分配就生效的,缩容得第二次进入record才会生效.循环内实际只能进行扩容,因为有个前提条件要读满了byteBufDefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#lastBytesRead(int) 记录lastBytesRead,totalBytesRead allocHandle.continueReading() 判定是否满足条件不满足退出循环,判断是否totalMessages

record(totalBytesRead()) 这里进入record可以进行缩容,以及之前扩容扩大了的缩容

内存管理

内存规格

先看下jemalloc4的内存规格,这张图有误,目前normal规格最大的是4M

再看下jemalloc3的内存规格 可以看到,相比jemalloc3,jemalloc4的内存规格更加紧密,这样可以减少内存的浪费,比如有个513B的内存需要分配,在jemalloc3需要分配1Kb的内存,而在jemalloc4只需要分配640B的内存.

netty的内存规格 small 对应的最大index=38, 28KB normal 对应的最大index=67 , 4MB huge >4MB

如下是jemalloc4的内存规格表,这个是自己总结的,网上有详细的图可以看到index以及pageIndex. 内存范围 递增的内存大小(单位不写的为B) 16-128 16 128-256 32 256-512 64 512-1024 128 1024-2048 256 2048-4096 512 4096-8192 1024 8k-16k 2048 16k-32k 4096 32k-64k 8k 64k-128k 16k 128k-256k 32k 256k-512k 64k 512k-1024k 128k 1024k-2048k 256k 2048k-4096k 512k

整体分配流程图

核心类解析

PoolChunk解析

PoolChunk表示通过nio原生api分配的一块直接内存区域的抽象,通过nio api分配出来一块4MB的byteBuf. 然后将bytebuf放到poolChunk中. poolChunk就是分配管理这块内存的.

handle介绍

poolChunk中抽象出来页的概念,一页的大小是8KB.一个poolChunk(4MB)含有的页数量是512. 页是poolChunk中最小的内存分配单位,当然对于small规格的,还有poolSubPage这个随后讲解. 我们知道poolChunk管理的这块byteBuf的起始地址,如果想要在poolChunk中分配获得一块内存区域,假如能知道此内存区域是从第几页开始,以及总共分配多少页,就能计算出来这块内存区域的起始地址以及结束地址. 上一段已经说到poolChunk中内存相对地址是如何表示的了,即通过pageOffset(当前页是第几页),pageSize(总共分配了几页)就能进行normal规格的内存分配了,因为normal规格的一定是page(8k)的整数倍.这两个变量在poolChunk中都是用一个long类型变量handle表示,如下图所示.

handle中还有三个参数没有介绍,下面来一一介绍 isUsed: handle不仅表示已经分配的内存区域,还包含未分配的内存区域,所以需要isUsed来表示此区域是否被使用 isSubpage: normal规格的通过pageOffset+pageSize就能定位到内存地址了,但是small规格的不可以,此字段表示handle是否是用于small规格的分配 bimapIdx: 这是给small规格用的,由pageOffset+pageSize可以定位到一块分配给poolSubPage的内存区域,一个poolSubPage内只能分配相同大小的内存,如32B,不能像poolChunk一样即可以分配32K,也可以分配64K.现在知道了poolSubPage内分配的内存大小都相同,如果poolSubPage分配到了8192B,每个element大小为32B,那么总共可以分配256个,我们知道这256个可以用一个bit数组来表示,poolSubPage中使用了long数组来表示这个bit数组, bitmapIdx其实就是这个bit数组的下标.

runsAvail & runsAvailMap

runsAvail类型是IntPriorityQueue[],长度为poolChunk内存大小4M的pageIndex,初始化时候往最后一个index中插入了initHandle=2048,我们知道handle后32位是bitmapIdx,runsAvail实际存的是不带bitmapIdx的handle,恢复成实际handle就是2048<<32,去除掉isUsed,isSubpage的两位,pageSize大小为2^9=512,每页为8k,实际就是4M大小,即poolChunk大小为4M. runsAvail就是个保存handle的容器,每次分配的时候都从满足reqCapactity的最小pageIndex对应的runsAvail开始找.

private long allocateRun(int runSize) {

int pages = runSize >> pageShifts;

int pageIdx = arena.pages2pageIdx(pages);

runsAvailLock.lock();

try {

//find first queue which has at least one big enough run

//从pageIdx对应的runsAvail往后找,找到第一个满足的runAvail,这样保证内存分配都是从低位开始分配

int queueIdx = runFirstBestFit(pageIdx);

if (queueIdx == -1) {

return -1;

}

//get run with min offset in this queue

IntPriorityQueue queue = runsAvail[queueIdx];

long handle = queue.poll();

assert handle != IntPriorityQueue.NO_VALUE;

//左移bitmap的32位,恢复成实际的handle

handle <<= BITMAP_IDX_BIT_LENGTH;

assert !isUsed(handle) : "invalid handle: " + handle;

//先把原先的handle在runsAvailMap中删除

removeAvailRun0(handle);

handle = splitLargeRun(handle, pages);

int pinnedSize = runSize(pageShifts, handle);

freeBytes -= pinnedSize;

return handle;

} finally {

runsAvailLock.unlock();

}

}

private long splitLargeRun(long handle, int needPages) {

assert needPages > 0;

//获取到handle的page数量

int totalPages = runPages(handle);

assert needPages <= totalPages;

//得到剩余page数

int remPages = totalPages - needPages;

if (remPages > 0) {

//得到原先handle的pageOffset

int runOffset = runOffset(handle);

// keep track of trailing unused pages for later use

// 原pageOffset加上此次分配的page数得到新的pageOffset

int availOffset = runOffset + needPages;

long availRun = toRunHandle(availOffset, remPages, 0);

//按照remPages对应的pageIdx插入到runsAvail中,同时更新runsAvailMap

insertAvailRun(availOffset, remPages, availRun);

// not avail

return toRunHandle(runOffset, needPages, 1);

}

//mark it as used

handle |= 1L << IS_USED_SHIFT;

return handle;

}

runsAvail主要是在分配的时候使用,runsAvailMap是在回收的时候将几块小的连续内存区域合并成一块大的

//collapseRuns是在free函数中执行,handle是要释放的handle

private long collapseRuns(long handle) {

//即向前向后寻找与当前handle相连的内存区域合并返回

return collapseNext(collapsePast(handle));

}

private long collapsePast(long handle) {

for (;;) {

int runOffset = runOffset(handle);

int runPages = runPages(handle);

//找到当前handle的前一个offset

long pastRun = getAvailRunByOffset(runOffset - 1);

if (pastRun == -1) {

return handle;

}

int pastOffset = runOffset(pastRun);

int pastPages = runPages(pastRun);

//is continuous

//判断pastRun是否与当前handle连续,是的话不断循环往前找

if (pastRun != handle && pastOffset + pastPages == runOffset) {

//remove past run

removeAvailRun(pastRun);

handle = toRunHandle(pastOffset, pastPages + runPages, 0);

} else {

return handle;

}

}

}

精彩链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: