一、线程池核心参数与队列选择

1.1 七大核心参数

// ThreadPoolExecutor 构造函数
public ThreadPoolExecutor(
    int corePoolSize,              // 核心线程数(始终保持存活,除非allowCoreThreadTimeOut)
    int maximumPoolSize,           // 最大线程数
    long keepAliveTime,            // 空闲线程存活时间(超过corePoolSize的线程)
    TimeUnit unit,                 // keepAliveTime时间单位
    BlockingQueue workQueue,  // 任务队列(阻塞队列)
    ThreadFactory threadFactory,   // 线程工厂(自定义线程名/优先级/是否为守护线程)
    RejectedExecutionHandler handler // 拒绝策略(队列满+线程数满时的处理)
)

// 线程池状态机(ctl = 运行状态 + 线程池worker数量)
// ctl高3位:RUNNING/SHUTDOWN/STOP/TIDYING/TERMINATED
// ctl低29位:worker计数

// 线程创建规则(核心算法):
// 1. workerCount < corePoolSize → 创建新核心线程执行任务
// 2. workerCount >= corePoolSize → 任务加入队列
// 3. 队列满 && workerCount < maximumPoolSize → 创建新线程
// 4. 队列满 && workerCount >= maximumPoolSize → 触发拒绝策略

// ⚠️ 关键陷阱:为什么不应该用 Executors 创建线程池
// ❌ newFixedThreadPool: 无界队列(Integer.MAX_VALUE),OOM风险
// ❌ newCachedThreadPool: maximumPoolSize=Integer.MAX_VALUE,线程无限创建
// ❌ newSingleThreadExecutor: 单线程 + 无界队列
// ✅ 正确方式:手动 new ThreadPoolExecutor,合理设置参数

1.2 队列选择策略

队列类型特性适用场景
LinkedBlockingQueue有界/无界,FIFO,可选容量通用,需设置capacity
ArrayBlockingQueue有界,FIFO,需设容量,性能高于Linked有严格并发限制
SynchronousQueue不存储元素,每个put必须等一个takeCallerRuns策略配合
PriorityBlockingQueue按优先级排序,无界但可设初始容量任务有优先级
DelayedWorkQueue按延迟排序(ScheduledThreadPool内部使用)定时任务

1.3 拒绝策略详解

// Java 6+ 四种内置策略 + 一种扩展
// 1. AbortPolicy(默认):抛RejectedExecutionException
// 2. DiscardPolicy:静默丢弃(最新提交的任务)
// 3. DiscardOldestPolicy:丢弃队列中最老的任务
// 4. CallerRunsPolicy:由调用线程执行(executor之外执行,有降级保护)
// 5. 自定义:实现RejectedExecutionHandler

// 最佳实践:自定义降级策略
public class CustomRejectedPolicy implements RejectedExecutionHandler {
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final Executor fallbackExecutor;

    public CustomRejectedPolicy(Executor fallbackExecutor) {
        this.fallbackExecutor = fallbackExecutor;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 记录告警
        log.warn("线程池{}饱和,执行降级策略", executor);
        // 降级:调用线程执行(不抛异常)
        if (!executor.isShutdown()) {
            fallbackExecutor.execute(r);
        }
    }
}

二、线程池参数计算公式

2.1 CPU密集型 vs IO密集型

// CPU密集型:线程数 = CPU核心数 + 1(超线程补偿)
// 计算公式:threads = N_CPU + 1
// 理由:CPU密集型任务几乎100%占用CPU,+1是为了利用等待IO时的线程调度空隙
int cpuCores = Runtime.getRuntime().availableProcessors();
int threads = cpuCores + 1; // 含超线程

// IO密集型:线程数 = CPU核心数 * 目标CPU利用率 * (1 + 平均等待时间/平均CPU时间)
// 公式:threads = N_CPU * U * (1 + W/C)
// 经验值:threads = N_CPU * 2  或  threads = N_CPU / (1 - 阻塞系数)
// 阻塞系数 = 阻塞时间 / (阻塞时间 + CPU时间),通常0.5~0.9
int threads = cpuCores * 2;  // 保守估计

// 通用经验公式(实测最优)
// CPU密集:N + 1
// IO密集:N * 2 + 1 或 N / (1 - 阻塞系数)
// 混合型:根据profiling调优

// Java代码实现
public class ThreadPoolFactory {
    private static final int CPU_CORES = Runtime.getRuntime().availableProcessors();

    public static ExecutorService cpuBound() {
        return new ThreadPoolExecutor(
            CPU_CORES + 1,          // 核心线程
            CPU_CORES + 1,          // 最大线程(不扩容)
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            new ThreadFactoryBuilder()
                .setNameFormat("cpu-pool-%d")
                .build(),
            new CallerRunsPolicy()
        );
    }

    public static ExecutorService ioBound() {
        return new ThreadPoolExecutor(
            CPU_CORES * 2,
            CPU_CORES * 4,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            new ThreadFactoryBuilder()
                .setNameFormat("io-pool-%d")
                .build(),
            new CallerRunsPolicy()
        );
    }
}

2.2 Tomcat/Dubbo线程池参数调优

# application.yml 调优参考
spring:
  task:
    execution:
      pool:
        core-size: 8          # 核心线程(IO密集建议 2*N_CPU)
        max-size: 16         # 最大线程
        keep-alive: 60s      # 空闲存活时间
        queue-capacity: 100   # 有界队列(超出触发拒绝策略)
        allow-core-thread-timeout: true  # 允许核心线程超时回收

# Dubbo线程池配置
dubbo:
  provider:
    threads: 200           # 服务端业务线程池大小
    threadpool: fixed      # fixed/cached/limited/lazy
  consumer:
    threads: 100          # 客户端回调线程池大小

# 推荐配置思路:
# N = CPU核心数
# IO密集(REST/DB/远程调用):
#   core = N * 2, max = N * 4, queue = 1000
# CPU密集(计算/加密/压缩):
#   core = N + 1, max = N + 1, queue = 500

三、线程池监控与动态调参

3.1 线程池核心监控指标

// 线程池暴露JMX/Micrometer指标
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    8, 16, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000),
    new NamedThreadFactory("biz-pool")
);

// 核心监控方法
public class ThreadPoolMonitor {
    public static void printStats(ThreadPoolExecutor tp) {
        System.out.printf(
            "[%s] active=%d, pool=%d, queue=%d, completed=%d, taskCount=%d, largestPoolSize=%d%n",
            tp.getThreadFactory().toString(),
            tp.getActiveCount(),           // 当前活跃线程数
            tp.getPoolSize(),              // 当前线程池大小
            tp.getQueue().size(),         // 队列中的任务数
            tp.getCompletedTaskCount(),   // 已完成任务总数
            tp.getTaskCount(),            // 全部任务数
            tp.getLargestPoolSize()       // 历史最大线程数
        );
    }
}

// Micrometer + Prometheus 监控埋点
@Configuration
public class ThreadPoolMetricsConfig {
    @Bean
    public MeterBinder threadPoolMetrics(ThreadPoolExecutor executor, MeterRegistry registry) {
        return registry -> {
            Gauge.builder("threadpool.active", executor, ThreadPoolExecutor::getActiveCount)
                .tag("name", "biz-pool").register(registry);
            Gauge.builder("threadpool.queue", executor,
                e -> e.getQueue().size()).tag("name", "biz-pool").register(registry);
            Counter.builder("threadpool.rejected")
                .tag("name", "biz-pool")
                .register(registry).increment();
        };
    }
}

3.2 动态调整线程池参数

// ThreadPoolExecutor 支持运行时动态调整
ThreadPoolExecutor executor = ...;

// 动态设置核心线程数(立即创建新线程)
executor.setCorePoolSize(16);

// 动态设置最大线程数
executor.setMaximumPoolSize(32);

// 动态设置keepAliveTime
executor.setKeepAliveTime(30L, TimeUnit.SECONDS);

// 自动调节核心线程数的策略(allowCoreThreadTimeOut=true时生效)
// 每隔一定时间检查活跃线程,空闲则回收

// ✅ 最佳实践:运行时自适应线程池
public class AdaptiveThreadPool extends ThreadPoolExecutor {
    private volatile double targetCpuUtilization = 0.7;
    private final ScheduledExecutorService adjuster =
        Executors.newSingleThreadScheduledExecutor();

    public AdaptiveThreadPool(...) { super(...); startAdaptive(); }

    private void startAdaptive() {
        adjuster.scheduleAtFixedRate(() -> {
            int active = getActiveCount();
            int core = getCorePoolSize();
            int max = getMaximumPoolSize();

            // 如果活跃线程数 == 核心线程数,说明队列在堆积
            if (active == core && getQueue().size() > 500) {
                setMaximumPoolSize(Math.min(max + 4, 64)); // 扩容
                setCorePoolSize(Math.min(core + 2, 64));
            }

            // 如果活跃线程数远小于核心线程数,说明资源过剩
            if (active < core * 0.3) {
                setCorePoolSize(Math.max(core - 2, 1)); // 缩容
            }
        }, 1, 5, TimeUnit.MINUTES);
    }
}

3.3 生产环境压测调参流程

// 压测调参标准流程
// 1. 基线测试:线程数=CPU核心数,记录TPS和延迟
// 2. 逐步加压:线程数=2N, 4N, 8N...,观察吞吐量曲线
// 3. 确定拐点:当TPS不再上升或开始下降时,当前进数即为最优
// 4. 验证稳定性:最优线程数下持续压测30分钟

// JMH压测示例
@State(Scope.Thread)
@BenchmarkMode(Mode.Throughput)
public class ThreadPoolBenchmark {
    private ExecutorService pool;
    private volatile int result;

    @Setup
    public void setup() {
        pool = new ThreadPoolExecutor(
            8, 16, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            new DiscardPolicy()
        );
    }

    @TearDown
    public void teardown() { pool.shutdown(); }

    @Benchmark
    @Threads(8)
    @Threads(16)
    @Threads(32)
    public void submitTasks() throws Exception {
        for (int i = 0; i < 100; i++) {
            pool.submit(() -> { result = compute(i); });
        }
    }
}

// 观察指标:RejectedExecutionException频率、线程上下文切换次数
// vmstat 1 → cs列:上下文切换 > 20000/s 说明线程过多

四、常见线程池配置场景

// 场景1:快速响应的Web请求处理
// 特点:任务短小、并发量高、IO操作多
ExecutorService webPool = new ThreadPoolExecutor(
    50,    // CPU*2(N=16时)
    200,   // 最大200
    60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1000),
    new ThreadFactoryBuilder().setNameFormat("web-pool-%d").build(),
    new CallerRunsPolicy() // 触发时由Servlet线程执行(背压)
);

// 场景2:数据库批量操作
// 特点:DB IO密集,批量提交,延迟容忍度高
ExecutorService dbPool = new ThreadPoolExecutor(
    8,    // CPU核心数
    16,   // 1倍CPU核心数
    5L, TimeUnit.MINUTES, // 长时间保持
    new LinkedBlockingQueue<>(500), // 有界队列
    new ThreadFactoryBuilder().setNameFormat("db-pool-%d").build(),
    new AbortPolicy()
);

// 场景3:异步日志写入
// 特点:容忍延迟、可靠性要求高
ExecutorService logPool = new DiscardingPAF ExecutorService(
    2,    // 固定2个线程即可
    2,
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(50000), // 日志队列可较大
    new ThreadFactoryBuilder().setNameFormat("log-pool-%d").build(),
    new DiscardOldestPolicy()
);

// 场景4:定时任务调度(使用ScheduledThreadPoolExecutor)
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(
    4, // 只用于调度,不处理业务,线程数=定时任务数
    new ScheduledThreadFactoryBuilder().setNameFormat("sched-%d").build()
);
// ⚠️ scheduleAtFixedRate: 任务开始时间间隔固定(忽略任务执行时长)
// ⚠️ scheduleWithFixedDelay: 任务结束时间间隔固定(通常用这个)