这个框架代码用了很长时间,使用场景也挺多,初衷是简化CompletionService的编程接口,尽量减少业务代码处的感知。
今天找deepseek做了一版优化,优化点:
- 整体的超时控制
- 超时、异常处理和封装
- 取消未完成的任务
核心代码- public class TaskDispatcher<T> {
- private final CompletionService<T> completionService;
- /**
- * 待处理任务
- */
- private final Set<Future<T>> pending = Sets.newHashSet();
- /**
- * 超时时间, 单位: s
- */
- private long timeout = 10000;
- public TaskDispatcher(Executor executor, long timeout) {
- completionService = new ExecutorCompletionService<>(executor);
- if (timeout > 0) {
- this.timeout = timeout;
- }
- }
- public void submit(Callable<T> task) {
- Future<T> future = completionService.submit(task);
- pending.add(future);
- }
- /**
- * 仅获取执行的任务结果
- *
- * @param ignoreException 忽略执行时发生的异常
- * @return
- */
- public List<T> taskCompletedResult(boolean ignoreException) {
- List<TaskResult<T>> taskResultList = taskCompleted();
- List<T> res = Lists.newArrayList();
- if (CollectionUtils.isEmpty(taskResultList)) {
- return res;
- }
- boolean hasError = false;
- for (TaskResult<T> taskResult : taskResultList) {
- if (!taskResult.isTimeout() && taskResult.getError() == null) {
- res.add(taskResult.getValue());
- } else if (taskResult.isTimeout() && !ignoreException) {
- LoggerUtils.error("执行任务时超时");
- hasError = true;
- } else if (taskResult.getError() != null && !ignoreException) {
- LoggerUtils.error("执行任务时发生异常", taskResult.getError());
- hasError = true;
- }
- }
- if (hasError) {
- throw new ZHException("任务并发处理时发生异常");
- }
- return res;
- }
- /**
- * 获取执行的任务
- *
- * @return
- */
- public List<TaskResult<T>> taskCompleted() {
- long deadline = System.currentTimeMillis() + timeout;
- List<TaskResult<T>> results = Lists.newArrayList();
- int totalTasks = pending.size();
- try {
- for (int i = 0; i < totalTasks; i++) {
- long remaining = Math.max(0, deadline - System.currentTimeMillis());
- Future<T> future = completionService.poll(remaining, TimeUnit.MILLISECONDS);
- TaskResult<T> result = new TaskResult<>();
- if (future == null) {
- result.setTimeout(true);
- } else {
- pending.remove(future);
- try {
- result.setValue(future.get());
- } catch (ExecutionException e) {
- result.setError(e.getCause());
- }
- }
- results.add(result);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("任务结果收集中断", e);
- } finally {
- pending.forEach(f -> f.cancel(true));
- pending.clear();
- }
- return results;
- }
- @Data
- static class TaskResult<T> {
- private T value;
- private Throwable error;
- private boolean isTimeout;
- }
- }
复制代码 需要自己声明线程池bean,使用方式如下- TaskDispatcher<Integer> taskDispatcher = new TaskDispatcher<Integer>(threadExecutor, TIME_OUT);
- for (long index: indexList) {
- taskDispatcher.submit(() -> xxxService.count(index));
- }
复制代码 为了便于在计数求和场景使用,进一步实现了一个子类- public class IntSumTaskDispatcher extends TaskDispatcher<Integer> {
- public IntSumTaskDispatcher(Executor executor, long timeout, boolean throwException) {
- super(executor, timeout);
- }
- /**
- * 对所有结果求和
- *
- * @return
- */
- public int takeCompletedSum() {
- List<Integer> countResList = taskCompletedResult(true);
- int count = 0;
- for (Integer countSingle : countResList) {
- if (countSingle == null) {
- continue;
- }
- count += countSingle;
- }
- return count;
- }
- }
复制代码 来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |