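Running thread pools in production means balancing resource management, task handling, error recovery, and monitoring. The guide below covers each of these areas for production-grade thread pools.
I. Best practices for creating a thread pool
1. Avoid the Executors shortcut methods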
- // Requires: import java.util.concurrent.*;
- // Anti-pattern: both factory methods can lead to OutOfMemoryError under load
- ExecutorService unsafe = Executors.newCachedThreadPool(); // unbounded number of threads
- ExecutorService unsafe2 = Executors.newFixedThreadPool(10); // unbounded work queue
- // Preferred: construct the ThreadPoolExecutor explicitly
- int corePoolSize = Runtime.getRuntime().availableProcessors();
- int maxPoolSize = corePoolSize * 2;
- BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100); // bounded queue
- RejectedExecutionHandler handler = new CustomRejectionPolicy(); // defined below
- ExecutorService executor = new ThreadPoolExecutor(
-     corePoolSize,
-     maxPoolSize,
-     60L, TimeUnit.SECONDS, // keep-alive for idle threads above corePoolSize
-     workQueue,
-     new CustomThreadFactory("app-worker-"), // defined below
-     handler
- );
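2. Key configuration parameters
- corePoolSize: number of core threads kept alive even when idle (tune to the workload type)
- maximumPoolSize: upper bound on the number of threads (as a rule of thumb, no more than 100); threads beyond corePoolSize are created only once the queue is full
- keepAliveTime: how long idle threads above corePoolSize survive (30-120 seconds)
- workQueue: must be a bounded queue (to avoid OOM)
- threadFactory: a custom thread factory
- handler (a RejectedExecutionHandler): a custom rejection policy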
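How these parameters interact is easiest to see on a deliberately tiny pool. The following is a minimal sketch, assuming a pool with one core thread, a maximum of two threads and a queue of capacity one; the class name `PoolSaturationDemo` and the sizes are made up for illustration. Four blocking tasks are submitted: the first occupies the core thread, the second waits in the queue, the third forces a second thread, and the fourth is rejected by the default AbortPolicy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative class, not part of the original article.
class PoolSaturationDemo {
    /** Returns {poolSize, queuedTasks, rejectedTasks} after submitting 4 blocking tasks. */
    static int[] saturate() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1)); // no handler given: AbortPolicy is the default
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until the snapshot is taken
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocking);
            } catch (RejectedExecutionException e) {
                rejected++; // thrown once both threads are busy and the queue is full
            }
        }
        int[] snapshot = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return snapshot;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] s = saturate();
        // prints "poolSize=2 queued=1 rejected=1"
        System.out.println("poolSize=" + s[0] + " queued=" + s[1] + " rejected=" + s[2]);
    }
}
```

The order matters when sizing: a large queue means the pool rarely grows past corePoolSize, because extra threads are only started after an enqueue attempt fails.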
II. Implementing the key thread pool components
1. Custom thread factory (naming and exception handling)
- // logger and AlertManager are application-specific placeholders
- public class CustomThreadFactory implements ThreadFactory {
-     private final AtomicInteger counter = new AtomicInteger(0);
-     private final String namePrefix;
-     private final ThreadGroup group;
-     public CustomThreadFactory(String namePrefix) {
-         this.namePrefix = namePrefix;
-         // Note: System.getSecurityManager() is deprecated for removal since Java 17
-         SecurityManager s = System.getSecurityManager();
-         this.group = (s != null) ? s.getThreadGroup() :
-             Thread.currentThread().getThreadGroup();
-     }
-     @Override
-     public Thread newThread(Runnable r) {
-         Thread thread = new Thread(group, r,
-             namePrefix + counter.incrementAndGet(),
-             0);
-         thread.setDaemon(false);
-         thread.setPriority(Thread.NORM_PRIORITY);
-
-         // Handler for exceptions that escape a task started with execute();
-         // tasks started with submit() keep their exception in the Future instead
-         thread.setUncaughtExceptionHandler((t, e) -> {
-             logger.error("Uncaught exception in thread: " + t.getName(), e);
-             // Send an alert notification
-             AlertManager.notify(e);
-         });
-
-         return thread;
-     }
- }
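One detail of the uncaught-exception handler is worth checking in practice: it only sees exceptions from tasks started with `execute()`. With `submit()`, the exception is stored in the returned `Future` and surfaces as an `ExecutionException` from `get()`. The sketch below illustrates this under simple assumptions (a single-thread pool, a lambda thread factory, and a made-up class name `UncaughtHandlerDemo`); it counts handler invocations instead of logging.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative class, not part of the original article.
class UncaughtHandlerDemo {
    /** Returns {handler calls after submit(), handler calls after execute()}. */
    static int[] run() throws InterruptedException {
        AtomicInteger handlerCalls = new AtomicInteger();
        CountDownLatch handled = new CountDownLatch(1);
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "demo-worker");
            t.setUncaughtExceptionHandler((thread, e) -> {
                handlerCalls.incrementAndGet();
                handled.countDown();
            });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), factory);
        Runnable failing = () -> { throw new IllegalStateException("boom"); };

        try {
            pool.submit(failing).get();
        } catch (ExecutionException e) {
            // submit() captured the exception in the Future; the handler was not involved
        }
        int afterSubmit = handlerCalls.get();

        pool.execute(failing);               // the exception escapes and kills the worker thread
        handled.await(5, TimeUnit.SECONDS);  // wait until the handler has run
        int afterExecute = handlerCalls.get();

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] {afterSubmit, afterExecute};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = run();
        System.out.println("after submit: " + calls[0] + ", after execute: " + calls[1]);
    }
}
```

If most tasks go through `submit()`, the handler alone will not surface failures; the `Future` has to be inspected, or the task wrapped so that it logs its own exceptions.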
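2. Custom rejection policy (production-grade)
- // logger and TaskStorage are application-specific placeholders
- public class CustomRejectionPolicy implements RejectedExecutionHandler {
-     @Override
-     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
-         if (executor.isShutdown()) {
-             // The pool is shutting down: record the drop instead of losing the task silently
-             logger.warn("Executor is shut down, discarding task: " + r);
-             return;
-         }
-         // 1. Record the rejected task
-         logger.warn("Task rejected: " + r);
-
-         // 2. Try to re-enqueue with a timeout (blocks the submitting thread for up to 1 second)
-         try {
-             boolean offered = executor.getQueue().offer(r, 1, TimeUnit.SECONDS);
-             if (!offered) {
-                 // 3. Persist to a storage system
-                 persistTask(r);
-                 logger.info("Task persisted to storage: " + r);
-             }
-         } catch (InterruptedException e) {
-             Thread.currentThread().interrupt();
-             logger.error("Re-enqueue interrupted", e);
-             persistTask(r); // do not lose the task when the wait is interrupted
-         }
-     }
-
-     private void persistTask(Runnable task) {
-         // Task persistence logic goes here (database, file, message queue)
-         TaskStorage.save(task);
-     }
- }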
III. Best practices for submitting and executing tasks
1. Task wrapper (with monitoring)
- // Metrics, logger, shouldRetry() and retryTask() are application-specific placeholders;
- // MDC is the logging context class of the logging framework (e.g. org.slf4j.MDC)
- public class MonitoredTask implements Runnable {
-     private final Runnable actualTask;
-     private final long submitTime; // start - submitTime is the time spent waiting in the queue
-
-     public MonitoredTask(Runnable task) {
-         this.actualTask = task;
-         this.submitTime = System.currentTimeMillis();
-     }
-
-     @Override
-     public void run() {
-         long start = System.currentTimeMillis();
-         try {
-             // Set the MDC context (for log trace correlation)
-             MDC.put("traceId", UUID.randomUUID().toString());
-
-             actualTask.run();
-
-             long duration = System.currentTimeMillis() - start;
-             Metrics.recordSuccess(duration);
-         } catch (Exception e) {
-             long duration = System.currentTimeMillis() - start;
-             Metrics.recordFailure(duration);
-
-             // Retry logic
-             if (shouldRetry(e)) {
-                 retryTask();
-             } else {
-                 logger.error("Task execution failed", e);
-             }
-         } finally {
-             MDC.clear();
-         }
-     }
-
-     // Use this when submitting tasks
-     public static void submit(ExecutorService executor, Runnable task) {
-         executor.execute(new MonitoredTask(task));
-     }
- }
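Wrapping every task is one way to collect timings; ThreadPoolExecutor also offers the `beforeExecute` and `afterExecute` hooks, which apply to every task without changing the submitting code. The sketch below is one possible shape, not the article's own approach: the class name `TimedThreadPool` and its counters are invented for illustration, and plain atomic counters stand in for a real metrics client.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative class, not part of the original article.
class TimedThreadPool extends ThreadPoolExecutor {
    // Start time of the task currently running on each worker thread
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    final AtomicInteger succeeded = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();
    final AtomicLong totalNanos = new AtomicLong();

    TimedThreadPool(int threads, int queueCapacity) {
        super(threads, threads, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueCapacity));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime()); // runs on the worker thread, just before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            totalNanos.addAndGet(System.nanoTime() - startNanos.get());
            // t is non-null only when a task started with execute() threw;
            // exceptions from submit() are held by the Future and arrive here as null
            if (t == null) {
                succeeded.incrementAndGet();
            } else {
                failed.incrementAndGet();
            }
        } finally {
            startNanos.remove();
            super.afterExecute(r, t);
        }
    }
}
```

The hooks run on the worker thread, so they should stay cheap; anything slow (remote metrics calls, alerting) is better handed off asynchronously.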
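2. Task timeout control
- // task, logger and fallbackHandler are application-specific placeholders
- Future<?> future = executor.submit(task);
- try {
-     // Wait at most 30 seconds for the task to finish
-     future.get(30, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
-     // 1. Cancel the task; cancel(true) only interrupts the worker thread,
-     //    so the task has to respond to interruption to actually stop
-     future.cancel(true);
-
-     // 2. Log the timeout
-     logger.warn("Task timed out: " + task);
-
-     // 3. Run the fallback strategy
-     fallbackHandler.handle(task);
- } catch (InterruptedException e) {
-     // The waiting thread itself was interrupted: stop waiting and restore the flag
-     future.cancel(true);
-     Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
-     // The task threw; the original exception is available as the cause
-     logger.error("Task execution failed", e.getCause());
- }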
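Timeout-based cancellation only works if the task cooperates with interruption. The following is a small self-contained sketch of that contract, with a made-up class name `TimeoutDemo` and a `Thread.sleep` standing in for slow work; the timeout of 200 ms is arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative class, not part of the original article.
class TimeoutDemo {
    /** Returns true if the timed-out task was cancelled and observed the interrupt. */
    static boolean runWithTimeout() throws InterruptedException, ExecutionException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        AtomicBoolean interrupted = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        Runnable slowTask = () -> {
            started.countDown();
            try {
                Thread.sleep(10_000); // stands in for slow, interruptible work
            } catch (InterruptedException e) {
                interrupted.set(true);              // cancel(true) arrives here
                Thread.currentThread().interrupt(); // restore the flag before returning
            } finally {
                finished.countDown();
            }
        };

        Future<?> future = pool.submit(slowTask);
        started.await(); // make sure the task is running before the timeout starts
        try {
            future.get(200, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the worker thread
        }
        finished.await(5, TimeUnit.SECONDS);
        pool.shutdownNow();
        return future.isCancelled() && interrupted.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("cancelled and interrupted: " + runWithTimeout());
    }
}
```

A task that swallows `InterruptedException` or spins in a loop without checking `Thread.currentThread().isInterrupted()` keeps occupying its worker thread after `cancel(true)`, even though the `Future` already reports it as cancelled.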
IV. Monitoring and managing the thread pool
1. Collecting monitoring metrics
- // Metrics and logger are application-specific placeholders
- public class ThreadPoolMonitor implements Runnable {
-     private final ThreadPoolExecutor executor;
-
-     public ThreadPoolMonitor(ThreadPoolExecutor executor) {
-         this.executor = executor;
-     }
-
-     @Override
-     public void run() {
-         while (!Thread.currentThread().isInterrupted()) {
-             try {
-                 // Collect the key metrics
-                 int activeCount = executor.getActiveCount();
-                 long completedTaskCount = executor.getCompletedTaskCount(); // cumulative total
-                 int queueSize = executor.getQueue().size();
-                 int poolSize = executor.getPoolSize();
-                 int queueCapacity = queueSize + executor.getQueue().remainingCapacity();
-
-                 // Publish to the monitoring system
-                 Metrics.gauge("threadpool.active.count", activeCount);
-                 Metrics.gauge("threadpool.pool.size", poolSize);
-                 Metrics.gauge("threadpool.queue.size", queueSize);
-                 Metrics.counter("threadpool.completed.tasks", completedTaskCount);
-
-                 // Detect potential problems: warn when the queue is more than 80% full
-                 if (queueSize > queueCapacity * 0.8) {
-                     logger.warn("Thread pool queue is approaching capacity");
-                 }
-
-                 // Sample every 30 seconds
-                 Thread.sleep(30_000);
-             } catch (InterruptedException e) {
-                 // Restore the flag so the loop condition ends the monitor
-                 Thread.currentThread().interrupt();
-             }
-         }
-     }
- }
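The queue-fullness check is easy to get wrong, because `remainingCapacity()` shrinks as the queue fills: comparing the size against a fraction of the remaining capacity fires far too early. A safer form computes utilization as size divided by total capacity. The helper below is a minimal sketch; the class name `QueueUtilization` is made up, and the two reads are not atomic, so the value is an approximation under concurrent use.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative helper, not part of the original article.
class QueueUtilization {
    /** Approximate fraction of a bounded queue in use, between 0.0 and 1.0. */
    static double of(BlockingQueue<?> queue) {
        int size = queue.size();
        // long arithmetic: unbounded queues report Integer.MAX_VALUE remaining
        long capacity = (long) size + queue.remainingCapacity();
        return capacity == 0 ? 1.0 : (double) size / capacity;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        for (int i = 0; i < 5; i++) {
            queue.add(i);
        }
        // Half full: utilization is 0.5, so an 80% threshold does not fire.
        // A check of the form size > remainingCapacity * 0.8 would compare 5 > 4 and fire.
        System.out.println("utilization=" + of(queue));
    }
}
```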
2. Adjusting thread pool parameters dynamically
- // Metrics is an application-specific placeholder
- public class DynamicThreadPool extends ThreadPoolExecutor {
-
-     public DynamicThreadPool(int corePoolSize, int maxPoolSize,
-                              long keepAliveTime, TimeUnit unit,
-                              BlockingQueue<Runnable> workQueue) {
-         super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
-     }
-
-     // Change the core pool size at runtime
-     @Override
-     public void setCorePoolSize(int corePoolSize) {
-         // On Java 9 and later, a core size above the maximum throws IllegalArgumentException,
-         // so raise the maximum first when growing both values
-         if (corePoolSize >= 0 && corePoolSize <= getMaximumPoolSize()) {
-             super.setCorePoolSize(corePoolSize);
-             Metrics.gauge("threadpool.core.size", corePoolSize);
-         }
-     }
-
-     // Change the maximum pool size at runtime
-     public void setMaxPoolSize(int maxPoolSize) {
-         if (maxPoolSize > 0 && maxPoolSize >= getCorePoolSize()) {
-             super.setMaximumPoolSize(maxPoolSize);
-             Metrics.gauge("threadpool.max.size", maxPoolSize);
-         }
-     }
-
-     // Queue capacity needs special handling: ThreadPoolExecutor keeps its work queue
-     // in a final field and has no setQueue() method, so the queue cannot be swapped
-     // at runtime. To resize it, construct the pool with a queue whose capacity limit
-     // is itself adjustable and change the limit on that queue.
- }
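Since ThreadPoolExecutor exposes no way to replace its work queue after construction, resizing has to happen inside the queue. One possible approach, sketched below under stated assumptions, is a queue that is unbounded internally and enforces an adjustable limit in `offer(E)`, which is the method `ThreadPoolExecutor.execute()` uses to enqueue tasks. `ResizableQueue` is a hypothetical class written for this illustration, not a JDK type.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical queue for illustration, not part of the JDK or the original article.
class ResizableQueue<E> extends LinkedBlockingQueue<E> {
    private volatile int capacity;

    ResizableQueue(int capacity) {
        super(); // the underlying LinkedBlockingQueue itself is unbounded
        setCapacity(capacity);
    }

    /** Changes the limit; tasks already queued above a smaller limit stay queued. */
    final void setCapacity(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // synchronized so that concurrent offers cannot overshoot the limit together.
    // put(E) and the timed offer(E, long, TimeUnit) are NOT limited in this sketch.
    @Override
    public synchronized boolean offer(E e) {
        return size() < capacity && super.offer(e);
    }

    @Override
    public int remainingCapacity() {
        return Math.max(0, capacity - size());
    }
}
```

A pool built as `new ThreadPoolExecutor(core, max, 60L, TimeUnit.SECONDS, new ResizableQueue<Runnable>(100))` can then be resized by keeping a reference to the queue and calling `setCapacity`. A rejection policy that re-enqueues with the timed `offer` would bypass the limit here, so a production version would need to cover those methods as well.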
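V. Graceful shutdown and resource cleanup
1. Handling application shutdown
- // logger and cleanupResources() are application-specific placeholders
- @PreDestroy
- public void shutdownExecutor() {
-     // 1. Stop accepting new tasks; tasks already submitted still run
-     executor.shutdown();
-
-     try {
-         // 2. Wait for existing tasks to finish
-         if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
-             // 3. Interrupt running tasks and discard the ones that have not started
-             executor.shutdownNow();
-
-             // 4. Wait again for tasks to respond to cancellation
-             if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
-                 logger.error("Thread pool did not terminate");
-             }
-         }
-     } catch (InterruptedException e) {
-         // 5. The current thread was interrupted: cancel remaining tasks and restore the flag
-         executor.shutdownNow();
-         Thread.currentThread().interrupt();
-     }
-
-     // 6. Clean up resources
-     cleanupResources();
- }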
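2. Recovering unfinished tasks
- // RecoverableTask, TaskStorage and logger are application-specific placeholders
- public void recoverPendingTasks() {
-     // Only tasks passed to execute() keep their own type in the queue;
-     // submit() wraps each task in a FutureTask, which the instanceof check will not match
-     BlockingQueue<Runnable> pendingQueue = executor.getQueue();
-     List<Runnable> pendingTasks = new ArrayList<>();
-     pendingQueue.drainTo(pendingTasks);
-
-     for (Runnable task : pendingTasks) {
-         if (task instanceof RecoverableTask) {
-             // Persist to reliable storage
-             TaskStorage.save((RecoverableTask) task);
-             logger.info("Recovered pending task: " + task);
-         } else {
-             logger.warn("Dropping non-recoverable pending task: " + task);
-         }
-     }
- }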
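Draining the queue by hand is not the only way to collect unfinished work: `shutdownNow()` already returns the tasks that were queued but never started. The sketch below shows that return value on a single-thread pool; the class name `PendingTaskDemo` and the task bodies are invented for illustration.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative class, not part of the original article.
class PendingTaskDemo {
    /** Returns the number of tasks that were still queued when the pool was stopped. */
    static int countPending() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        CountDownLatch started = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000); // long-running task that occupies the only worker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdownNow() interrupts it
            }
        });
        started.await();
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { }); // these wait in the queue behind the running task
        }
        // Interrupts the running task and hands back everything that never started
        List<Runnable> pending = pool.shutdownNow();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return pending.size();
    }

    public static void main(String[] args) throws InterruptedException {
        // prints "pending tasks: 3"
        System.out.println("pending tasks: " + countPending());
    }
}
```

The returned list can be fed into the same persistence step as above, with the same caveat that tasks started through `submit()` come back wrapped in a `FutureTask`.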
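VI. Production recommendations
- Thread isolation strategy:
  - CPU-bound tasks: a dedicated thread pool
  - I/O-bound tasks: a dedicated thread pool
  - Critical business flows: a dedicated thread pool (so workloads cannot affect each other)
- Resource limits:
- // Use a Semaphore to cap concurrent use of a limited resource
- private final Semaphore concurrencySemaphore = new Semaphore(50);
- executor.execute(() -> {
-     try {
-         concurrencySemaphore.acquire();
-     } catch (InterruptedException e) {
-         Thread.currentThread().interrupt();
-         return; // no permit was acquired, so none may be released
-     }
-     try {
-         // Perform the operation on the limited resource
-     } finally {
-         concurrencySemaphore.release();
-     }
- });
- Context propagation:
- // Use TransmittableThreadLocal (from Alibaba's transmittable-thread-local library) to pass context to pool threads
- TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
- executor.execute(TtlRunnable.get(() -> {
-     // Sees the context value captured from the submitting thread
-     String value = context.get();
- }));
- Circuit breaking and degradation:
- // CircuitBreaker and fallbackService are application-specific placeholders
- CircuitBreaker circuitBreaker = new CircuitBreaker();
- executor.execute(() -> {
-     if (circuitBreaker.allowExecution()) {
-         try {
-             // Run the business logic
-         } catch (Exception e) {
-             circuitBreaker.recordFailure();
-         }
-     } else {
-         // Run the fallback logic
-         fallbackService.executeFallback();
-     }
- });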
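The circuit breaker above is used through two calls, `allowExecution()` and `recordFailure()`, but its behavior is left open. As a rough sketch of what such a class could look like, the version below opens after a number of consecutive failures and allows a trial call again after a cool-down. Everything here is an assumption made for illustration: the class name `SimpleCircuitBreaker`, the `recordSuccess()` method, the thresholds, and the injected clock (which keeps the example testable). Established libraries implement more complete state machines.

```java
import java.util.function.LongSupplier;

// Hypothetical, simplified breaker for illustration; not part of the original article.
class SimpleCircuitBreaker {
    private final int failureThreshold; // consecutive failures that open the breaker
    private final long openMillis;      // how long the breaker stays open
    private final LongSupplier clock;   // current time in milliseconds
    private int consecutiveFailures;
    private boolean open;
    private long openedAt;

    SimpleCircuitBreaker(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    synchronized boolean allowExecution() {
        if (!open) {
            return true;
        }
        if (clock.getAsLong() - openedAt >= openMillis) {
            // Cool-down elapsed: let calls through again, but reopen on the next failure
            open = false;
            consecutiveFailures = failureThreshold - 1;
            return true;
        }
        return false;
    }

    synchronized void recordFailure() {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            open = true;
            openedAt = clock.getAsLong();
        }
    }

    synchronized void recordSuccess() {
        consecutiveFailures = 0;
        open = false;
    }
}
```

In production the clock would typically be `System::currentTimeMillis`, and the task would call `recordSuccess()` after the business logic completes so that isolated failures do not accumulate toward the threshold.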
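VII. Handling common problems
| Problem | Symptom | Solutions |
| --- | --- | --- |
| Thread leak | Thread count keeps growing | 1. Check that threads terminate normally; 2. Add monitoring of thread creation; 3. Cap the maximum number of threads |
| Task backlog | Queue keeps growing | 1. Add consumer threads; 2. Speed up task processing; 3. Apply task degradation |
| High CPU usage | CPU stays saturated | 1. Analyze thread stacks (jstack); 2. Optimize hot code paths; 3. Limit the thread pool size |
| Task starvation | Low-priority tasks go unexecuted for long periods | 1. Use a priority queue; 2. Split work into separate pools by priority; 3. Implement fair scheduling |
| Lost context | Child threads cannot read the context | 1. Use TransmittableThreadLocal; 2. Pass the context manually; 3. Use the MDC framework |

Using thread pools in production requires weighing resource configuration, task management, error handling, and monitoring and alerting together. Choose the strategies that fit the specific business scenario, and put thorough monitoring and alerting in place.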