一、I/O模型的演进:从BIO到AIO

理解Java网络编程的核心脉络,是理解Netty为什么这么设计的起点。从BIO到NIO,再到Epoll(AIO),每一步演进都是为了解决前一代模型的根本缺陷。

1.1 BIO:每个连接一个线程的死亡陷阱

Blocking I/O(BIO)是Java最早的网络编程模型。核心问题:accept()和read()都是阻塞的——一个线程在等待数据时无法服务其他连接。

/**
 * BIO的致命缺陷:
 * 
 * 线程模型:Thread-Per-Connection
 * - ServerSocket.accept() 阻塞:等待客户端连接
 * - Socket.read() 阻塞:等待数据到达
 * 
 * 问题1:线程资源昂贵
 * - Java线程默认栈大小约1MB(-Xss)
 * - 1万个并发连接 = 1万个线程 = ~10GB栈内存
 * - 线程切换成本高(Context Switch,Linux约5-10微秒)
 * - 线程调度本身成为瓶颈
 * 
 * 问题2:阻塞等待是浪费
 * - 99%的时间线程都在等待数据(连接闲置)
 * - CPU空转,无法服务其他连接
 * - 线程大部分时间在sleep,不是work
 */

// BIO Server实现(问题代码,仅用于演示缺陷)
public class BioServer {
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(8080);
        System.out.println("BIO Server启动,监听8080端口...");
        
        // 问题:每个连接一个线程!
        while (true) {
            Socket client = server.accept(); // 阻塞①:等待连接
            System.out.println("新连接: " + client.getRemoteSocketAddress());
            
            // 每个连接都new一个线程(内存和调度灾难)
            new Thread(() -> {
                try {
                    // 阻塞②:等待数据
                    // 线程在此处沉睡,浪费所有资源
                    BufferedReader reader = new BufferedReader(
                        new InputStreamReader(client.getInputStream()));
                    String line;
                    while ((line = reader.readLine()) != null) {
                        // 业务处理...
                        System.out.println("收到: " + line);
                        client.getOutputStream().write(("响应: " + line + "\n").getBytes());
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    try { client.close(); } catch (IOException ex) {}
                }
            }).start();
        }
    }
}

// 实际场景中,BIO线程模型的问题:
// 连接数1000:勉强可用
// 连接数10000:线程数10000,OOM
// 连接数100000:根本不可能
// 瓶颈不在业务逻辑,在线程模型

1.2 NIO:Selector打开事件驱动的大门

New I/O(NIO,Java 1.4引入)是革命性的改变。核心思想:用单线程(或少量线程)管理大量连接,通过Selector检测哪些连接有I/O事件需要处理

/**
 * NIO三大核心组件:
 * 
 * 1. Channel(通道):类似流,但可以非阻塞
 *    - FileChannel:文件I/O
 *    - SocketChannel:TCP客户端/主动连接
 *    - ServerSocketChannel:TCP服务端(接收连接)
 *    - DatagramChannel:UDP
 * 
 * 2. Buffer(缓冲区):读写数据的容器
 *    - 堆内缓冲区:byte[],GC压力大
 *    - 直接缓冲区:堆外内存,不受GC管理,需要手动释放
 * 
 * 3. Selector(选择器):检测多个Channel的I/O事件
 *    - 一个Selector管理多个Channel
 *    - select()阻塞直到有Channel就绪(OP_ACCEPT/OP_CONNECT/OP_READ/OP_WRITE)
 *    - selectNow()非阻塞,立即返回
 */

// NIO Selector的工作原理(Linux epoll的Java封装)
/**
 * Linux内核层面:
 * epoll_create()  → 创建epoll实例
 * epoll_ctl()     → 注册FD到epoll实例(EPOLL_CTL_ADD)
 * epoll_wait()    → 等待事件(水平触发LT 或 边缘触发ET)
 * 
 * Java NIO Selector 就是对 epoll 的封装(JDK实现,非JNI)
 * 
 * 关键性能指标:
 * - select():O(n),遍历所有FD
 * - poll():O(n),使用链表替代数组
 * - epoll():O(1),只返回就绪的FD(红黑树 + 就绪链表)
 * 
 * epoll的优势:
 * - FD数量不影响效率(不遍历全部)
 * - 使用回调而非轮询(内核就绪通知)
 * - 支持边缘触发(减少系统调用次数)
 */

// NIO Server核心代码
public class NioServer {
    public static void main(String[] args) throws IOException {
        // 1. 打开Selector(非阻塞I/O的核心)
        Selector selector = Selector.open();
        
        // 2. 打开ServerSocketChannel,配置为非阻塞
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false); // 关键:非阻塞模式
        serverChannel.bind(new InetSocketAddress(8080));
        
        // 3. 将ServerSocketChannel注册到Selector,关心ACCEPT事件
        SelectionKey acceptKey = serverChannel.register(
            selector, SelectionKey.OP_ACCEPT);
        System.out.println("NIO Server启动,监听8080...");
        
        // 4. 事件循环(Reactor模式核心)
        while (true) {
            // 阻塞直到有Channel就绪(可设置超时)
            // 这里就是epoll_wait()的Java层调用
            int readyCount = selector.select(); 
            // 返回值 = 就绪事件数量(0表示超时)
            
            if (readyCount == 0) continue;
            
            // 遍历所有就绪的SelectionKey
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iter = keys.iterator();
            
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove(); // 重要:从selectedKeys中移除,否则会重复处理
                
                // 判断是什么事件就绪
                if (key.isAcceptable()) {
                    // OP_ACCEPT就绪:有新连接
                    handleAccept(key);
                } else if (key.isReadable()) {
                    // OP_READ就绪:有数据可读
                    handleRead(key);
                } else if (key.isWritable()) {
                    // OP_WRITE就绪:可以写数据(通常不需要注册)
                    handleWrite(key);
                }
            }
        }
    }
    
    private static void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        // 接收连接,但不阻塞
        SocketChannel client = server.accept();
        client.configureBlocking(false);
        // 注册读事件(数据到达时通知)
        client.register(key.selector(), SelectionKey.OP_READ);
        System.out.println("新连接: " + client.getRemoteAddress());
    }
    
    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read = client.read(buffer); // 非阻塞!无数据时立即返回0
        if (read > 0) {
            buffer.flip(); // 切换模式:写→读
            byte[] data = new byte[buffer.remaining()];
            buffer.get(data);
            System.out.println("收到: " + new String(data));
            // 注册写事件,准备响应
            client.register(key.selector(), SelectionKey.OP_WRITE);
        } else if (read == -1) {
            // 客户端关闭
            client.close();
        }
    }
    
    private static void handleWrite(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        String response = "HTTP/1.1 200 OK\r\n\r\nHello NIO!";
        ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
        client.write(buffer); // 非阻塞写
        client.close();
    }
}

1.3 多路复用:AIO的最终形态

Java 7引入了AIO(Asynchronous I/O,异步I/O),通过Future+CompletionHandler实现真正的异步。但实际上,AIO在Linux上底层也是epoll,性能并不比NIO有显著优势,且编程模型复杂,工业界使用不多。

二、NIO核心API:Channel、Buffer与Selector

2.1 ByteBuffer的正确使用方式

ByteBuffer是NIO中最常用的类,但大部分人对它的理解停留在表面。最核心的概念是position、limit、capacity三指针模型

/**
 * ByteBuffer内部结构(重要!):
 * 
 * Java中一个Buffer对象有三个指针(索引):
 * - position(位置):下一个要读/写的位置。初始为0。
 * - limit(限制):第一个不能读/写的元素位置。初始等于capacity。
 * - capacity(容量):缓冲区的总大小,创建时固定。
 * 
 * 两种模式(通过flip()切换):
 * 
 * 写模式(写入channel → buffer):
 *   position: 0 → N(写入的数据量)
 *   limit: capacity(仍然指向末尾)
 * 
 * flip()(切换到读模式):
 *   limit = position(设置为写入的数据量边界)
 *   position = 0(从头开始读)
 * 
 * 读模式(从buffer → 读取/输出):
 *   position: 0 → N(已读)
 *   limit: N(还没读的边界)
 * 
 * compact()(半自动:压缩已读空间,保留未读数据,继续写):
 *   buffer.compact() = flip() + mark() reset() 组合
 *   未读数据移到开头,position指向未读数据末尾,继续写
 */

// ByteBuffer使用常见错误与正确做法
public class ByteBufferDemo {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        
        // ❌ 错误:忘记flip,直接从写模式读取
        buffer.put("hello".getBytes());  // position=5, limit=10
        // byte b = buffer.get();         // 读不到正确数据!
        
        // ✅ 正确做法1:flip(从头读)
        buffer.flip();                    // position=0, limit=5
        System.out.println("可读字节数: " + buffer.remaining()); // 5
        while (buffer.hasRemaining()) {
            System.out.print((char) buffer.get()); // hello
        }
        
        // ✅ 正确做法2:compact(边读边写,不丢数据)
        buffer.clear();                   // position=0, limit=capacity, 覆盖
        buffer.put("world".getBytes());   // position=5
        buffer.flip();                    // position=0, limit=5
        // 如果只读了3字节,compact后未读的"lo"会保留
        buffer.get(); buffer.get(); buffer.get(); // 读了"hel"
        buffer.compact();                 // 未读"lo"移到开头,position=2,继续写
        
        // ✅ 直接缓冲区和堆缓冲区
        // 堆缓冲区:ByteBuffer.allocate() — heap memory, GC回收
        ByteBuffer heapBuffer = ByteBuffer.allocate(1024);
        
        // 直接缓冲区:ByteBuffer.allocateDirect() — 堆外内存,OS直接访问
        // 优点:避免了JVM堆到内核的复制(零拷贝!)
        // 缺点:创建/销毁成本高,需要手动close(不是GC自动管理)
        ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024 * 1024);
        // 实际使用:文件传输、Socket高性能场景
        
        // 检查缓冲区类型
        if (directBuffer.isDirect()) {
            System.out.println("这是直接缓冲区(堆外内存)");
        }
        
        // 手动释放直接缓冲区(JDK 17+)
        // ((DirectBuffer)directBuffer).cleaner().clean();
    }
}

/**
 * Buffer使用规范(Netty的ByteBuf做了大量改进):
 * 
 * 1. 每write.flip().read().clear()一组操作
 * 2. 直接缓冲区用完必须释放
 * 3. 避免在循环内allocate(开销大),使用池化
 * 4. 多线程不共享Buffer(Buffer非线程安全)
 */

2.2 直接内存与零拷贝

直接缓冲区(堆外内存)是理解Netty高性能的关键。Linux的I/O模型中,数据从磁盘到应用程序需要经历多次拷贝:

/**
 * 传统I/O的数据拷贝路径(4次拷贝):
 * 
 * 磁盘 → 内核缓冲区(PageCache) → 用户缓冲区(JVM堆) → Socket缓冲区 → 网卡
 *         ①  DMA拷贝          ② CPU拷贝           ③ CPU拷贝
 * 
 * 关键问题:
 * - ② 和 ③ 是CPU拷贝,在JVM堆和内核之间来回搬运数据
 * - JVM GC会移动堆内对象,导致内核缓冲区地址失效
 * - 解决方案:内核→用户用DirectBuffer(零拷贝)
 * 
 * 使用直接缓冲区后的路径(3次拷贝):
 * 
 * 磁盘 → 内核缓冲区(PageCache) → 直接缓冲区(堆外) → 网卡
 *         ①  DMA拷贝             ② CPU拷贝(OS内存区域,非JVM堆)
 * 
 * 优势:
 * - PageCache缓存:热点文件数据驻留内核内存,减少磁盘I/O
 * - 直接缓冲区:JVM和内核共享同一块内存区域,无需额外复制
 * - Linux sendfile():内核→网卡零拷贝(彻底绕过用户空间)
 * 
 * Netty的零拷贝:CompositeByteBuf组合多个缓冲区逻辑为一个
 *              避免了多个小Buffer合并时的大量CPU拷贝
 */

// 直接内存分配的成本
public class DirectBufferCost {
    public static void main(String[] args) {
        long start, end;
        int iterations = 1000;
        
        // 堆缓冲区分配(快,但有GC压力)
        start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            ByteBuffer heap = ByteBuffer.allocate(4096);
        }
        end = System.nanoTime();
        System.out.println("堆缓冲区分配: " + (end - start) / 1_000_000 + " ms");
        
        // 直接缓冲区分配(慢,但零拷贝)
        start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            ByteBuffer direct = ByteBuffer.allocateDirect(4096);
            // 手动释放(非GC自动管理!)
            ((DirectBuffer) direct).cleaner().clean();
        }
        end = System.nanoTime();
        System.out.println("直接缓冲区分配: " + (end - start) / 1_000_000 + " ms");
        
        // 结论:直接缓冲区分配约慢5-10倍
        // 解决方案:池化(Netty的PooledByteBufAllocator就是池化+直接内存)
    }
}

2.3 epoll的两种触发模式

/**
 * epoll两种触发模式:
 * 
 * 水平触发(LT, Level Triggered,默认):
 * - 只要FD处于就绪状态,每次epoll_wait都会返回
 * - 即使上次返回后没有处理,下次epoll_wait继续返回
 * - 类似于GPIO的"电平有效"
 * - 优点:编程简单,不会漏事件
 * - 缺点:频繁epoll_wait调用
 * 
 * 边缘触发(ET, Edge Triggered):
 * - 只在FD状态变化(从不可读→可读)时返回一次
 * - 第一次返回后必须把buffer读完,否则下次epoll_wait不返回
 * - 类似于GPIO的"上升沿有效"
 * - 优点:减少epoll_wait调用次数,高性能
 * - 缺点:编程复杂,必须非阻塞读取到EAGAIN
 * 
 * Selector在Linux上默认使用LT模式(Selector.open()等价于epoll_create + LT)
 * 设置为ET需要额外配置(不同JDK实现方式不同)
 */

// Selector wakeup机制(关键!)
/**
 * 问题:selector.select() 阻塞在 epoll_wait()
 *       另一个线程想关闭连接或插入新任务怎么办?
 * 
 * 解决:Selector.wakeup()
 *       - wakeup()向管道写一个字节,selector立即从select()返回
 *       - 之后调用select()的线程继续正常工作
 * 
 * 典型场景:Netty中的BossGroup线程调用wakeup()让WorkerGroup立即响应
 */

/**
 * wakeup()的工作原理(Linux实现):
 * 
 * JDK创建Selector时,同时创建一个eventfd(2.6.22+)或pipe(老版本)
 * - wakeup()向eventfd写入一个字节(自增计数器)
 * - epoll_wait()检测到eventfd就绪,立即返回
 * 
 * Netty用这个机制实现:
 * Boss线程注册连接 → 唤醒Boss线程处理accept
 * TaskQueue插入任务 → 唤醒工作线程处理任务
 */

// Netty源码中的wakeup模式(简化版)
// bossLoop.execute(() -> {
//     // 注册新的连接
//     serverChannel.register(selector, SelectionKey.OP_ACCEPT);
//     // 通知selector有新事件
//     selector.wakeup();
// });

// 或在关闭时
// selector.wakeup();
// selector.close();

三、Netty线程模型:Reactor模式的工业实现

Netty是Reactor模式最成熟的Java实现。理解Netty的线程模型,是写出高性能Netty应用的必备前提。

3.1 三种Reactor线程模型

/**
 * Reactor模型的核心组件:
 * 
 * 1. Reactor(反应器):监听和分发I/O事件
 * 2. Handler(处理器):执行业务逻辑
 * 
 * 三种变体:
 * 
 * 单线程Reactor:
 *   - 只有一个线程处理所有accept/read/write
 *   - 缺陷:业务处理阻塞会导致整个服务瘫痪
 *   - 适用:小规模学习,生产环境不推荐
 * 
 * 多线程Reactor(常用):
 *   - BossGroup:1个或N个线程,只处理accept
 *   - WorkerGroup:1个或N个线程,处理read/write/业务
 *   - 分离了连接管理和I/O处理
 *   - 适用:大多数生产场景
 * 
 * 主从多线程Reactor(Netty推荐):
 *   - MainReactorGroup:处理accept(通常1线程)
 *   - SubReactorGroup:处理read/write(N线程)
 *   - 业务处理器:线程池(可自定义)
 *   - 优点:各司其职,扩展性最好
 *   - 适用:超大规模并发(百万连接)
 */

// Netty线程模型图示
/**
 *                    ┌──────────────────────────────────┐
 *                    │         BossGroup (NioEventLoop) │
 *                    │   监听ServerSocketChannel ACCEPT │
 *                    └──────┬───────────────────┬────────┘
 *                           │ accept           │
 *                           ▼                   ▼
 *               ┌─────────────────────┐   (多ServerSocketChannel)
 *               │   WorkerGroup        │
 *               │ (NioEventLoopGroup) │
 *               │                     │
 *               │  [EL1] [EL2]...[ELn]│  ← 每个EventLoop持有一个Selector
 *               │   │                   │    处理N个连接的就绪事件
 *               │   ├─ Sock1 read/write │
 *               │   ├─ Sock2 read/write │
 *               │   └─ Sock3 read/write │
 *               └──────────────┬──────────┘
 *                              │
 *                              ▼
 *               ┌──────────────────────────┐
 *               │   Handler处理链(ChannelPipeline)
 *               │  ① Codec:解码/编码
 *               │  ② 业务Handler:业务逻辑
 *               │  ③ 自定义Handler
 *               └──────────────────────────┘
 *                              │
 *                              ▼ (如需进一步解耦)
 *               ┌──────────────────────────┐
 *               │   业务线程池(业务CPU密集/阻塞时用)│
 *               │   通过 ctx.channel().eventLoop() │
 *               │   .execute() 提交到业务线程池 │
 *               └──────────────────────────┘
 */

3.2 Netty Echo Server实现

// ============ Netty Echo Server ============
/**
 * Echo Server:收到什么就返回什么
 * 这是Netty最经典的入门示例
 */
public class NettyEchoServer {
    public static void main(String[] args) throws InterruptedException {
        // BossGroup:处理accept事件,通常1-2个线程足够
        // 连接管理不需要太多线程(accept是轻量操作)
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        
        // WorkerGroup:处理read/write/业务处理,通常 = CPU核数*2
        // I/O事件处理需要较多线程
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(
            Runtime.getRuntime().availableProcessors() * 2);
        
        try {
            // ServerBootstrap:Netty服务启动辅助类(Builder模式)
            ServerBootstrap bootstrap = new ServerBootstrap()
                .group(bossGroup, workerGroup)                    // 设置线程组
                .channel(NioServerSocketChannel.class)            // NIO Channel类型
                // .option() 配置ServerSocketChannel参数(boss线程)
                .option(ChannelOption.SO_BACKLOG, 1024)           // 连接队列长度
                .option(ChannelOption.SO_KEEPALIVE, true)        // TCP保活
                // .childOption() 配置SocketChannel参数(worker线程)
                .childOption(ChannelOption.TCP_NODELAY, true)    // 禁用Nagle,小包立即发送
                .childOption(ChannelOption.SO_SNDBUF, 64 * 1024) // 发送缓冲区64KB
                .childOption(ChannelOption.SO_RCVBUF, 64 * 1024) // 接收缓冲区64KB
                // .attr() 设置Channel属性(可自定义)
                // .handler() 设置boss线程的Handler(ServerSocketChannel专属)
                // .childHandler() 设置worker线程的Handler(SocketChannel专属)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        // ChannelPipeline:责任链模式,每个Handler处理特定逻辑
                        // inbound按顺序执行,outbound按逆序执行
                        ChannelPipeline pipeline = ch.pipeline();
                        
                        // Decoder:将ByteBuf解码为String(换行符分割)
                        pipeline.addLast(new LineBasedFrameDecoder(1024));
                        pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8));
                        
                        // Encoder:将String编码为ByteBuf
                        pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));
                        
                        // 业务Handler:Echo逻辑
                        pipeline.addLast(new EchoServerHandler());
                    }
                });
            
            // 绑定端口,同步等待启动完成
            ChannelFuture future = bootstrap.bind(8080).sync();
            System.out.println("Netty Echo Server启动,监听8080...");
            
            // 优雅关闭:等待服务器 socket 关闭
            future.channel().closeFuture().sync();
        } finally {
            // 释放资源:关闭线程组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

/**
 * EchoHandler:ChannelInboundHandlerAdapter的子类
 * 
 * ChannelInboundHandler方法调用时机:
 * 1. channelRegistered → channelActive → channelRead (数据到达)
 * 2. channelReadComplete → channelInactive → channelUnregistered (连接关闭)
 * 
 * ChannelOutboundHandler方法:
 * bind/listen, connect, accept, read, write, flush, close, deregister
 */
@Sharable  // 可被多个Channel共享(无状态Handler)
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    
    private static final Logger log = LoggerFactory.getLogger(EchoServerHandler.class);
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // msg是解码后的String(因为Pipeline中有StringDecoder)
        String request = (String) msg;
        log.info("收到: {}", request);
        
        // 写回响应(写操作通过ChannelOutboundHandler逆序传播)
        // ctx.write() 只是写入发送缓冲区(不立即发)
        // ctx.flush() 才真正发送到网络
        ctx.write(request);   // write + flush = writeAndFlush
        ctx.flush();
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 处理异常:通常关闭连接
        log.error("连接异常: {}", ctx.channel().remoteAddress(), cause);
        ctx.close();
    }
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("连接建立: {}", ctx.channel().remoteAddress());
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("连接断开: {}", ctx.channel().remoteAddress());
    }
}

// ============ 客户端代码 ============
public class NettyEchoClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        
        try {
            Bootstrap bootstrap = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline()
                          .addLast(new LineBasedFrameDecoder(1024))
                          .addLast(new StringDecoder(StandardCharsets.UTF_8))
                          .addLast(new StringEncoder(StandardCharsets.UTF_8))
                          .addLast(new EchoClientHandler());
                    }
                });
            
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
            Scanner scanner = new Scanner(System.in);
            
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                channel.writeAndFlush(line + "\n"); // 带换行符
            }
        } finally {
            group.shutdownGracefully();
        }
    }
}

class EchoClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        System.out.println("服务端响应: " + msg);
    }
}

四、ByteBuf:Netty的零拷贝缓冲区

ByteBuf是Netty自研的缓冲区,相比JDK的ByteBuffer有显著改进:池化、引用计数、动态扩展、双指针独立读写。

4.1 ByteBuf vs ByteBuffer核心对比

/**
 * ByteBuf 的核心改进:
 * 
 * 1. 读写指针分离(readerIndex / writerIndex)
 *    - ByteBuffer只有position一个指针,读写共用
 *    - ByteBuf有两个独立指针,写操作不改变readerIndex
 *    - 不需要flip()切换模式
 * 
 * 2. 池化(Pooled vs Unpooled)
 *    - Unpooled:每次分配新内存,GC压力大
 *    - Pooled:从内存池中获取已分配的缓冲区,零分配
 *    - Netty默认使用PooledByteBufAllocator
 * 
 * 3. 动态扩展
 *    - ByteBuffer创建时固定capacity
 *    - ByteBuf可自动扩展(writableBytes不够时自动扩容)
 *    - writeBytes() 超出容量时自动扩容
 * 
 * 4. 引用计数
 *    - 每个ByteBuf有引用计数(referenceCount)
 *    - retain() +1,release() -1
 *    - 计数为0时释放内存(不是GC!)
 *    - 用于零拷贝:将Buffer"转移"而非"复制"
 *    - 不调用release() = 内存泄漏(NIO泄漏检测可发现)
 * 
 * 5. 组合缓冲区(CompositeByteBuf)
 *    - 逻辑上组合多个ByteBuf为一个
 *    - 避免复制和合并的开销(零拷贝的核心)
 */

// ByteBuf使用示例
public class ByteBufDemo {
    public static void main(String[] args) {
        // 池化直接缓冲区(Netty推荐配置)
        // 池化减少GC压力,直接内存提升I/O性能
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(256);
        
        // 写入数据(writerIndex自动移动)
        buf.writeBytes("Hello Netty".getBytes());
        System.out.println("可读字节: " + buf.readableBytes()); // 11
        
        // 读取数据(readerIndex自动移动)
        byte b = buf.readByte();
        System.out.println("读到的字节: " + (char) b); // H
        
        // 不需要flip!直接继续写
        buf.writeInt(12345);
        
        // 获取字节数组(零拷贝?不,复制)
        byte[] arr = new byte[buf.readableBytes()];
        buf.readBytes(arr);
        
        // 如果要避免复制,使用NioUnsafe直接获取底层数组
        // ByteBuf内部是.nioBuffer() 返回共享视图
        
        // release! 引用计数-1,为0时释放内存
        // 这是Netty内存管理的核心规则
        buf.release();
        
        // 组合缓冲区(零拷贝合并)
        CompositeByteBuf composite = ByteBufAllocator.DEFAULT.compositeBuffer();
        ByteBuf header = ByteBufAllocator.DEFAULT.buffer().writeBytes("Header".getBytes());
        ByteBuf body = ByteBufAllocator.DEFAULT.buffer().writeBytes("Body".getBytes());
        // 不复制,直接逻辑组合
        composite.addComponents(header, body);
        // 发送时:header + body 连续内存,但实际是两个ByteBuf的引用
        // 零拷贝场景:文件传输,不需要将文件内容复制到用户空间
        composite.release();
    }
}

4.2 ByteBuf泄漏检测

/**
 * ByteBuf泄漏是Netty最常见的内存问题
 * 
 * 原因:ByteBuf由引用计数管理,不是GC自动回收
 *       忘记调用release() → 内存泄漏
 * 
 * 检测级别(通过-Dio.netty.leakDetection.level设置):
 * 
 * disabled:不检测(生产环境默认,性能最好)
 * simple:抽样1%的ByteBuf,JVM退出时报告(测试环境)
 * advanced:跟踪泄漏ByteBuf的创建位置(开发环境)
 * paranoid:每次创建都检测,每次泄漏立即报告(调试)
 * 
 * 泄漏检测原理:WeakReference + ReferenceQueue
 * 创建ByteBuf时注册到跟踪队列,GC回收时检测是否调用了release()
 */

// 泄漏检测的典型输出
/**
 * LEAK: ByteBuf.release() was not called before it's garbage-collected.
 * Recent access records: 
 * Print 10 records above.
 * 
 * 创建位置堆栈(泄漏发生的位置):
 * - MyHandler.channelRead (MyHandler.java:42)
 * - ByteBufTest.main (ByteBufTest.java:10)
 * ...
 * 
 * 修复方法:
 * 1. try-finally:finally块中release
 * 2. ReferenceCountUtil.releaseLater(msg):自动release
 * 3. SimpleChannelInboundHandler:自动release收到的消息
 */

// 正确的Handler写法
public class SafeHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 方式1:try-finally(标准做法)
        ByteBuf buf = null;
        try {
            buf = (ByteBuf) msg;
            // 业务处理...
            ctx.fireChannelRead(buf); // 转发给下一个Handler(注意:不要release)
        } finally {
            if (buf != null && buf.refCnt() > 0) {
                buf.release(); // 引用计数-1
            }
        }
        
        // 方式2:SimpleChannelInboundHandler(推荐)
        // 自动对msg进行release,但转发时需要retain()
    }
}

// 方式2的实现
public class BetterHandler extends SimpleChannelInboundHandler<ByteBuf> {
    // SimpleChannelInboundHandler会自动release msg
    // 但如果需要将msg传给下一个Handler,必须retain()
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        // msg会被自动release,retain后传给下一个Handler
        ctx.fireChannelRead(msg.retain());
    }
}

五、Netty WebSocket服务器与心跳

// ============ Netty WebSocket服务器 ============
/**
 * WebSocket协议:
 * HTTP握手建立连接 → 升级为全双工WebSocket
 * 适用于:聊天、实时推送、游戏、协作编辑
 */
public class WebSocketServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(
            Runtime.getRuntime().availableProcessors() * 2);
        
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new WebSocketServerInitializer());
            
            ChannelFuture future = bootstrap.bind(8080).sync();
            System.out.println("WebSocket Server启动: ws://localhost:8080/ws");
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        
        // HTTP编解码(WebSocket握手需要)
        pipeline.addLast(new HttpServerCodec());
        // 聚合HTTP请求/响应(FullHttpRequest/FullHttpResponse)
        pipeline.addLast(new HttpObjectAggregator(65536));
        // WebSocket握手Handler(升级协议)
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        // 自定义Handler
        pipeline.addLast(new WebSocketFrameHandler());
    }
}

class WebSocketFrameHandler extends TextWebSocketFrameHandler {
    private static final Logger log = LoggerFactory.getLogger(WebSocketFrameHandler.class);
    
    // 维护所有在线连接(用于广播)
    private static final ChannelGroup channels = new DefaultChannelGroup(
        GlobalEventExecutor.INSTANCE); // 全局唯一的事件执行器
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        channels.add(ctx.channel()); // 加入广播组
        log.info("客户端连接: {}", ctx.channel().id());
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        channels.remove(ctx.channel()); // 从广播组移除
        log.info("客户端断开: {}", ctx.channel().id());
    }
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        String text = frame.text();
        log.info("收到消息: {}", text);
        
        // 广播给所有连接
        // TextWebSocketFrame:WebSocket文本帧
        // BinaryWebSocketFrame:WebSocket二进制帧
        // PingWebSocketFrame / PongWebSocketFrame:心跳帧
        channels.writeAndFlush(new TextWebSocketFrame("广播: " + text));
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("异常", cause);
        ctx.close();
    }
}

// ============ WebSocket 心跳机制 ============
/**
 * 心跳的两个目的:
 * 1. 检测连接是否存活(对端崩溃/网络断开)
 * 2. 保持TCP连接活跃(防止中间设备断开空闲连接)
 * 
 * WebSocket心跳:Ping-Pong帧
 * 主动方发送PingFrame,被动方必须响应PongFrame
 */

// 心跳Handler实现
@Sharable
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
    
    // 空闲检测:读空闲超过阈值,发送Ping
    // ReadIdleTimeoutEvent → 发送PingFrame → 等待PongFrame
    // 连续N次没收到Pong → 判定为断开,关闭连接
    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent idle) {
            switch (idle.state()) {
                case READER_IDLE:
                    // 读空闲(服务端没收到客户端消息太久)
                    // 发送Ping探测客户端是否存活
                    ctx.writeAndFlush(new PingWebSocketFrame());
                    System.out.println("发送Ping探测连接: " + ctx.channel().id());
                    break;
                case WRITER_IDLE:
                    // 写空闲(服务端很久没发消息给客户端)
                    // 可以发送心跳消息
                    break;
                case ALL_IDLE:
                    // 读写都空闲
                    break;
            }
        }
        // 事件需要手动传播,否则后续Handler收不到
        ctx.fireUserEventTriggered(evt);
    }
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
        if (frame instanceof PingWebSocketFrame) {
            // 收到Ping,响应Pong
            ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
            System.out.println("响应Pong");
        } else if (frame instanceof PongWebSocketFrame) {
            // 收到Pong,确认对端存活
            System.out.println("收到Pong,连接存活");
        }
    }
}
Netty最佳实践:Pipeline中的Handler尽量设计为@Sharable(无状态),减少对象创建。有状态Handler通过ChannelHandlerContext的attr()存储连接级别状态,避免内存泄漏。

六、Netty高性能调优参数汇总

/**
 * Netty性能调优三维度:
 * 
 * 维度1:内存配置
 * 维度2:I/O线程数
 * 维度3:TCP参数
 * 
 * 生产环境调优原则:
 * - 先压测,拿到数据再调(不要凭感觉)
 * - 单参数调整,每次只改一个
 * - 关注瓶颈:CPU、内存、网络、GC
 */

// ============ 1. ByteBuf池化与内存配置 ============
// VM启动参数
// -Dio.netty.allocator.type=pooled        // 使用池化分配器(默认)
// -Dio.netty.allocator.numDirectArenas=4   // 直接内存池数量=CPU核数
// -Dio.netty.allocator.pageSize=8192       // 内存页大小
// -Dio.netty.allocator.maxOrder=11         // 最大阶数(chunk大小=pageSize*2^maxOrder)
// -Dio.netty.noPreferDirect=true           // 优先堆缓冲区而非直接缓冲区

// JVM堆外内存配置(Netty直接缓冲区使用的内存)
// -XX:MaxDirectMemorySize=4G              // 直接内存最大4G
// 注意:Netty直接内存不受-Xmx控制,需单独配置

// 动态配置(代码中)
System.setProperty("io.netty.allocator.numDirectArenas", 
    String.valueOf(Runtime.getRuntime().availableProcessors()));

// ============ 2. I/O线程数配置 ============
// Boss线程数:通常1即可(accept是轻量的)
// 特殊场景:SO_REUSEPORT(多个进程绑定同一端口)时,可设>1
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);

// Worker线程数:CPU密集型=核数+1~2,I/O密集型=核数*2
// 计算公式:
// WorkerThreads = CoreNum * (1 + (WaitTime / WorkTime))
// 极度I/O密集(大量DB/远程调用):CoreNum * 2
// 混合负载(编解码+业务):CoreNum
NioEventLoopGroup workerGroup = new NioEventLoopGroup(
    Math.min(Runtime.getRuntime().availableProcessors() * 2, 32));

// ============ 3. TCP参数优化 ============
ServerBootstrap bootstrap = new ServerBootstrap()
    // TCP连接队列:三次握手在队列中等待accept()处理的数量
    // 默认50,高并发时需调大,否则客户端connect()会报ConnectionRefused
    .option(ChannelOption.SO_BACKLOG, 4096)
    
    // SO_REUSEADDR: TIME_WAIT状态的端口可立即重用
    .option(ChannelOption.SO_REUSEADDR, true)
    
    // TCP_NODELAY:禁用Nagle算法,小包立即发送
    // 默认false(启用Nagle),适合低延迟场景
    // 关闭:适合实时游戏、交易系统
    // 开启:适合批量传输、文件传输
    .childOption(ChannelOption.TCP_NODELAY, true)
    
    // SO_KEEPALIVE:TCP保活,检测对端崩溃/网络断开
    // 默认false,开启后2小时无数据触发探测
    // 建议:长连接应用开启
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    
    // SO_LINGER:close()时的行为
    // =0:立即关闭,不发送FIN
    // =-1(默认):等待缓冲区数据发送完成
    // >0:等待超时后强制关闭
    .childOption(ChannelOption.SO_LINGER, 0)
    
    // SO_SNDBUF / SO_RCVBUF:发送/接收缓冲区大小
    // 默认:系统决定(通常64KB)
    // 调整:带宽高、延迟大的场景可增大
    .childOption(ChannelOption.SO_SNDBUF, 256 * 1024) // 256KB
    .childOption(ChannelOption.SO_RCVBUF, 256 * 1024);

// ============ 4. 高并发连接数优化 ============
// 文件描述符限制(ulimit -n)
// 每条TCP连接占用1个FD
// 默认1024不够,需要调到100000+
// /etc/security/limits.conf
// * soft nofile 1048576
// * hard nofile 1048576

// JVM参数:事件循环线程数上限(Netty默认无上限)
// -Dio.netty.eventLoop.maxPendingTasks=1024
// 超过此值的任务会被拒绝

// ============ 5. GC优化 ============
// CMS/G1/Shenandoah/ZGC选择建议
// ZGC(JDK 11+):停顿时间<1ms,适合大内存(>8GB)场景
// G1(JDK 9+):默认,平衡型
// Netty内部使用了大量DirectBuffer和ByteBuf
// 池化+直接内存使得GC压力小很多

// 推荐JVM参数
// java -server \
//   -Xms8G -Xmx8G \
//   -XX:+UseZGC \
//   -XX:MaxDirectMemorySize=4G \
//   -XX:+AlwaysPreTouch \
//   ...

七、Netty长连接心跳实战

// ============ Netty长连接心跳最佳实践 ============
/**
 * 长连接心跳场景:
 * - 移动App与服务端保持连接(推送、IM)
 * - 游戏客户端保持连接
 * - IoT设备上报数据
 * 
 * 心跳策略:
 * - 客户端定时发送Ping(通常30-60秒一次)
 * - 服务端检测读空闲,超过阈值(2-3倍心跳间隔)关闭连接
 * - 双向心跳:客户端Ping→服务端Pong,服务端Ping→客户端Pong
 */

// Pipeline配置(Server端完整版)
class LongConnectionServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        
        // 空闲检测Handler(必须放在最前面)
        // IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime)
        // 触发 IdleStateEvent,传递给后续Handler
        p.addLast(new IdleStateHandler(60, 0, 0)); // 60秒读空闲
        
        // 编解码
        p.addLast(new ProtobufVarint32LengthFieldPrepender());
        p.addLast(new ProtobufDecoder(UserMessage.getDefaultInstance()));
        p.addLast(new ProtobufEncoder());
        
        // 心跳处理(接收Ping,响应Pong)
        p.addLast(new HeartbeatHandler());
        
        // 业务处理
        p.addLast(new BusinessHandler());
    }
}

@Sharable
class HeartbeatHandler extends ChannelDuplexHandler {
    // 记录最后收到消息的时间
    private volatile long lastReadTime = System.currentTimeMillis();
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        lastReadTime = System.currentTimeMillis();
        super.channelRead(ctx, msg); // 继续传递给业务Handler
    }
    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idle = (IdleStateEvent) evt;
            
            if (idle.state() == IdleState.READER_IDLE) {
                long idleTime = System.currentTimeMillis() - lastReadTime;
                // 超过90秒没收到消息,关闭连接
                if (idleTime > 90_000) {
                    System.out.println("读空闲超时: " + idleTime + "ms,关闭连接: " 
                        + ctx.channel().id());
                    ctx.close();
                } else {
                    // 60-90秒之间,发送Ping探测
                    System.out.println("发送Ping探测: " + ctx.channel().id());
                    ctx.writeAndFlush(new PingMessage().toByteBuffer());
                }
            }
        }
    }
}

// ============ Netty HTTP/2服务器(gRPC基础) ============
/**
 * HTTP/2相比HTTP/1.1的核心改进:
 * 1. 多路复用:一个TCP连接上并行多个Stream(解决队头阻塞)
 * 2. Header压缩(HPACK):大幅减少Header传输量
 * 3. 服务端推送:服务端主动推送资源
 * 4. 流控制:每条Stream独立流量控制
 */

// Netty HTTP/2服务(基于Netty自己的HTTP/2实现,非Tomcat)
class Http2Server {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        SslContext sslCtx = ...; // TLS
        
        ServerBootstrap bootstrap = new ServerBootstrap()
            .group(group)
            .channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new Http2ServerInitializer(sslCtx));
        
        bootstrap.bind(8443).sync().channel().closeFuture().sync();
    }
}

class Http2ServerInitializer extends ApplicationProtocolNegotiationHandler {
    Http2ServerInitializer(SslContext sslCtx) {
        super(ApplicationProtocolNames.HTTP_2);
    }
    
    @Override
    protected void initChannel(SocketChannel ch) {
        if (isHttp2(ch.pipeline().get(SslHandler.class))) {
            configureHttp2(ch.pipeline());
        } else {
            configureHttp1(ch.pipeline());
        }
    }
    
    private void configureHttp2(ChannelPipeline p) {
        // HTTP/2先处理帧,再分派给Stream
        p.addLast(new Http2FrameCodecBuilder.forServer().build());
        p.addLast(new Http2MultiplexHandler(new BizHandler()));
    }
}