找回密码
 立即注册
首页 业界区 业界 生产级别线程池最佳实践

生产级别线程池最佳实践

科元料 2025-8-19 07:39:11
生产环境中使用线程池需要综合考虑资源管理、任务处理、错误恢复和监控等多个方面。以下是生产级别线程池的全面使用指南:
一、线程池创建最佳实践

1. 避免使用Executors快捷方法
  1. // 反模式 - 可能导致OOM
  2. ExecutorService unsafe = Executors.newCachedThreadPool(); // 无界线程池
  3. ExecutorService unsafe2 = Executors.newFixedThreadPool(10); // 无界队列
  4. // 正确方式 - 手动创建ThreadPoolExecutor
  5. int corePoolSize = Runtime.getRuntime().availableProcessors();
  6. int maxPoolSize = corePoolSize * 2;
  7. BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100); // 有界队列
  8. RejectedExecutionHandler handler = new CustomRejectionPolicy();
  9. ExecutorService executor = new ThreadPoolExecutor(
  10.     corePoolSize,
  11.     maxPoolSize,
  12.     60L, TimeUnit.SECONDS,
  13.     workQueue,
  14.     new CustomThreadFactory("app-worker-"),
  15.     handler
  16. );
复制代码
2. 关键配置参数


  • corePoolSize:常驻核心线程数(根据业务类型调整)
  • maximumPoolSize:最大线程数(建议不超过100)
  • keepAliveTime:空闲线程存活时间(30-120秒)
  • workQueue必须使用有界队列(避免OOM)
  • threadFactory:自定义线程工厂
  • rejectedExecutionHandler:自定义拒绝策略
二、线程池关键组件实现

1. 自定义线程工厂(命名、异常处理)
  1. public class CustomThreadFactory implements ThreadFactory {
  2.     private final AtomicInteger counter = new AtomicInteger(0);
  3.     private final String namePrefix;
  4.     private final ThreadGroup group;
  5.     public CustomThreadFactory(String namePrefix) {
  6.         this.namePrefix = namePrefix;
  7.         SecurityManager s = System.getSecurityManager();
  8.         this.group = (s != null) ? s.getThreadGroup() :
  9.                       Thread.currentThread().getThreadGroup();
  10.     }
  11.     @Override
  12.     public Thread newThread(Runnable r) {
  13.         Thread thread = new Thread(group, r,
  14.                                  namePrefix + counter.incrementAndGet(),
  15.                                  0);
  16.         thread.setDaemon(false);
  17.         thread.setPriority(Thread.NORM_PRIORITY);
  18.         
  19.         // 设置未捕获异常处理器
  20.         thread.setUncaughtExceptionHandler((t, e) -> {
  21.             logger.error("Uncaught exception in thread: " + t.getName(), e);
  22.             // 发送告警通知
  23.             AlertManager.notify(e);
  24.         });
  25.         
  26.         return thread;
  27.     }
  28. }
复制代码
2. 自定义拒绝策略(生产级)
  1. public class CustomRejectionPolicy implements RejectedExecutionHandler {
  2.     @Override
  3.     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  4.         if (!executor.isShutdown()) {
  5.             // 1. 记录被拒绝任务
  6.             logger.warn("Task rejected: " + r.toString());
  7.             
  8.             // 2. 尝试重新放入队列(带超时)
  9.             try {
  10.                 boolean offered = executor.getQueue().offer(r, 1, TimeUnit.SECONDS);
  11.                 if (!offered) {
  12.                     // 3. 持久化到存储系统
  13.                     persistTask(r);
  14.                     logger.info("Task persisted to storage: " + r);
  15.                 }
  16.             } catch (InterruptedException e) {
  17.                 Thread.currentThread().interrupt();
  18.                 logger.error("Re-enqueue interrupted", e);
  19.             }
  20.         }
  21.     }
  22.    
  23.     private void persistTask(Runnable task) {
  24.         // 实现任务持久化逻辑(数据库、文件、消息队列)
  25.         TaskStorage.save(task);
  26.     }
  27. }
复制代码
三、任务提交与执行最佳实践

1. 任务封装(带监控)
  1. public class MonitoredTask implements Runnable {
  2.     private final Runnable actualTask;
  3.     private final long submitTime;
  4.    
  5.     public MonitoredTask(Runnable task) {
  6.         this.actualTask = task;
  7.         this.submitTime = System.currentTimeMillis();
  8.     }
  9.    
  10.     @Override
  11.     public void run() {
  12.         long start = System.currentTimeMillis();
  13.         try {
  14.             // 设置MDC上下文(日志链路跟踪)
  15.             MDC.put("traceId", UUID.randomUUID().toString());
  16.             
  17.             actualTask.run();
  18.             
  19.             long duration = System.currentTimeMillis() - start;
  20.             Metrics.recordSuccess(duration);
  21.         } catch (Exception e) {
  22.             long duration = System.currentTimeMillis() - start;
  23.             Metrics.recordFailure(duration);
  24.             
  25.             // 重试逻辑
  26.             if (shouldRetry(e)) {
  27.                 retryTask();
  28.             } else {
  29.                 logger.error("Task execution failed", e);
  30.             }
  31.         } finally {
  32.             MDC.clear();
  33.         }
  34.     }
  35.    
  36.     // 提交任务时使用
  37.     public static void submit(ExecutorService executor, Runnable task) {
  38.         executor.execute(new MonitoredTask(task));
  39.     }
  40. }
复制代码
2. 任务超时控制
  1. Future<?> future = executor.submit(task);
  2. try {
  3.     // 设置任务超时时间
  4.     future.get(30, TimeUnit.SECONDS);
  5. } catch (TimeoutException e) {
  6.     // 1. 取消任务执行
  7.     future.cancel(true);
  8.    
  9.     // 2. 记录超时日志
  10.     logger.warn("Task timed out: " + task);
  11.    
  12.     // 3. 执行降级策略
  13.     fallbackHandler.handle(task);
  14. } catch (Exception e) {
  15.     // 处理其他异常
  16. }
复制代码
四、线程池监控与管理

1. 监控指标采集
  1. public class ThreadPoolMonitor implements Runnable {
  2.     private final ThreadPoolExecutor executor;
  3.    
  4.     public ThreadPoolMonitor(ThreadPoolExecutor executor) {
  5.         this.executor = executor;
  6.     }
  7.    
  8.     @Override
  9.     public void run() {
  10.         while (!Thread.currentThread().isInterrupted()) {
  11.             try {
  12.                 // 采集关键指标
  13.                 int activeCount = executor.getActiveCount();
  14.                 long completedTaskCount = executor.getCompletedTaskCount();
  15.                 int queueSize = executor.getQueue().size();
  16.                 int poolSize = executor.getPoolSize();
  17.                
  18.                 // 发布到监控系统
  19.                 Metrics.gauge("threadpool.active.count", activeCount);
  20.                 Metrics.gauge("threadpool.queue.size", queueSize);
  21.                 Metrics.counter("threadpool.completed.tasks", completedTaskCount);
  22.                
  23.                 // 检测潜在问题
  24.                 if (queueSize > executor.getQueue().remainingCapacity() * 0.8) {
  25.                     logger.warn("Thread pool queue is approaching capacity");
  26.                 }
  27.                
  28.                 // 30秒采集一次
  29.                 Thread.sleep(30_000);
  30.             } catch (InterruptedException e) {
  31.                 Thread.currentThread().interrupt();
  32.             }
  33.         }
  34.     }
  35. }
复制代码
2. 动态调整线程池参数
  1. public class DynamicThreadPool extends ThreadPoolExecutor {
  2.    
  3.     public DynamicThreadPool(int corePoolSize, int maxPoolSize,
  4.                            long keepAliveTime, TimeUnit unit,
  5.                            BlockingQueue<Runnable> workQueue) {
  6.         super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
  7.     }
  8.    
  9.     // 动态修改核心线程数
  10.     public void setCorePoolSize(int corePoolSize) {
  11.         if (corePoolSize >= 0) {
  12.             super.setCorePoolSize(corePoolSize);
  13.             Metrics.gauge("threadpool.core.size", corePoolSize);
  14.         }
  15.     }
  16.    
  17.     // 动态修改最大线程数
  18.     public void setMaxPoolSize(int maxPoolSize) {
  19.         if (maxPoolSize > 0 && maxPoolSize >= getCorePoolSize()) {
  20.             super.setMaximumPoolSize(maxPoolSize);
  21.             Metrics.gauge("threadpool.max.size", maxPoolSize);
  22.         }
  23.     }
  24.    
  25.     // 动态修改队列容量(需要特殊处理)
  26.     public void resizeQueue(int newCapacity) {
  27.         BlockingQueue<Runnable> newQueue = new ArrayBlockingQueue<>(newCapacity);
  28.         BlockingQueue<Runnable> oldQueue = getQueue();
  29.         
  30.         // 转移任务
  31.         synchronized (this) {
  32.             List<Runnable> transferList = new ArrayList<>();
  33.             oldQueue.drainTo(transferList);
  34.             newQueue.addAll(transferList);
  35.             
  36.             // 更新队列
  37.             super.setRejectedExecutionHandler(getRejectedExecutionHandler());
  38.             super.setQueue(newQueue);
  39.         }
  40.     }
  41. }
复制代码
五、优雅关闭与资源清理

1. 应用关闭时处理
  1. @PreDestroy
  2. public void shutdownExecutor() {
  3.     // 1. 禁止新任务提交
  4.     executor.shutdown();
  5.    
  6.     try {
  7.         // 2. 等待现有任务完成
  8.         if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
  9.             // 3. 取消所有未开始任务
  10.             executor.shutdownNow();
  11.             
  12.             // 4. 再次等待任务响应取消
  13.             if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
  14.                 logger.error("Thread pool did not terminate");
  15.             }
  16.         }
  17.     } catch (InterruptedException e) {
  18.         // 5. 中断当前线程并尝试取消
  19.         executor.shutdownNow();
  20.         Thread.currentThread().interrupt();
  21.     }
  22.    
  23.     // 6. 清理资源
  24.     cleanupResources();
  25. }
复制代码
2. 未完成任务恢复
  1. public void recoverPendingTasks() {
  2.     BlockingQueue<Runnable> pendingQueue = executor.getQueue();
  3.     List<Runnable> pendingTasks = new ArrayList<>();
  4.     pendingQueue.drainTo(pendingTasks);
  5.    
  6.     for (Runnable task : pendingTasks) {
  7.         if (task instanceof RecoverableTask) {
  8.             // 持久化到可靠存储
  9.             TaskStorage.save((RecoverableTask) task);
  10.             logger.info("Recovered pending task: " + task);
  11.         }
  12.     }
  13. }
复制代码
六、生产环境建议


  • 线程隔离策略

    • CPU密集型任务:独立线程池
    • I/O密集型任务:独立线程池
    • 关键业务:独立线程池(避免相互影响)

  • 资源限制
    1. // 使用Semaphore控制并发资源使用
    2. private final Semaphore concurrencySemaphore = new Semaphore(50);
    3. executor.execute(() -> {
    4.     try {
    5.         concurrencySemaphore.acquire();
    6.         // 执行受限资源操作
    7.     } finally {
    8.         concurrencySemaphore.release();
    9.     }
    10. });
    复制代码
  • 上下文传递
    1. // 使用TransmittableThreadLocal传递上下文
    2. TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
    3. executor.execute(TtlRunnable.get(() -> {
    4.     // 可以访问父线程的context值
    5.     String value = context.get();
    6. }));
    复制代码
  • 熔断降级
    1. CircuitBreaker circuitBreaker = new CircuitBreaker();
    2. executor.execute(() -> {
    3.     if (circuitBreaker.allowExecution()) {
    4.         try {
    5.             // 执行业务逻辑
    6.         } catch (Exception e) {
    7.             circuitBreaker.recordFailure();
    8.         }
    9.     } else {
    10.         // 执行降级逻辑
    11.         fallbackService.executeFallback();
    12.     }
    13. });
    复制代码
七、常见问题处理方案

问题现象解决方案线程泄露线程数持续增长1. 检查线程是否正常结束
2. 添加线程创建监控
3. 限制最大线程数任务堆积队列持续增长1. 增加消费者线程
2. 优化任务处理速度
3. 实施任务降级CPU使用率高CPU持续满载1. 分析线程栈(jstack)
2. 优化热点代码
3. 限制线程池大小任务饿死低优先级任务长期得不到执行1. 使用优先级队列
2. 拆分不同优先级线程池
3. 实现公平调度上下文丢失子线程无法获取上下文1. 使用TransmittableThreadLocal
2. 手动传递上下文
3. 使用MDC框架生产环境中使用线程池需要综合考虑资源配置、任务管理、错误处理和监控告警等多个方面。建议结合具体业务场景选择合适的策略,并建立完善的监控和告警机制。

来源:豆瓜网用户自行投稿发布,如果侵权,请联系站长删除

相关推荐

您需要登录后才可以回帖 登录 | 立即注册