一、I/O模型的演进:从BIO到AIO
理解Java网络编程的核心脉络,是理解Netty为什么这么设计的起点。从BIO到NIO,再到Epoll(AIO),每一步演进都是为了解决前一代模型的根本缺陷。
1.1 BIO:每个连接一个线程的死亡陷阱
Blocking I/O(BIO)是Java最早的网络编程模型。核心问题:accept()和read()都是阻塞的——一个线程在等待数据时无法服务其他连接。
/**
* BIO的致命缺陷:
*
* 线程模型:Thread-Per-Connection
* - ServerSocket.accept() 阻塞:等待客户端连接
* - Socket.read() 阻塞:等待数据到达
*
* 问题1:线程资源昂贵
* - Java线程默认栈大小约1MB(-Xss)
* - 1万个并发连接 = 1万个线程 = ~10GB栈内存
* - 线程切换成本高(Context Switch,Linux约5-10微秒)
* - 线程调度本身成为瓶颈
*
* 问题2:阻塞等待是浪费
* - 99%的时间线程都在等待数据(连接闲置)
* - CPU空转,无法服务其他连接
* - 线程大部分时间在sleep,不是work
*/
// BIO Server实现(问题代码,仅用于演示缺陷)
public class BioServer {
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(8080);
System.out.println("BIO Server启动,监听8080端口...");
// 问题:每个连接一个线程!
while (true) {
Socket client = server.accept(); // 阻塞①:等待连接
System.out.println("新连接: " + client.getRemoteSocketAddress());
// 每个连接都new一个线程(内存和调度灾难)
new Thread(() -> {
try {
// 阻塞②:等待数据
// 线程在此处沉睡,浪费所有资源
BufferedReader reader = new BufferedReader(
new InputStreamReader(client.getInputStream()));
String line;
while ((line = reader.readLine()) != null) {
// 业务处理...
System.out.println("收到: " + line);
client.getOutputStream().write(("响应: " + line + "\n").getBytes());
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try { client.close(); } catch (IOException ex) {}
}
}).start();
}
}
}
// 实际场景中,BIO线程模型的问题:
// 连接数1000:勉强可用
// 连接数10000:线程数10000,OOM
// 连接数100000:根本不可能
// 瓶颈不在业务逻辑,在线程模型
1.2 NIO:Selector打开事件驱动的大门
New I/O(NIO,Java 1.4引入)是革命性的改变。核心思想:用单线程(或少量线程)管理大量连接,通过Selector检测哪些连接有I/O事件需要处理。
/**
* NIO三大核心组件:
*
* 1. Channel(通道):类似流,但可以非阻塞
* - FileChannel:文件I/O
* - SocketChannel:TCP客户端/主动连接
* - ServerSocketChannel:TCP服务端(接收连接)
* - DatagramChannel:UDP
*
* 2. Buffer(缓冲区):读写数据的容器
* - 堆内缓冲区:byte[],GC压力大
* - 直接缓冲区:堆外内存,不受GC管理,需要手动释放
*
* 3. Selector(选择器):检测多个Channel的I/O事件
* - 一个Selector管理多个Channel
* - select()阻塞直到有Channel就绪(OP_ACCEPT/OP_CONNECT/OP_READ/OP_WRITE)
* - selectNow()非阻塞,立即返回
*/
// NIO Selector的工作原理(Linux epoll的Java封装)
/**
* Linux内核层面:
* epoll_create() → 创建epoll实例
* epoll_ctl() → 注册FD到epoll实例(EPOLL_CTL_ADD)
* epoll_wait() → 等待事件(水平触发LT 或 边缘触发ET)
*
* Java NIO Selector 就是对 epoll 的封装(JDK实现,非JNI)
*
* 关键性能指标:
* - select():O(n),遍历所有FD
* - poll():O(n),使用链表替代数组
* - epoll():O(1),只返回就绪的FD(红黑树 + 就绪链表)
*
* epoll的优势:
* - FD数量不影响效率(不遍历全部)
* - 使用回调而非轮询(内核就绪通知)
* - 支持边缘触发(减少系统调用次数)
*/
// NIO Server核心代码
public class NioServer {
public static void main(String[] args) throws IOException {
// 1. 打开Selector(非阻塞I/O的核心)
Selector selector = Selector.open();
// 2. 打开ServerSocketChannel,配置为非阻塞
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false); // 关键:非阻塞模式
serverChannel.bind(new InetSocketAddress(8080));
// 3. 将ServerSocketChannel注册到Selector,关心ACCEPT事件
SelectionKey acceptKey = serverChannel.register(
selector, SelectionKey.OP_ACCEPT);
System.out.println("NIO Server启动,监听8080...");
// 4. 事件循环(Reactor模式核心)
while (true) {
// 阻塞直到有Channel就绪(可设置超时)
// 这里就是epoll_wait()的Java层调用
int readyCount = selector.select();
// 返回值 = 就绪事件数量(0表示超时)
if (readyCount == 0) continue;
// 遍历所有就绪的SelectionKey
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove(); // 重要:从selectedKeys中移除,否则会重复处理
// 判断是什么事件就绪
if (key.isAcceptable()) {
// OP_ACCEPT就绪:有新连接
handleAccept(key);
} else if (key.isReadable()) {
// OP_READ就绪:有数据可读
handleRead(key);
} else if (key.isWritable()) {
// OP_WRITE就绪:可以写数据(通常不需要注册)
handleWrite(key);
}
}
}
}
private static void handleAccept(SelectionKey key) throws IOException {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 接收连接,但不阻塞
SocketChannel client = server.accept();
client.configureBlocking(false);
// 注册读事件(数据到达时通知)
client.register(key.selector(), SelectionKey.OP_READ);
System.out.println("新连接: " + client.getRemoteAddress());
}
private static void handleRead(SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = client.read(buffer); // 非阻塞!无数据时立即返回0
if (read > 0) {
buffer.flip(); // 切换模式:写→读
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
System.out.println("收到: " + new String(data));
// 注册写事件,准备响应
client.register(key.selector(), SelectionKey.OP_WRITE);
} else if (read == -1) {
// 客户端关闭
client.close();
}
}
private static void handleWrite(SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
String response = "HTTP/1.1 200 OK\r\n\r\nHello NIO!";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
client.write(buffer); // 非阻塞写
client.close();
}
}
1.3 多路复用:AIO的最终形态
Java 7引入了AIO(Asynchronous I/O,异步I/O),通过Future+CompletionHandler实现真正的异步。但实际上,AIO在Linux上底层也是epoll,性能并不比NIO有显著优势,且编程模型复杂,工业界使用不多。
二、NIO核心API:Channel、Buffer与Selector
2.1 ByteBuffer的正确使用方式
ByteBuffer是NIO中最常用的类,但大部分人对它的理解停留在表面。最核心的概念是position、limit、capacity三指针模型。
/**
* ByteBuffer内部结构(重要!):
*
* Java中一个Buffer对象有三个指针(索引):
* - position(位置):下一个要读/写的位置。初始为0。
* - limit(限制):第一个不能读/写的元素位置。初始等于capacity。
* - capacity(容量):缓冲区的总大小,创建时固定。
*
* 两种模式(通过flip()切换):
*
* 写模式(写入channel → buffer):
* position: 0 → N(写入的数据量)
* limit: capacity(仍然指向末尾)
*
* flip()(切换到读模式):
* limit = position(设置为写入的数据量边界)
* position = 0(从头开始读)
*
* 读模式(从buffer → 读取/输出):
* position: 0 → N(已读)
* limit: N(还没读的边界)
*
* compact()(半自动:压缩已读空间,保留未读数据,继续写):
* buffer.compact() = flip() + mark() reset() 组合
* 未读数据移到开头,position指向未读数据末尾,继续写
*/
// ByteBuffer使用常见错误与正确做法
public class ByteBufferDemo {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
// ❌ 错误:忘记flip,直接从写模式读取
buffer.put("hello".getBytes()); // position=5, limit=10
// byte b = buffer.get(); // 读不到正确数据!
// ✅ 正确做法1:flip(从头读)
buffer.flip(); // position=0, limit=5
System.out.println("可读字节数: " + buffer.remaining()); // 5
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get()); // hello
}
// ✅ 正确做法2:compact(边读边写,不丢数据)
buffer.clear(); // position=0, limit=capacity, 覆盖
buffer.put("world".getBytes()); // position=5
buffer.flip(); // position=0, limit=5
// 如果只读了3字节,compact后未读的"lo"会保留
buffer.get(); buffer.get(); buffer.get(); // 读了"hel"
buffer.compact(); // 未读"lo"移到开头,position=2,继续写
// ✅ 直接缓冲区和堆缓冲区
// 堆缓冲区:ByteBuffer.allocate() — heap memory, GC回收
ByteBuffer heapBuffer = ByteBuffer.allocate(1024);
// 直接缓冲区:ByteBuffer.allocateDirect() — 堆外内存,OS直接访问
// 优点:避免了JVM堆到内核的复制(零拷贝!)
// 缺点:创建/销毁成本高,需要手动close(不是GC自动管理)
ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024 * 1024);
// 实际使用:文件传输、Socket高性能场景
// 检查缓冲区类型
if (directBuffer.isDirect()) {
System.out.println("这是直接缓冲区(堆外内存)");
}
// 手动释放直接缓冲区(JDK 17+)
// ((DirectBuffer)directBuffer).cleaner().clean();
}
}
/**
* Buffer使用规范(Netty的ByteBuf做了大量改进):
*
* 1. 每write.flip().read().clear()一组操作
* 2. 直接缓冲区用完必须释放
* 3. 避免在循环内allocate(开销大),使用池化
* 4. 多线程不共享Buffer(Buffer非线程安全)
*/
2.2 直接内存与零拷贝
直接缓冲区(堆外内存)是理解Netty高性能的关键。Linux的I/O模型中,数据从磁盘到应用程序需要经历多次拷贝:
/**
* 传统I/O的数据拷贝路径(4次拷贝):
*
* 磁盘 → 内核缓冲区(PageCache) → 用户缓冲区(JVM堆) → Socket缓冲区 → 网卡
* ① DMA拷贝 ② CPU拷贝 ③ CPU拷贝
*
* 关键问题:
* - ② 和 ③ 是CPU拷贝,在JVM堆和内核之间来回搬运数据
* - JVM GC会移动堆内对象,导致内核缓冲区地址失效
* - 解决方案:内核→用户用DirectBuffer(零拷贝)
*
* 使用直接缓冲区后的路径(3次拷贝):
*
* 磁盘 → 内核缓冲区(PageCache) → 直接缓冲区(堆外) → 网卡
* ① DMA拷贝 ② CPU拷贝(OS内存区域,非JVM堆)
*
* 优势:
* - PageCache缓存:热点文件数据驻留内核内存,减少磁盘I/O
* - 直接缓冲区:JVM和内核共享同一块内存区域,无需额外复制
* - Linux sendfile():内核→网卡零拷贝(彻底绕过用户空间)
*
* Netty的零拷贝:CompositeByteBuf组合多个缓冲区逻辑为一个
* 避免了多个小Buffer合并时的大量CPU拷贝
*/
// 直接内存分配的成本
public class DirectBufferCost {
public static void main(String[] args) {
long start, end;
int iterations = 1000;
// 堆缓冲区分配(快,但有GC压力)
start = System.nanoTime();
for (int i = 0; i < iterations; i++) {
ByteBuffer heap = ByteBuffer.allocate(4096);
}
end = System.nanoTime();
System.out.println("堆缓冲区分配: " + (end - start) / 1_000_000 + " ms");
// 直接缓冲区分配(慢,但零拷贝)
start = System.nanoTime();
for (int i = 0; i < iterations; i++) {
ByteBuffer direct = ByteBuffer.allocateDirect(4096);
// 手动释放(非GC自动管理!)
((DirectBuffer) direct).cleaner().clean();
}
end = System.nanoTime();
System.out.println("直接缓冲区分配: " + (end - start) / 1_000_000 + " ms");
// 结论:直接缓冲区分配约慢5-10倍
// 解决方案:池化(Netty的PooledByteBufAllocator就是池化+直接内存)
}
}
2.3 epoll的两种触发模式
/**
* epoll两种触发模式:
*
* 水平触发(LT, Level Triggered,默认):
* - 只要FD处于就绪状态,每次epoll_wait都会返回
* - 即使上次返回后没有处理,下次epoll_wait继续返回
* - 类似于GPIO的"电平有效"
* - 优点:编程简单,不会漏事件
* - 缺点:频繁epoll_wait调用
*
* 边缘触发(ET, Edge Triggered):
* - 只在FD状态变化(从不可读→可读)时返回一次
* - 第一次返回后必须把buffer读完,否则下次epoll_wait不返回
* - 类似于GPIO的"上升沿有效"
* - 优点:减少epoll_wait调用次数,高性能
* - 缺点:编程复杂,必须非阻塞读取到EAGAIN
*
* Selector在Linux上默认使用LT模式(Selector.open()等价于epoll_create + LT)
* 设置为ET需要额外配置(不同JDK实现方式不同)
*/
// Selector wakeup机制(关键!)
/**
* 问题:selector.select() 阻塞在 epoll_wait()
* 另一个线程想关闭连接或插入新任务怎么办?
*
* 解决:Selector.wakeup()
* - wakeup()向管道写一个字节,selector立即从select()返回
* - 之后调用select()的线程继续正常工作
*
* 典型场景:Netty中的BossGroup线程调用wakeup()让WorkerGroup立即响应
*/
/**
* wakeup()的工作原理(Linux实现):
*
* JDK创建Selector时,同时创建一个eventfd(2.6.22+)或pipe(老版本)
* - wakeup()向eventfd写入一个字节(自增计数器)
* - epoll_wait()检测到eventfd就绪,立即返回
*
* Netty用这个机制实现:
* Boss线程注册连接 → 唤醒Boss线程处理accept
* TaskQueue插入任务 → 唤醒工作线程处理任务
*/
// Netty源码中的wakeup模式(简化版)
// bossLoop.execute(() -> {
// // 注册新的连接
// serverChannel.register(selector, SelectionKey.OP_ACCEPT);
// // 通知selector有新事件
// selector.wakeup();
// });
// 或在关闭时
// selector.wakeup();
// selector.close();
三、Netty线程模型:Reactor模式的工业实现
Netty是Reactor模式最成熟的Java实现。理解Netty的线程模型,是写出高性能Netty应用的必备前提。
3.1 三种Reactor线程模型
/**
* Reactor模型的核心组件:
*
* 1. Reactor(反应器):监听和分发I/O事件
* 2. Handler(处理器):执行业务逻辑
*
* 三种变体:
*
* 单线程Reactor:
* - 只有一个线程处理所有accept/read/write
* - 缺陷:业务处理阻塞会导致整个服务瘫痪
* - 适用:小规模学习,生产环境不推荐
*
* 多线程Reactor(常用):
* - BossGroup:1个或N个线程,只处理accept
* - WorkerGroup:1个或N个线程,处理read/write/业务
* - 分离了连接管理和I/O处理
* - 适用:大多数生产场景
*
* 主从多线程Reactor(Netty推荐):
* - MainReactorGroup:处理accept(通常1线程)
* - SubReactorGroup:处理read/write(N线程)
* - 业务处理器:线程池(可自定义)
* - 优点:各司其职,扩展性最好
* - 适用:超大规模并发(百万连接)
*/
// Netty线程模型图示
/**
* ┌──────────────────────────────────┐
* │ BossGroup (NioEventLoop) │
* │ 监听ServerSocketChannel ACCEPT │
* └──────┬───────────────────┬────────┘
* │ accept │
* ▼ ▼
* ┌─────────────────────┐ (多ServerSocketChannel)
* │ WorkerGroup │
* │ (NioEventLoopGroup) │
* │ │
* │ [EL1] [EL2]...[ELn]│ ← 每个EventLoop持有一个Selector
* │ │ │ 处理N个连接的就绪事件
* │ ├─ Sock1 read/write │
* │ ├─ Sock2 read/write │
* │ └─ Sock3 read/write │
* └──────────────┬──────────┘
* │
* ▼
* ┌──────────────────────────┐
* │ Handler处理链(ChannelPipeline)
* │ ① Codec:解码/编码
* │ ② 业务Handler:业务逻辑
* │ ③ 自定义Handler
* └──────────────────────────┘
* │
* ▼ (如需进一步解耦)
* ┌──────────────────────────┐
* │ 业务线程池(业务CPU密集/阻塞时用)│
* │ 通过 ctx.channel().eventLoop() │
* │ .execute() 提交到业务线程池 │
* └──────────────────────────┘
*/
3.2 Netty Echo Server实现
// ============ Netty Echo Server ============
/**
* Echo Server:收到什么就返回什么
* 这是Netty最经典的入门示例
*/
public class NettyEchoServer {
public static void main(String[] args) throws InterruptedException {
// BossGroup:处理accept事件,通常1-2个线程足够
// 连接管理不需要太多线程(accept是轻量操作)
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
// WorkerGroup:处理read/write/业务处理,通常 = CPU核数*2
// I/O事件处理需要较多线程
NioEventLoopGroup workerGroup = new NioEventLoopGroup(
Runtime.getRuntime().availableProcessors() * 2);
try {
// ServerBootstrap:Netty服务启动辅助类(Builder模式)
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup) // 设置线程组
.channel(NioServerSocketChannel.class) // NIO Channel类型
// .option() 配置ServerSocketChannel参数(boss线程)
.option(ChannelOption.SO_BACKLOG, 1024) // 连接队列长度
.option(ChannelOption.SO_KEEPALIVE, true) // TCP保活
// .childOption() 配置SocketChannel参数(worker线程)
.childOption(ChannelOption.TCP_NODELAY, true) // 禁用Nagle,小包立即发送
.childOption(ChannelOption.SO_SNDBUF, 64 * 1024) // 发送缓冲区64KB
.childOption(ChannelOption.SO_RCVBUF, 64 * 1024) // 接收缓冲区64KB
// .attr() 设置Channel属性(可自定义)
// .handler() 设置boss线程的Handler(ServerSocketChannel专属)
// .childHandler() 设置worker线程的Handler(SocketChannel专属)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// ChannelPipeline:责任链模式,每个Handler处理特定逻辑
// inbound按顺序执行,outbound按逆序执行
ChannelPipeline pipeline = ch.pipeline();
// Decoder:将ByteBuf解码为String(换行符分割)
pipeline.addLast(new LineBasedFrameDecoder(1024));
pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8));
// Encoder:将String编码为ByteBuf
pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));
// 业务Handler:Echo逻辑
pipeline.addLast(new EchoServerHandler());
}
});
// 绑定端口,同步等待启动完成
ChannelFuture future = bootstrap.bind(8080).sync();
System.out.println("Netty Echo Server启动,监听8080...");
// 优雅关闭:等待服务器 socket 关闭
future.channel().closeFuture().sync();
} finally {
// 释放资源:关闭线程组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* EchoHandler:ChannelInboundHandlerAdapter的子类
*
* ChannelInboundHandler方法调用时机:
* 1. channelRegistered → channelActive → channelRead (数据到达)
* 2. channelReadComplete → channelInactive → channelUnregistered (连接关闭)
*
* ChannelOutboundHandler方法:
* bind/listen, connect, accept, read, write, flush, close, deregister
*/
@Sharable // 可被多个Channel共享(无状态Handler)
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(EchoServerHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// msg是解码后的String(因为Pipeline中有StringDecoder)
String request = (String) msg;
log.info("收到: {}", request);
// 写回响应(写操作通过ChannelOutboundHandler逆序传播)
// ctx.write() 只是写入发送缓冲区(不立即发)
// ctx.flush() 才真正发送到网络
ctx.write(request); // write + flush = writeAndFlush
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 处理异常:通常关闭连接
log.error("连接异常: {}", ctx.channel().remoteAddress(), cause);
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("连接建立: {}", ctx.channel().remoteAddress());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.info("连接断开: {}", ctx.channel().remoteAddress());
}
}
// ============ 客户端代码 ============
public class NettyEchoClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder(StandardCharsets.UTF_8))
.addLast(new StringEncoder(StandardCharsets.UTF_8))
.addLast(new EchoClientHandler());
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
channel.writeAndFlush(line + "\n"); // 带换行符
}
} finally {
group.shutdownGracefully();
}
}
}
class EchoClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println("服务端响应: " + msg);
}
}
四、ByteBuf:Netty的零拷贝缓冲区
ByteBuf是Netty自研的缓冲区,相比JDK的ByteBuffer有显著改进:池化、引用计数、动态扩展、双指针独立读写。
4.1 ByteBuf vs ByteBuffer核心对比
/**
* ByteBuf 的核心改进:
*
* 1. 读写指针分离(readerIndex / writerIndex)
* - ByteBuffer只有position一个指针,读写共用
* - ByteBuf有两个独立指针,写操作不改变readerIndex
* - 不需要flip()切换模式
*
* 2. 池化(Pooled vs Unpooled)
* - Unpooled:每次分配新内存,GC压力大
* - Pooled:从内存池中获取已分配的缓冲区,零分配
* - Netty默认使用PooledByteBufAllocator
*
* 3. 动态扩展
* - ByteBuffer创建时固定capacity
* - ByteBuf可自动扩展(writableBytes不够时自动扩容)
* - writeBytes() 超出容量时自动扩容
*
* 4. 引用计数
* - 每个ByteBuf有引用计数(referenceCount)
* - retain() +1,release() -1
* - 计数为0时释放内存(不是GC!)
* - 用于零拷贝:将Buffer"转移"而非"复制"
* - 不调用release() = 内存泄漏(NIO泄漏检测可发现)
*
* 5. 组合缓冲区(CompositeByteBuf)
* - 逻辑上组合多个ByteBuf为一个
* - 避免复制和合并的开销(零拷贝的核心)
*/
// ByteBuf使用示例
public class ByteBufDemo {
public static void main(String[] args) {
// 池化直接缓冲区(Netty推荐配置)
// 池化减少GC压力,直接内存提升I/O性能
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(256);
// 写入数据(writerIndex自动移动)
buf.writeBytes("Hello Netty".getBytes());
System.out.println("可读字节: " + buf.readableBytes()); // 11
// 读取数据(readerIndex自动移动)
byte b = buf.readByte();
System.out.println("读到的字节: " + (char) b); // H
// 不需要flip!直接继续写
buf.writeInt(12345);
// 获取字节数组(零拷贝?不,复制)
byte[] arr = new byte[buf.readableBytes()];
buf.readBytes(arr);
// 如果要避免复制,使用NioUnsafe直接获取底层数组
// ByteBuf内部是.nioBuffer() 返回共享视图
// release! 引用计数-1,为0时释放内存
// 这是Netty内存管理的核心规则
buf.release();
// 组合缓冲区(零拷贝合并)
CompositeByteBuf composite = ByteBufAllocator.DEFAULT.compositeBuffer();
ByteBuf header = ByteBufAllocator.DEFAULT.buffer().writeBytes("Header".getBytes());
ByteBuf body = ByteBufAllocator.DEFAULT.buffer().writeBytes("Body".getBytes());
// 不复制,直接逻辑组合
composite.addComponents(header, body);
// 发送时:header + body 连续内存,但实际是两个ByteBuf的引用
// 零拷贝场景:文件传输,不需要将文件内容复制到用户空间
composite.release();
}
}
4.2 ByteBuf泄漏检测
/**
* ByteBuf泄漏是Netty最常见的内存问题
*
* 原因:ByteBuf由引用计数管理,不是GC自动回收
* 忘记调用release() → 内存泄漏
*
* 检测级别(通过-Dio.netty.leakDetection.level设置):
*
* disabled:不检测(生产环境默认,性能最好)
* simple:抽样1%的ByteBuf,JVM退出时报告(测试环境)
* advanced:跟踪泄漏ByteBuf的创建位置(开发环境)
* paranoid:每次创建都检测,每次泄漏立即报告(调试)
*
* 泄漏检测原理:WeakReference + ReferenceQueue
* 创建ByteBuf时注册到跟踪队列,GC回收时检测是否调用了release()
*/
// 泄漏检测的典型输出
/**
* LEAK: ByteBuf.release() was not called before it's garbage-collected.
* Recent access records:
* Print 10 records above.
*
* 创建位置堆栈(泄漏发生的位置):
* - MyHandler.channelRead (MyHandler.java:42)
* - ByteBufTest.main (ByteBufTest.java:10)
* ...
*
* 修复方法:
* 1. try-finally:finally块中release
* 2. ReferenceCountUtil.releaseLater(msg):自动release
* 3. SimpleChannelInboundHandler:自动release收到的消息
*/
// 正确的Handler写法
public class SafeHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 方式1:try-finally(标准做法)
ByteBuf buf = null;
try {
buf = (ByteBuf) msg;
// 业务处理...
ctx.fireChannelRead(buf); // 转发给下一个Handler(注意:不要release)
} finally {
if (buf != null && buf.refCnt() > 0) {
buf.release(); // 引用计数-1
}
}
// 方式2:SimpleChannelInboundHandler(推荐)
// 自动对msg进行release,但转发时需要retain()
}
}
// 方式2的实现
public class BetterHandler extends SimpleChannelInboundHandler<ByteBuf> {
// SimpleChannelInboundHandler会自动release msg
// 但如果需要将msg传给下一个Handler,必须retain()
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
// msg会被自动release,retain后传给下一个Handler
ctx.fireChannelRead(msg.retain());
}
}
五、Netty WebSocket服务器与心跳
// ============ Netty WebSocket服务器 ============
/**
* WebSocket协议:
* HTTP握手建立连接 → 升级为全双工WebSocket
* 适用于:聊天、实时推送、游戏、协作编辑
*/
public class WebSocketServer {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(
Runtime.getRuntime().availableProcessors() * 2);
try {
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebSocketServerInitializer());
ChannelFuture future = bootstrap.bind(8080).sync();
System.out.println("WebSocket Server启动: ws://localhost:8080/ws");
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// HTTP编解码(WebSocket握手需要)
pipeline.addLast(new HttpServerCodec());
// 聚合HTTP请求/响应(FullHttpRequest/FullHttpResponse)
pipeline.addLast(new HttpObjectAggregator(65536));
// WebSocket握手Handler(升级协议)
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 自定义Handler
pipeline.addLast(new WebSocketFrameHandler());
}
}
class WebSocketFrameHandler extends TextWebSocketFrameHandler {
private static final Logger log = LoggerFactory.getLogger(WebSocketFrameHandler.class);
// 维护所有在线连接(用于广播)
private static final ChannelGroup channels = new DefaultChannelGroup(
GlobalEventExecutor.INSTANCE); // 全局唯一的事件执行器
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
channels.add(ctx.channel()); // 加入广播组
log.info("客户端连接: {}", ctx.channel().id());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
channels.remove(ctx.channel()); // 从广播组移除
log.info("客户端断开: {}", ctx.channel().id());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
String text = frame.text();
log.info("收到消息: {}", text);
// 广播给所有连接
// TextWebSocketFrame:WebSocket文本帧
// BinaryWebSocketFrame:WebSocket二进制帧
// PingWebSocketFrame / PongWebSocketFrame:心跳帧
channels.writeAndFlush(new TextWebSocketFrame("广播: " + text));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("异常", cause);
ctx.close();
}
}
// ============ WebSocket 心跳机制 ============
/**
* 心跳的两个目的:
* 1. 检测连接是否存活(对端崩溃/网络断开)
* 2. 保持TCP连接活跃(防止中间设备断开空闲连接)
*
* WebSocket心跳:Ping-Pong帧
* 主动方发送PingFrame,被动方必须响应PongFrame
*/
// 心跳Handler实现
@Sharable
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
// 空闲检测:读空闲超过阈值,发送Ping
// ReadIdleTimeoutEvent → 发送PingFrame → 等待PongFrame
// 连续N次没收到Pong → 判定为断开,关闭连接
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent idle) {
switch (idle.state()) {
case READER_IDLE:
// 读空闲(服务端没收到客户端消息太久)
// 发送Ping探测客户端是否存活
ctx.writeAndFlush(new PingWebSocketFrame());
System.out.println("发送Ping探测连接: " + ctx.channel().id());
break;
case WRITER_IDLE:
// 写空闲(服务端很久没发消息给客户端)
// 可以发送心跳消息
break;
case ALL_IDLE:
// 读写都空闲
break;
}
}
// 事件需要手动传播,否则后续Handler收不到
ctx.fireUserEventTriggered(evt);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof PingWebSocketFrame) {
// 收到Ping,响应Pong
ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
System.out.println("响应Pong");
} else if (frame instanceof PongWebSocketFrame) {
// 收到Pong,确认对端存活
System.out.println("收到Pong,连接存活");
}
}
}
Netty最佳实践:Pipeline中的Handler尽量设计为@Sharable(无状态),减少对象创建。有状态Handler通过ChannelHandlerContext的attr()存储连接级别状态,避免内存泄漏。
六、Netty高性能调优参数汇总
/**
* Netty性能调优三维度:
*
* 维度1:内存配置
* 维度2:I/O线程数
* 维度3:TCP参数
*
* 生产环境调优原则:
* - 先压测,拿到数据再调(不要凭感觉)
* - 单参数调整,每次只改一个
* - 关注瓶颈:CPU、内存、网络、GC
*/
// ============ 1. ByteBuf池化与内存配置 ============
// VM启动参数
// -Dio.netty.allocator.type=pooled // 使用池化分配器(默认)
// -Dio.netty.allocator.numDirectArenas=4 // 直接内存池数量=CPU核数
// -Dio.netty.allocator.pageSize=8192 // 内存页大小
// -Dio.netty.allocator.maxOrder=11 // 最大阶数(chunk大小=pageSize*2^maxOrder)
// -Dio.netty.noPreferDirect=true // 优先堆缓冲区而非直接缓冲区
// JVM堆外内存配置(Netty直接缓冲区使用的内存)
// -XX:MaxDirectMemorySize=4G // 直接内存最大4G
// 注意:Netty直接内存不受-Xmx控制,需单独配置
// 动态配置(代码中)
System.setProperty("io.netty.allocator.numDirectArenas",
String.valueOf(Runtime.getRuntime().availableProcessors()));
// ============ 2. I/O线程数配置 ============
// Boss线程数:通常1即可(accept是轻量的)
// 特殊场景:SO_REUSEPORT(多个进程绑定同一端口)时,可设>1
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
// Worker线程数:CPU密集型=核数+1~2,I/O密集型=核数*2
// 计算公式:
// WorkerThreads = CoreNum * (1 + (WaitTime / WorkTime))
// 极度I/O密集(大量DB/远程调用):CoreNum * 2
// 混合负载(编解码+业务):CoreNum
NioEventLoopGroup workerGroup = new NioEventLoopGroup(
Math.min(Runtime.getRuntime().availableProcessors() * 2, 32));
// ============ 3. TCP参数优化 ============
ServerBootstrap bootstrap = new ServerBootstrap()
// TCP连接队列:三次握手在队列中等待accept()处理的数量
// 默认50,高并发时需调大,否则客户端connect()会报ConnectionRefused
.option(ChannelOption.SO_BACKLOG, 4096)
// SO_REUSEADDR: TIME_WAIT状态的端口可立即重用
.option(ChannelOption.SO_REUSEADDR, true)
// TCP_NODELAY:禁用Nagle算法,小包立即发送
// 默认false(启用Nagle),适合低延迟场景
// 关闭:适合实时游戏、交易系统
// 开启:适合批量传输、文件传输
.childOption(ChannelOption.TCP_NODELAY, true)
// SO_KEEPALIVE:TCP保活,检测对端崩溃/网络断开
// 默认false,开启后2小时无数据触发探测
// 建议:长连接应用开启
.childOption(ChannelOption.SO_KEEPALIVE, true)
// SO_LINGER:close()时的行为
// =0:立即关闭,不发送FIN
// =-1(默认):等待缓冲区数据发送完成
// >0:等待超时后强制关闭
.childOption(ChannelOption.SO_LINGER, 0)
// SO_SNDBUF / SO_RCVBUF:发送/接收缓冲区大小
// 默认:系统决定(通常64KB)
// 调整:带宽高、延迟大的场景可增大
.childOption(ChannelOption.SO_SNDBUF, 256 * 1024) // 256KB
.childOption(ChannelOption.SO_RCVBUF, 256 * 1024);
// ============ 4. 高并发连接数优化 ============
// 文件描述符限制(ulimit -n)
// 每条TCP连接占用1个FD
// 默认1024不够,需要调到100000+
// /etc/security/limits.conf
// * soft nofile 1048576
// * hard nofile 1048576
// JVM参数:事件循环线程数上限(Netty默认无上限)
// -Dio.netty.eventLoop.maxPendingTasks=1024
// 超过此值的任务会被拒绝
// ============ 5. GC优化 ============
// CMS/G1/Shenandoah/ZGC选择建议
// ZGC(JDK 11+):停顿时间<1ms,适合大内存(>8GB)场景
// G1(JDK 9+):默认,平衡型
// Netty内部使用了大量DirectBuffer和ByteBuf
// 池化+直接内存使得GC压力小很多
// 推荐JVM参数
// java -server \
// -Xms8G -Xmx8G \
// -XX:+UseZGC \
// -XX:MaxDirectMemorySize=4G \
// -XX:+AlwaysPreTouch \
// ...
七、Netty长连接心跳实战
// ============ Netty长连接心跳最佳实践 ============
/**
* 长连接心跳场景:
* - 移动App与服务端保持连接(推送、IM)
* - 游戏客户端保持连接
* - IoT设备上报数据
*
* 心跳策略:
* - 客户端定时发送Ping(通常30-60秒一次)
* - 服务端检测读空闲,超过阈值(2-3倍心跳间隔)关闭连接
* - 双向心跳:客户端Ping→服务端Pong,服务端Ping→客户端Pong
*/
// Pipeline配置(Server端完整版)
class LongConnectionServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
// 空闲检测Handler(必须放在最前面)
// IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime)
// 触发 IdleStateEvent,传递给后续Handler
p.addLast(new IdleStateHandler(60, 0, 0)); // 60秒读空闲
// 编解码
p.addLast(new ProtobufVarint32LengthFieldPrepender());
p.addLast(new ProtobufDecoder(UserMessage.getDefaultInstance()));
p.addLast(new ProtobufEncoder());
// 心跳处理(接收Ping,响应Pong)
p.addLast(new HeartbeatHandler());
// 业务处理
p.addLast(new BusinessHandler());
}
}
@Sharable
class HeartbeatHandler extends ChannelDuplexHandler {
// 记录最后收到消息的时间
private volatile long lastReadTime = System.currentTimeMillis();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
lastReadTime = System.currentTimeMillis();
super.channelRead(ctx, msg); // 继续传递给业务Handler
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idle = (IdleStateEvent) evt;
if (idle.state() == IdleState.READER_IDLE) {
long idleTime = System.currentTimeMillis() - lastReadTime;
// 超过90秒没收到消息,关闭连接
if (idleTime > 90_000) {
System.out.println("读空闲超时: " + idleTime + "ms,关闭连接: "
+ ctx.channel().id());
ctx.close();
} else {
// 60-90秒之间,发送Ping探测
System.out.println("发送Ping探测: " + ctx.channel().id());
ctx.writeAndFlush(new PingMessage().toByteBuffer());
}
}
}
}
}
// ============ Netty HTTP/2服务器(gRPC基础) ============
/**
* HTTP/2相比HTTP/1.1的核心改进:
* 1. 多路复用:一个TCP连接上并行多个Stream(解决队头阻塞)
* 2. Header压缩(HPACK):大幅减少Header传输量
* 3. 服务端推送:服务端主动推送资源
* 4. 流控制:每条Stream独立流量控制
*/
// Netty HTTP/2服务(基于Netty自己的HTTP/2实现,非Tomcat)
class Http2Server {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
SslContext sslCtx = ...; // TLS
ServerBootstrap bootstrap = new ServerBootstrap()
.group(group)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new Http2ServerInitializer(sslCtx));
bootstrap.bind(8443).sync().channel().closeFuture().sync();
}
}
class Http2ServerInitializer extends ApplicationProtocolNegotiationHandler {
Http2ServerInitializer(SslContext sslCtx) {
super(ApplicationProtocolNames.HTTP_2);
}
@Override
protected void initChannel(SocketChannel ch) {
if (isHttp2(ch.pipeline().get(SslHandler.class))) {
configureHttp2(ch.pipeline());
} else {
configureHttp1(ch.pipeline());
}
}
private void configureHttp2(ChannelPipeline p) {
// HTTP/2先处理帧,再分派给Stream
p.addLast(new Http2FrameCodecBuilder.forServer().build());
p.addLast(new Http2MultiplexHandler(new BizHandler()));
}
}