科元料 发表于 2025-8-19 07:39:11

生产级别线程池最佳实践

生产环境中使用线程池需要综合考虑资源管理、任务处理、错误恢复和监控等多个方面。以下是生产级别线程池的全面使用指南:
一、线程池创建最佳实践

1. 避免使用Executors快捷方法

// 反模式 - 可能导致OOM
ExecutorService unsafe = Executors.newCachedThreadPool(); // 无界线程池
ExecutorService unsafe2 = Executors.newFixedThreadPool(10); // 无界队列

// 正确方式 - 手动创建ThreadPoolExecutor
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maxPoolSize = corePoolSize * 2;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100); // 有界队列
RejectedExecutionHandler handler = new CustomRejectionPolicy();

ExecutorService executor = new ThreadPoolExecutor(
    corePoolSize,
    maxPoolSize,
    60L, TimeUnit.SECONDS,
    workQueue,
    new CustomThreadFactory("app-worker-"),
    handler
);2. 关键配置参数


[*]corePoolSize:常驻核心线程数(根据业务类型调整)
[*]maximumPoolSize:最大线程数(建议不超过100)
[*]keepAliveTime:空闲线程存活时间(30-120秒)
[*]workQueue:必须使用有界队列(避免OOM)
[*]threadFactory:自定义线程工厂
[*]rejectedExecutionHandler:自定义拒绝策略
二、线程池关键组件实现

1. 自定义线程工厂(命名、异常处理)

public class CustomThreadFactory implements ThreadFactory {
    private final AtomicInteger counter = new AtomicInteger(0);
    private final String namePrefix;
    private final ThreadGroup group;

    public CustomThreadFactory(String namePrefix) {
      this.namePrefix = namePrefix;
      SecurityManager s = System.getSecurityManager();
      this.group = (s != null) ? s.getThreadGroup() :
                      Thread.currentThread().getThreadGroup();
    }

    @Override
    public Thread newThread(Runnable r) {
      Thread thread = new Thread(group, r,
                                 namePrefix + counter.incrementAndGet(),
                                 0);
      thread.setDaemon(false);
      thread.setPriority(Thread.NORM_PRIORITY);
      
      // 设置未捕获异常处理器
      thread.setUncaughtExceptionHandler((t, e) -> {
            logger.error("Uncaught exception in thread: " + t.getName(), e);
            // 发送告警通知
            AlertManager.notify(e);
      });
      
      return thread;
    }
}2. 自定义拒绝策略(生产级)

public class CustomRejectionPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      if (!executor.isShutdown()) {
            // 1. 记录被拒绝任务
            logger.warn("Task rejected: " + r.toString());
            
            // 2. 尝试重新放入队列(带超时)
            try {
                boolean offered = executor.getQueue().offer(r, 1, TimeUnit.SECONDS);
                if (!offered) {
                  // 3. 持久化到存储系统
                  persistTask(r);
                  logger.info("Task persisted to storage: " + r);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error("Re-enqueue interrupted", e);
            }
      }
    }
   
    private void persistTask(Runnable task) {
      // 实现任务持久化逻辑(数据库、文件、消息队列)
      TaskStorage.save(task);
    }
}三、任务提交与执行最佳实践

1. 任务封装(带监控)

public class MonitoredTask implements Runnable {
    private final Runnable actualTask;
    private final long submitTime;
   
    public MonitoredTask(Runnable task) {
      this.actualTask = task;
      this.submitTime = System.currentTimeMillis();
    }
   
    @Override
    public void run() {
      long start = System.currentTimeMillis();
      try {
            // 设置MDC上下文(日志链路跟踪)
            MDC.put("traceId", UUID.randomUUID().toString());
            
            actualTask.run();
            
            long duration = System.currentTimeMillis() - start;
            Metrics.recordSuccess(duration);
      } catch (Exception e) {
            long duration = System.currentTimeMillis() - start;
            Metrics.recordFailure(duration);
            
            // 重试逻辑
            if (shouldRetry(e)) {
                retryTask();
            } else {
                logger.error("Task execution failed", e);
            }
      } finally {
            MDC.clear();
      }
    }
   
    // 提交任务时使用
    public static void submit(ExecutorService executor, Runnable task) {
      executor.execute(new MonitoredTask(task));
    }
}2. 任务超时控制

Future<?> future = executor.submit(task);

try {
    // 设置任务超时时间
    future.get(30, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    // 1. 取消任务执行
    future.cancel(true);
   
    // 2. 记录超时日志
    logger.warn("Task timed out: " + task);
   
    // 3. 执行降级策略
    fallbackHandler.handle(task);
} catch (Exception e) {
    // 处理其他异常
}四、线程池监控与管理

1. 监控指标采集

public class ThreadPoolMonitor implements Runnable {
    private final ThreadPoolExecutor executor;
   
    public ThreadPoolMonitor(ThreadPoolExecutor executor) {
      this.executor = executor;
    }
   
    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
            try {
                // 采集关键指标
                int activeCount = executor.getActiveCount();
                long completedTaskCount = executor.getCompletedTaskCount();
                int queueSize = executor.getQueue().size();
                int poolSize = executor.getPoolSize();
               
                // 发布到监控系统
                Metrics.gauge("threadpool.active.count", activeCount);
                Metrics.gauge("threadpool.queue.size", queueSize);
                Metrics.counter("threadpool.completed.tasks", completedTaskCount);
               
                // 检测潜在问题
                if (queueSize > executor.getQueue().remainingCapacity() * 0.8) {
                  logger.warn("Thread pool queue is approaching capacity");
                }
               
                // 30秒采集一次
                Thread.sleep(30_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
      }
    }
}2. 动态调整线程池参数

public class DynamicThreadPool extends ThreadPoolExecutor {
   
    public DynamicThreadPool(int corePoolSize, int maxPoolSize,
                           long keepAliveTime, TimeUnit unit,
                           BlockingQueue<Runnable> workQueue) {
      super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
    }
   
    // 动态修改核心线程数
    public void setCorePoolSize(int corePoolSize) {
      if (corePoolSize >= 0) {
            super.setCorePoolSize(corePoolSize);
            Metrics.gauge("threadpool.core.size", corePoolSize);
      }
    }
   
    // 动态修改最大线程数
    public void setMaxPoolSize(int maxPoolSize) {
      if (maxPoolSize > 0 && maxPoolSize >= getCorePoolSize()) {
            super.setMaximumPoolSize(maxPoolSize);
            Metrics.gauge("threadpool.max.size", maxPoolSize);
      }
    }
   
    // 动态修改队列容量(需要特殊处理)
    public void resizeQueue(int newCapacity) {
      BlockingQueue<Runnable> newQueue = new ArrayBlockingQueue<>(newCapacity);
      BlockingQueue<Runnable> oldQueue = getQueue();
      
      // 转移任务
      synchronized (this) {
            List<Runnable> transferList = new ArrayList<>();
            oldQueue.drainTo(transferList);
            newQueue.addAll(transferList);
            
            // 更新队列
            super.setRejectedExecutionHandler(getRejectedExecutionHandler());
            super.setQueue(newQueue);
      }
    }
}五、优雅关闭与资源清理

1. 应用关闭时处理

@PreDestroy
public void shutdownExecutor() {
    // 1. 禁止新任务提交
    executor.shutdown();
   
    try {
      // 2. 等待现有任务完成
      if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            // 3. 取消所有未开始任务
            executor.shutdownNow();
            
            // 4. 再次等待任务响应取消
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                logger.error("Thread pool did not terminate");
            }
      }
    } catch (InterruptedException e) {
      // 5. 中断当前线程并尝试取消
      executor.shutdownNow();
      Thread.currentThread().interrupt();
    }
   
    // 6. 清理资源
    cleanupResources();
}2. 未完成任务恢复

public void recoverPendingTasks() {
    BlockingQueue<Runnable> pendingQueue = executor.getQueue();
    List<Runnable> pendingTasks = new ArrayList<>();
    pendingQueue.drainTo(pendingTasks);
   
    for (Runnable task : pendingTasks) {
      if (task instanceof RecoverableTask) {
            // 持久化到可靠存储
            TaskStorage.save((RecoverableTask) task);
            logger.info("Recovered pending task: " + task);
      }
    }
}六、生产环境建议


[*]线程隔离策略:

[*]CPU密集型任务:独立线程池
[*]I/O密集型任务:独立线程池
[*]关键业务:独立线程池(避免相互影响)

[*]资源限制:
// 使用Semaphore控制并发资源使用
private final Semaphore concurrencySemaphore = new Semaphore(50);

executor.execute(() -> {
    try {
      concurrencySemaphore.acquire();
      // 执行受限资源操作
    } finally {
      concurrencySemaphore.release();
    }
});
[*]上下文传递:
// 使用TransmittableThreadLocal传递上下文
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();

executor.execute(TtlRunnable.get(() -> {
    // 可以访问父线程的context值
    String value = context.get();
}));
[*]熔断降级:
CircuitBreaker circuitBreaker = new CircuitBreaker();

executor.execute(() -> {
    if (circuitBreaker.allowExecution()) {
      try {
            // 执行业务逻辑
      } catch (Exception e) {
            circuitBreaker.recordFailure();
      }
    } else {
      // 执行降级逻辑
      fallbackService.executeFallback();
    }
});
七、常见问题处理方案

问题现象解决方案线程泄露线程数持续增长1. 检查线程是否正常结束
2. 添加线程创建监控
3. 限制最大线程数任务堆积队列持续增长1. 增加消费者线程
2. 优化任务处理速度
3. 实施任务降级CPU使用率高CPU持续满载1. 分析线程栈(jstack)
2. 优化热点代码
3. 限制线程池大小任务饿死低优先级任务长期得不到执行1. 使用优先级队列
2. 拆分不同优先级线程池
3. 实现公平调度上下文丢失子线程无法获取上下文1. 使用TransmittableThreadLocal
2. 手动传递上下文
3. 使用MDC框架生产环境中使用线程池需要综合考虑资源配置、任务管理、错误处理和监控告警等多个方面。建议结合具体业务场景选择合适的策略,并建立完善的监控和告警机制。

来源:豆瓜网用户自行投稿发布,如果侵权,请联系站长删除
页: [1]
查看完整版本: 生产级别线程池最佳实践