Netty
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.48.Final</version>
</dependency>
NioEventLoop-->thread客服端接收客户端连接
Channel连接的封装-->Socket PipeLine逻辑处理链-->Logic Chain
ChanneHandler-->Logic逻辑处理链的处理 ByteBut数据流读写-->IO Bytes
BIO:一个线程负责连接,一请求一应答 阻塞同步
伪异步IO:线程池负责连接,M请求N应答 阻塞同步
NIO:缓冲区Buffer,通道Channel,多路复用器Selector 非阻塞同步
AIO:连接注册读写事件和回调函数,读写方法异步,主动通知程序 非阻塞异步
服务端启动
1. 创建服务端Channel
bind()用户代码入口->initAndRegister()初始化并注册->newChannel()创建ServerSocketChannel ->newSocket()通过JDK创建底层jdk channel->NioServerSocketChannelConfig()配制TCP参数类->AbstractNioChannel()->configureBlocking(false)阻塞模式:AbstractChannel()创建id,undafe,pipeline
1. 初始化服务端Channel
init()初始化入口->set ChannelOptions,ChannelAttrs->set ChildOptions,ChildAttrs->config handler配制服务端pipeline逻辑处理链->add ServerBootstrapAcceptor添加连接器
1. 注册事件轮寻器selector
AbstractChannel.register(channel)入口->this.eventLoop=eventLoop绑定线程->resgiter0()实际注册–doRegister()调用JDK底层注册:invokeHandlerAddIfNeeded()用户回调->fireChannelRegistered)传播事件
1. 端口绑定
AbstractUnsafe.bind()入口->doBind()->javaChannel().bind()JDK底层绑定->pipeline.fireChannelActive()事件传播->HeadContext.readIfIsAutoRead()
NioEventLoop
每次执行创建一个线程实体FastThreadLocalThread,线程命名规则nioEventLoop-1-xx
创建->new NioEventLoopGroup()线程组,默认2*cpu->new ThreadPerTaskExecutor)线程创建器->for(){newChild()}构造NioEventLoop->chooserFactory.newChooser()线程选择器
newChild()
保存x线程执行器ThreadPerTaskExecutor
创建一个MpscQueue
创建一个selector
启动->bind()->execute(task)入口->startThread()->doStartThread()创建线程->ThreadPerTaskExecutor.execute()->thread=Thread.currentThread()->NioEventLoop.run()启动->for(;;)->select()检查是否有IO事件->processSelectedKeys()处理io事件->runAllTasks()处理异步任务队列
新连接接入
检测新连接->创建NioSocketChannel->分配线程及注册selector->向selector注册读事件
processSwlectdKey(key,channel)入口->NioMessageUnsafe.read()->doReadMessages()循环while->javaChannel().accept()
new NioSocketChannel(parent,ch)入口->AbstractNioByteChannel(p,ch,op_read)->configureBlocking(false)&save op:-:create id,unsafe,pipeline->new NioSocketChannelConfig()->setTcpNoDelay(true)禁止Nagle算法
channel分类
NioServerSocketChannel
NioSocketChannel
Unsafe
head->ServerBootstrapAcceptor->Tail
pipeline
1. pipeline初始化
在创建Channel的时候创建,节点数据结构ChannelHandlerContext,两大哨兵head和tail
1. 添加删除ChannelHandler
注意顺序
1. 事件和异常的传播
inBound head->a1->b1->…->tail
在处理ByteBuf时不往下传播注意处理要释放资源,SimpleChannelInboundHandler已实现自动释放,都没有处理的在tail中会释放
outBound head<-a2<-b2<-…<-tail
异常触发链 b1->…->a2->b2->…->tail 最好在链最后添加异常处理器,没有处理过的在最后能一同处理
ByteBuf
内存与内存管理器的抽象
不同规格大小和不同类别的内存的分配策略
内存的回收过程
结构:discardable bytes |readable bytes|writable bytes
批针:0<=readerIndex<=writerIndex<=capacity<=maxCapacity
方法:read write set(在当前位置操作,不会移动指针) mark(保存指针),reset(复位指针)
分类:ByteBuf<-AbstratByteBuf<-
PooledHeapByteBuf:<-PooledUnsafeHeapByteBuf
PooledUnsafeDirectByteBUf:
PooledDirectByteBuf:
UnPooledHeapByteBuf:<-UnPooledUnsafeHeapByteBuf
UnPooledUnsafeDirectByteBUf:
UnPooledDirectByteBuf
Pooled从预先分配好的内存操作 UnPooled直接API分配内存
Unsafe可以通过内存直接拿到 非Unsafe没有依赖底层内存JDK
Heap堆上分配内存 Direct虚拟机外的内存中分配了一块缓冲区,不参与GC,手动释放
Direct Buffer创建和销毁的代价很高,所以要用在尽可能重用的地方。 比如周期长传输文件大采用direct buffer,不然一般情况下就直接用heap buffer 就好。
Thread -> PoolThreadCache(tinyCaheSize smallCacheSize normalCacheSize) -> Area
(SubPage)0–>tiny(N*16B共32种)<--512B-->small<–8K(page 512B,1K,2K,4K)–normal—16M(Chunk 8K,16K,32K)—>huge
释放:连续的内存区段加到缓存,标记连续的内存区段为未使用,ByteBuf加到对象池
解码 ByteToMessageDecoder
累加字节流->调用子类的decode方法解析->解析的ByteBuf向下传播
基于固定长度 FixedLengthFrameDecoder:n长度
基本行解码 LineBasedFrameDecoder:\n,\r\n换行结束符,丢弃和非丢弃模式
基于分隔符 DelimiterBasedFrameDecoder 先行处理,找到最小分隔符,解码
基于长度域 LengthFieldBasedFrameDecoder lengthFieldOffset:2偏移lengthFieldLength:2长度
编码 writeAndFlush
从tail节点往前传播-逐个调用channelHandler的write方法->逐个调用channelHandler的flush方法
模拟百万连接
突破局部文件限制和全局文件限制
Server 8000~8100 Client 1025~65535
ulimit -n
sudo vi /etc/security/limits.conf 增加
* hard nofile 1000000
* soft nofile 1000000
cat /proc/sys/fs/file-max
临时使用,重启回到10000
sudo -s
echo 100000 > /proc/sys/fs/file-max
exit
持久使用
/etc/sysctl.conf 增加
fs.file-max=1000000
sudo sysctl -p /etc/sysctl.conf 后重启
业务优化
1. 在单个业务中使用线程池
1. 给handler指定使用JDK的线程池,做不到内存共享