找回密码
 立即注册
首页 业界区 业界 封装CompletionService的并发任务分发器(优化版) ...

封装CompletionService的并发任务分发器(优化版)

届表 2025-6-13 15:52:09
这个框架代码用了很长时间,使用场景也挺多,初衷是简化CompletionService的编程接口,尽量减少业务代码处的感知。
今天找deepseek做了一版优化,优化点:

  • 整体的超时控制
  • 超时、异常处理和封装
  • 取消未完成的任务
核心代码
  1. public class TaskDispatcher<T> {
  2.     private final CompletionService<T> completionService;
  3.     /**
  4.      * 待处理任务
  5.      */
  6.     private final Set<Future<T>> pending = Sets.newHashSet();
  7.     /**
  8.      * 超时时间, 单位: s
  9.      */
  10.     private long timeout = 10000;
  11.     public TaskDispatcher(Executor executor, long timeout) {
  12.         completionService = new ExecutorCompletionService<>(executor);
  13.         if (timeout > 0) {
  14.             this.timeout = timeout;
  15.         }
  16.     }
  17.     public void submit(Callable<T> task) {
  18.         Future<T> future = completionService.submit(task);
  19.         pending.add(future);
  20.     }
  21.     /**
  22.      * 仅获取执行的任务结果
  23.      *
  24.      * @param ignoreException 忽略执行时发生的异常
  25.      * @return
  26.      */
  27.     public List<T> taskCompletedResult(boolean ignoreException) {
  28.         List<TaskResult<T>> taskResultList = taskCompleted();
  29.         List<T> res = Lists.newArrayList();
  30.         if (CollectionUtils.isEmpty(taskResultList)) {
  31.             return res;
  32.         }
  33.         boolean hasError = false;
  34.         for (TaskResult<T> taskResult : taskResultList) {
  35.             if (!taskResult.isTimeout() && taskResult.getError() == null) {
  36.                 res.add(taskResult.getValue());
  37.             } else if (taskResult.isTimeout() && !ignoreException) {
  38.                 LoggerUtils.error("执行任务时超时");
  39.                 hasError = true;
  40.             } else if (taskResult.getError() != null && !ignoreException) {
  41.                 LoggerUtils.error("执行任务时发生异常", taskResult.getError());
  42.                 hasError = true;
  43.             }
  44.         }
  45.         if (hasError) {
  46.             throw new ZHException("任务并发处理时发生异常");
  47.         }
  48.         return res;
  49.     }
  50.     /**
  51.      * 获取执行的任务
  52.      *
  53.      * @return
  54.      */
  55.     public List<TaskResult<T>> taskCompleted() {
  56.         long deadline = System.currentTimeMillis() + timeout;
  57.         List<TaskResult<T>> results = Lists.newArrayList();
  58.         int totalTasks = pending.size();
  59.         try {
  60.             for (int i = 0; i < totalTasks; i++) {
  61.                 long remaining = Math.max(0, deadline - System.currentTimeMillis());
  62.                 Future<T> future = completionService.poll(remaining, TimeUnit.MILLISECONDS);
  63.                 TaskResult<T> result = new TaskResult<>();
  64.                 if (future == null) {
  65.                     result.setTimeout(true);
  66.                 } else {
  67.                     pending.remove(future);
  68.                     try {
  69.                         result.setValue(future.get());
  70.                     } catch (ExecutionException e) {
  71.                         result.setError(e.getCause());
  72.                     }
  73.                 }
  74.                 results.add(result);
  75.             }
  76.         } catch (InterruptedException e) {
  77.             Thread.currentThread().interrupt();
  78.             throw new RuntimeException("任务结果收集中断", e);
  79.         } finally {
  80.             pending.forEach(f -> f.cancel(true));
  81.             pending.clear();
  82.         }
  83.         return results;
  84.     }
  85.     @Data
  86.     static class TaskResult<T> {
  87.         private T value;
  88.         private Throwable error;
  89.         private boolean isTimeout;
  90.     }
  91. }
复制代码
需要自己声明线程池bean,使用方式如下
  1.         TaskDispatcher<Integer> taskDispatcher = new TaskDispatcher<Integer>(threadExecutor, TIME_OUT);
  2.         for (long index: indexList) {
  3.             taskDispatcher.submit(() -> xxxService.count(index));
  4.         }
复制代码
为了便于在计数求和场景使用,进一步实现了一个子类
  1. public class IntSumTaskDispatcher extends TaskDispatcher<Integer> {
  2.     public IntSumTaskDispatcher(Executor executor, long timeout, boolean throwException) {
  3.         super(executor, timeout);
  4.     }
  5.     /**
  6.      * 对所有结果求和
  7.      *
  8.      * @return
  9.      */
  10.     public int takeCompletedSum() {
  11.         List<Integer> countResList = taskCompletedResult(true);
  12.         int count = 0;
  13.         for (Integer countSingle : countResList) {
  14.             if (countSingle == null) {
  15.                 continue;
  16.             }
  17.             count += countSingle;
  18.         }
  19.         return count;
  20.     }
  21. }
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册