java—执行后如何访问可调用future的参数?

zzoitvuj  于 2021-07-09  发布在  Java
关注(0)|答案(3)|浏览(389)

我使用 Future 以及 Callable<> . 问题:如果 Exception 在异步执行期间发生,我需要访问用于生成可调用的。但是怎么做呢?
例子:

List<Callable<Response> tasks = new ArrayList<>();
taks.add(() -> sendRequest(req));

futures = executor.invokeAll(tasks);

for (future : futures) {
    try {
        Response rsp = future.get();
    } catch (ExecutionException e) {
        //problem: I need to access req.getType() value of the req Object here. But How?
    }
}

比如:我想从 req.getType() ,所以我知道哪些异步请求失败了。并在错误消息中返回给用户。

hs1ihplo

hs1ihplo1#

记住:那 ExecutionException 您正在捕获由 Callable 实施。
因此,如果您想在该异常中获得详细信息,那么您必须确保抛出并 Package 的异常包含所需的信息。
或者,您必须实际跟踪哪个未来属于哪个“输入参数”。就像创造一个 Map<Future, Request> 在将可调用对象推入执行器时填充的。

nhaq1z21

nhaq1z212#

虽然这个问题已经得到了彻底的回答,但我想提出一些其他的想法,也许可以说明一些已经讨论过的想法。

关于结果顺序和阻塞

提交一组要在不同线程中运行的任务的一个有趣的方面是,我们无法再控制这些任务的完成顺序。有些任务可能会先运行,甚至那些先运行的任务可能需要比其他任务更长的时间才能完成。
现在考虑一下:

List<Future<Integer>> futures = new ArrayList<>();
futures.add(executor.submit(() -> doTaskWith(100));
futures.add(executor.submit(() -> doTaskWith(200));

for(future: futures) {
   future.get(); //Uh oh, blocking here!
}

在上面的例子中,如果我们的第一个未来需要30秒来完成,而第二个未来只需要15秒;然后我们实际上封锁了30秒开始处理结果,但是第一个结果是在15秒之前完成的。
关键是,如果我们能在结果一出来就开始汇总,我们就可以改进。

使用executionservice

处理此类问题的一种方法是使用executorcompletionservice提交任务并控制结果。
例如:

ExecutorService executor = Executors.newFixedThreadPool(3);
ExecutorCompletionService<Double> completionService = new ExecutorCompletionService<>(executor);

completionService.submit(() -> doTaskWith(100));
completionService.submit(() -> doTaskWith(200));

Future<Double> future = completionService.take(); //whichever finishes first

这个 ExecutorCompletionService 工作方式类似于队列,一旦任务完成,无论该任务是什么,我们都可以轮询它或将它从队列中取出并获得结果。

建议解决方案1:使用未来Map

假设我们有一个函数来计算整数的平方根:

static double sqrt(int n) {
    if(n > 0) {
        return Math.sqrt(n);
    }
    throw new IllegalArgumentException("Invalid integer: " + n);
}

现在假设我们要提交一组整数来计算它们的平方根。
例如:

Map<Future<Double>, Integer> tasks = Stream.of(4, 25, 36, 49, -5)
      .collect(toMap(n -> completionService.submit(() -> sqrt(n)),
      Function.identity()));

因此,我们从流中获取每个整数,并创建一个 Callable<Integer> 他们中的每一个(即。 () -> sqrt(n) ). 每个可调用函数都会得到上下文整数的平方根 n . 然后我们将这个可调用的消息传递给我们的完成服务,并从中获得一个未来(即。 completionService.submit(callable) ). 然后我们把它还给你 future 我们Map的键,还有整数 n Map中该键的值,这是我们在返回对象中看到的值(即。 Map<Future<Double>, Integer> tasks )
现在我们有了一组未来(来自我们提交的任务)作为键,而用来获取这些未来的整数作为值。
现在我们可以做:

Set<Future<Double>> futures = new HashSet<>(tasks.keySet());
Set<Future<Double>> failures = new LinkedHashSet<>();

while (!futures.isEmpty()) {
    try {
        Future<Double> future = completionService.take();
        int n = tasks.get(future); //original value of n
        double squareRoot = future.get(); //result of task
        System.out.printf("The square root of %d is %f%n", n, squareRoot);
    }
    catch (ExecutionException e) {
        Integer n = tasks.get(future); //original value of n
        System.err.printf("Failure to obtain square root of %d: %s%n", n, e.getMessage());
        failures.add(future);
    }
    catch (InterruptedException e) {
        //TODO: handle interruption somehow, perhaps logging partial results?
    }
    finally {
        futures.remove(future);
    }
}

if(!failures.isEmpty()) {
    //TODO you may want to do do something about the failures you got
}

既然未来是我们Map上的关键 completionService 报告一个给定的未来已经准备好了,那么我们就可以很容易地在Map上查找这个未来,并得到它的原始值 n 我们提交待处理的(即。 int n = tasks.get(future) ).

提出解决方案2:上下文例外

处理该问题的另一种方法是确保抛出的异常包含恢复原始请求对象所需的所有上下文细节。
例如,我们可以改变 sqrt 代码如下:

static double sqrt(int n) {
    if(n > 0) {
        return Math.sqrt(n);
    }
    throw new InvalidIntegerException(n);
}

static class InvalidIntegerException extends RuntimeException {

    private final Integer n;

    InvalidIntegerException(int n) {
        super("Invalid integer:  " + n);
        this.n = n;
    }

    public Integer getInteger() {
        return n;
    }
}

如果我们不能计算平方根,我们会得到一个 InvalidIntegerException 包含处理失败的原始值的。这个例外可以把原始价值还给我们。
原始异常将被 Package 在 ExecutionException 由执行人服务。
所以,现在我们可以这样做:

Future<Double> future = completionService.take();
try {
    System.out.printf("The square root of %d is %f%n", tasks.get(future), future.get());
}
catch (ExecutionException e) {
    if(e.getCause() != null && e.getCause() instanceof InvalidIntegerException) {
        Integer n = ((InvalidIntegerException) e.getCause()).getInteger();
        System.err.println("Failure to calculate the square root of: " + n);
    }

}
我比较喜欢第一种解决方案,因为我不必对抛出的异常类型做任何假设,这使得代码更易于维护。或许其他人可以评论每种策略的利弊。

iyfjxgzm

iyfjxgzm3#

根据javadoc的说法 ExecutorService 将:
@返回表示任务的未来列表,其顺序与迭代器为给定任务列表生成的顺序相同,每个任务列表都已完成
所以考虑到你一开始就有一个任务列表 Future 在索引处遇到失败的对象 i 在这个名单上 Callable 索引处的任务 i 在任务列表中:

List<Request> requests = // obtain request
List<Callable<Response>> tasks = new ArrayList<>();
requests.forEach(req -> tasks.add(() -> sendRequest(req)));

List<Future<Response>> futures = executor.invokeAll(tasks);

for (int i = 0; i < futures.size(); i++) {
  try {
    Response rsp = future.get();
    handleResponse(rsp);
  } catch (ExecutionException e) {
    Request req = requests.get(i); // this is request parameter for send request at index i, assuming your Service does not violate its contract
    handleRequestFailure(req, e.getCause());
  }
}

相关问题