将Java Future转换为CompletableFuture

ryhaxcpt  于 2023-06-28  发布在  Java
关注(0)|答案(8)|浏览(224)

Java 8引入了CompletableFuture,这是一个可组合的Future的新实现(包括一堆thenXxx方法)。我想专门使用它,但我想使用的许多库只返回不可组合的Future示例。
有没有一种方法可以将返回的Future示例 Package 在CompleteableFuture中,这样我就可以组合它了?

carvr3hs

carvr3hs1#

如果你想使用的库除了Future样式之外还提供了一个回调样式的方法,你可以为它提供一个处理程序来完成CompletableFuture,而不需要任何额外的线程阻塞。就像这样:

AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
    // ... 
    CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>();
    open.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer result, Void attachment) {
            completableFuture.complete(buffer);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            completableFuture.completeExceptionally(exc);
        }
    });
    completableFuture.thenApply(...)

如果没有回调,我认为解决这个问题的唯一方法是使用一个轮询循环,将所有Future.isDone()检查放在一个线程上,然后每当Future可获取时调用complete。

6uxekuva

6uxekuva2#

如果您的Future是调用ExecutorService方法的结果(例如:submit()),最简单的方法是使用CompletableFuture.runAsync(Runnable, Executor)方法。

Runnbale myTask = ... ;
Future<?> future = myExecutor.submit(myTask);

Runnbale myTask = ... ;
CompletableFuture<?> future = CompletableFuture.runAsync(myTask, myExecutor);

然后,CompletableFuture被“本地”创建。
编辑:@SamMefford的评论由@MartinAndersson更正,如果你想传递一个Callable,你需要调用supplyAsync(),将Callable<T>转换为Supplier<T>,例如其中:

CompletableFuture.supplyAsync(() -> {
    try { return myCallable.call(); }
    catch (Exception ex) { throw new CompletionException(ex); } // Or return default value
}, myExecutor);

因为T Callable.call() throws Exception;会抛出异常,而T Supplier.get();不会,所以必须捕获异常,这样原型才能兼容。

异常处理说明

get()方法没有指定throws,这意味着它不应该抛出 checked 异常。但是,可以使用 unchecked 异常。CompletableFuture中的代码显示CompletionException已被使用且未选中(即是一个RuntimeException),因此catch/throw会将任何异常 Package 到CompletionException中。
此外,正如@WeGa所指出的,您可以使用handle()方法来处理结果可能引发的异常:

CompletableFuture<T> future = CompletableFuture.supplyAsync(...);
future.handle((ex,res) -> {
        if (ex != null) {
            // An exception occurred ...
        } else {
            // No exception was thrown, 'res' is valid and can be handled here
        }
    });
hpcdzsge

hpcdzsge3#

有一个方法,但你不会喜欢的。下面的方法将Future<T>转换为CompletableFuture<T>

public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
  if (future.isDone())
    return transformDoneFuture(future);
  return CompletableFuture.supplyAsync(() -> {
    try {
      if (!future.isDone())
        awaitFutureIsDoneInForkJoinPool(future);
      return future.get();
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      // Normally, this should never happen inside ForkJoinPool
      Thread.currentThread().interrupt();
      // Add the following statement if the future doesn't have side effects
      // future.cancel(true);
      throw new RuntimeException(e);
    }
  });
}

private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
  CompletableFuture<T> cf = new CompletableFuture<>();
  T result;
  try {
    result = future.get();
  } catch (Throwable ex) {
    cf.completeExceptionally(ex);
    return cf;
  }
  cf.complete(result);
  return cf;
}

private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
    throws InterruptedException {
  ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    @Override public boolean block() throws InterruptedException {
      try {
        future.get();
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
      return true;
    }
    @Override public boolean isReleasable() {
      return future.isDone();
    }
  });
}

显然,这种方法的问题是,对于每个 Future,一个线程将被阻塞以等待 Future 的结果-与期货的想法相矛盾。在某些情况下,可能会做得更好。然而,一般来说,如果不积极地等待“未来”的结果,就没有解决方案。

ki1q1bka

ki1q1bka4#

我发布了一个小的futurity项目,试图在答案中做得比the straightforward way更好。
主要思想是只使用一个线程(当然不仅仅是一个自旋循环)来检查内部的所有Futures状态,这有助于避免在每个Future-> CompletableFuture转换时阻塞池中的线程。
使用示例:

Future oldFuture = ...;
CompletableFuture profit = Futurity.shift(oldFuture);
q9rjltbz

q9rjltbz5#

建议:
https://gabfssilva.github.io/old-java-future-to-completable-future/
但是,基本上:

public class CompletablePromiseContext {
    private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor();

    public static void schedule(Runnable r) {
        SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS);
    }
}

关于CompletablePromise:

public class CompletablePromise<V> extends CompletableFuture<V> {
    private Future<V> future;

    public CompletablePromise(Future<V> future) {
        this.future = future;
        CompletablePromiseContext.schedule(this::tryToComplete);
    }

    private void tryToComplete() {
        if (future.isDone()) {
            try {
                complete(future.get());
            } catch (InterruptedException e) {
                completeExceptionally(e);
            } catch (ExecutionException e) {
                completeExceptionally(e.getCause());
            }
            return;
        }

        if (future.isCancelled()) {
            cancel(true);
            return;
        }

        CompletablePromiseContext.schedule(this::tryToComplete);
    }
}

示例:

public class Main {
    public static void main(String[] args) {
        final ExecutorService service = Executors.newSingleThreadExecutor();
        final Future<String> stringFuture = service.submit(() -> "success");
        final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture);

        completableFuture.whenComplete((result, failure) -> {
            System.out.println(result);
        });
    }
}
ejk8hzay

ejk8hzay6#

让我提出另一个(希望是更好的)选择:https://github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata/concurrent
简单地说,其思想如下:

  1. CompletableTask<V>接口介绍--CompletionStage<V> + RunnableFuture<V>的结合
    1.扭曲ExecutorService以从submit(...)方法返回CompletableTask(而不是Future<V>
    1.完成了,我们有了可运行和可组合的Futures。
    实现使用另一种CompletionStage实现(注意,是CompletionStage而不是CompletableFuture):
    用途:
J8ExecutorService exec = J8Executors.newCachedThreadPool();
CompletionStage<String> = exec
   .submit( someCallableA )
   .thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b)
   .thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c);
nwsw7zdq

nwsw7zdq7#

public static <T> CompletableFuture<T> fromFuture(Future<T> f) {
    return CompletableFuture.completedFuture(null).thenCompose(avoid -> {
        try {
            return CompletableFuture.completedFuture(f.get());
        } catch (InterruptedException e) {
            return CompletableFuture.failedFuture(e);
        } catch (ExecutionException e) {
            return CompletableFuture.failedFuture(e.getCause());
        }
    });
}
9bfwbjaz

9bfwbjaz8#

主要的想法是这样的:

Future<?> future = null;
return CompletableFuture.supplyAsync(future::get);

但是,您将收到来自编译器的一些警告。
这是第一个选择。

Future<?> future = null;
return CompletableFuture.supplyAsync(
        ()->{
            try {
                return future.get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

第二个选项,通过强制转换函数接口来隐藏try...catch。

@FunctionalInterface
    public interface MySupplier<T> extends Supplier<T> {
        @Override
        default T get() {
            try {
                return getInternal();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        T getInternal() throws Exception;
    }

    public static void main(String[] args) {
        Future<?> future = null;
        return CompletableFuture.supplyAsync((MySupplier<?>) future::get);

    }

第三种选择,找出一些提供了这样一个功能接口的第三方库。
标签:Java 8 Lambda function that throws exception?

相关问题