Java completablefuture get result

CompletableFuture — руководство с примерами

Чтобы получить результат вычислений, можно использовать блокирующий метод get(). Как и в Future, он блокирует текущий тред до тех пор, пока результат не будет готов. А поскольку он у нас и не готовится, в нашем примере данная строка просто заблокирует поток:

String result = completableFuture.get()

Но можно и вручную завершить выполнение:

completableFuture.complete("Future's Result")

Все клиенты, ожидающие результата вычислений, получат результат. Дальнейшие вызовы completableFuture.complete() будут проигнорированы.

Запуск параллельного потока без получения результата — метод runAsync()

С помощью runAsync() можно запустить в отдельном треде задачу, для которой не требуется возвращать результат. Будет возвращен CompletableFuture.

В примере мы просто ждем секунду и печатаем значение, но в отдельном треде. А в основном треде блокируем код до получения «результата», но поскольку тип результата Void, то просто ждем завершения:

// Run a task specified by a Runnable Object asynchronously. CompletableFuture future = CompletableFuture.runAsync(new Runnable() < @Override public void run() < // Simulate a long-running Job try < TimeUnit.SECONDS.sleep(1); >catch (InterruptedException e) < throw new IllegalStateException(e); >System.out.println("I'll run in a separate thread than the main thread."); > >); // Block and wait for the future to complete future.get()
I'll run in a separate thread than the main thread.

Если убрать последнюю строчку, то ничего не напечатается, поскольку программа завершится, не дожидаясь завершения параллельного потока (он же не демон).

Читайте также:  Javascript отличие объекта от массива

То же самое с использованием лямбда-выражения:

CompletableFuture future = CompletableFuture.runAsync(() -> < // Simulate a long-running Job try < TimeUnit.SECONDS.sleep(1); >catch (InterruptedException e) < throw new IllegalStateException(e); >System.out.println("I'll run in a separate thread than the main thread."); >);

Запуск параллельного потока с получением результата — метод supplyAsync()

А теперь вернем результат из параллельного треда. Для этого надо использовать метод supplyAsync():

// Using Lambda Expression CompletableFuture future = CompletableFuture.supplyAsync(() -> < try < TimeUnit.SECONDS.sleep(1); >catch (InterruptedException e) < throw new IllegalStateException(e); >return "Result of the asynchronous computation"; >); // Block and get the result of the Future String result = future.get(); System.out.println(result);
Result of the asynchronous computation

Callback-функции в CompletableFuture

Метод get() блокирует текущий поток. Обычно нам требуется другое. Надо не блокировать текущий поток для получения значения, а задать функцию, которая сделает что-то со значением сразу после того, как оно будет вычислено, в том же параллельном потоке. Так называемую callback-функцию.

thenApply()

Допустим, нам надо закончить вычисления в supplyAsync() и далее сделать что-то с результатом, не блокируя текущие поток. Вот supplyAsync():

// Create a CompletableFuture CompletableFuture whatsYourNameFuture = CompletableFuture.supplyAsync(() -> < try < TimeUnit.SECONDS.sleep(1); >catch (InterruptedException e) < throw new IllegalStateException(e); >System.out.println(Thread.currentThread().getName()); return "Rajeev"; >);

Возвращает она строку. Нам надо по окончании наших фиктивных вычислений длительностью 1 секунда присоединить к результату «Rajeev» приветствие «hello».

После thenApply() мы печатаем строку «go further», чтобы убедиться, что thenApply() не блокирует код. Только в конце мы блокируем код, чтобы получить конечный результат методом get() и вывести его:

// Attach a callback to the Future using thenApply() CompletableFuture greetingFuture = whatsYourNameFuture.thenApply(name -> < System.out.println(Thread.currentThread().getName()); return "Hello " + name; >); System.out.println(Thread.currentThread().getName()+" "+go further"); // Block and wait for the future to complete System.out.println(Thread.currentThread().getName()+" "+greetingFuture.get());

Обратите внимание, что везде мы выводим имя потока. В консоли у нас такой результат:

main go further ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-1 main Hello Rajeev

Обратите внимание, что функция внутри supplyAsync() и заданный для нее коллбэк внутри thenApply() не блокируют текущий поток.

Можно сделать целую цепочку методов thenApply():

CompletableFuture welcomeText = CompletableFuture.supplyAsync(() -> < try < TimeUnit.SECONDS.sleep(1); >catch (InterruptedException e) < throw new IllegalStateException(e); >System.out.println(Thread.currentThread().getName()); return "Rajeev"; >).thenApply(name -> < System.out.println(Thread.currentThread().getName()); return "Hello " + name; >).thenApply(greeting -> < System.out.println(Thread.currentThread().getName()); return greeting + ", Welcome to the CalliCoder Blog"; >); System.out.println(Thread.currentThread().getName()+" go further"); // Block and wait for the future to complete System.out.println(Thread.currentThread().getName()+" "+welcomeText .get());
main go further ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-1 main Hello Rajeev, Welcome to the CalliCoder Blog

thenRun() и thenAccept()

Если из callback-функции значение возвращать не надо, а надо просто что-то в ней сделать, то подойдут методы thenRun() и thenAccept(). Они возвращают CompletableFuture.

// thenAccept() example CompletableFuture.supplyAsync(() -> < return ProductService.getProductDetail(productId); >).thenAccept(product -> < System.out.println("Got product detail from remote service " + product.getName()) >);

Тут сервис выдает продукт (ProductDetail) и когда ProductDetail получен, мы печатаем имя продукта.

thenAccept() имеет доступ к тому, что получено в supplyAsync(), а вот thenAccept() уже результат не возвращает.

thenRun() отличается от thenAccept() тем, что даже не имеет доступа к тому, что вычислил supplyAsync():

// thenRun() example CompletableFuture.supplyAsync(() -> < // Run some computation >).thenRun(() -> < // Computation Finished. >);

Комбинация двух CompletableFutures

thenCompose() для двух зависимых друг от друга CompletableFutures

Допустим, мы хотим получить данные пользователя с помощью удаленного сервиса API, а потом получить данные его кредитной карты с помощью другого сервиса.

У нас есть соответствующие сервисы:

CompletableFuture getUsersDetail(String userId) < return CompletableFuture.supplyAsync(() ->< UserService.getUserDetails(userId); >); > CompletableFuture getCreditRating(User user) < return CompletableFuture.supplyAsync(() ->< CreditRatingService.getCreditRating(user); >); >

Логично применить thenApply(), как и раньше. Но. В предыдущих примерах callback-функция, которая запускалась в thenApply(), возвращала обычное значение, а не CompletableFuture. А теперь у нас эта callback-функция возвращает CompletableFuture (сервис getCreditRating() — нам же его надо вызвать):

CompletableFuture> result = getUserDetail(userId) .thenApply(user -> getCreditRating(user));

Таким образом конечный результат — это вложенный CompletableFuture.

Если же мы хотим избавиться от вложенности, то надо использовать метод thenCompose():

CompletableFuture result = getUserDetail(userId) .thenCompose(user -> getCreditRating(user));

Короче говоря, метод thenCompose() надо использовать, когда callback-функция возвращает не простое значение, а CompletableFuture, а надо избавиться от вложенности.

thenCombine() для двух независимых друг от друга CompletableFutures

С помощью thenCompose() мы комбинировали два Future, в которых один зависел от другого.

thenCombine() используется, когда надо запустить два независимых Future и сделать нечто после того, как оба они завершатся.

Вот пример. Первый CompletableFuture возвращает вес, второй — рост. Когда оба значения вычислятся, мы рассчитываем индекс bmi. Он вычисляется в callback-функции, переданной в метод thenCombine():

System.out.println("Retrieving weight."); CompletableFuture weightInKgFuture = CompletableFuture.supplyAsync(() -> < try < TimeUnit.SECONDS.sleep(1); >catch (InterruptedException e) < throw new IllegalStateException(e); >return 65.0; >); System.out.println("Retrieving height."); CompletableFuture heightInCmFuture = CompletableFuture.supplyAsync(() -> < try < TimeUnit.SECONDS.sleep(1); >catch (InterruptedException e) < throw new IllegalStateException(e); >return 177.8; >); System.out.println("Calculating BMI."); CompletableFuture combinedFuture = weightInKgFuture .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> < Double heightInMeter = heightInCm/100; return weightInKg/(heightInMeter*heightInMeter); >); System.out.println("Your BMI is - " + combinedFuture.get());

Комбинация нескольких CompletableFutures

Пока что мы комбинировали два CompletableFuture. А что если их много? Для этого есть методы:

static CompletableFuture allOf(CompletableFuture. cfs) static CompletableFuture anyOf(CompletableFuture. cfs)

CompletableFuture.allOf()

Допустим, есть список независимых CompletableFuture. Мы хотим их запустить параллельно, а по окончании выполнения их всех сделать кое-что еще. Вот тут то и пригодится CompletableFuture.allOf().

Допустим, мы хотим загрузить 100 веб-страниц. Загружать мы их хотим параллельно, то есть каждая страница грузится асинхронно:

CompletableFuture downloadWebPage(String pageLink) < return CompletableFuture.supplyAsync(() ->< // Code to download and return the web page's content >); >

А когда все страницы загрузятся, мы хотим посчитать количество страниц, текст которых содержит слово «CompletableFuture»

List webPageLinks = Arrays.asList(. ) // A list of 100 web page links // Download contents of all the web pages asynchronously List pageContentFutures = webPageLinks.stream() .map(webPageLink -> downloadWebPage(webPageLink)) .collect(Collectors.toList()); // Create a combined Future using allOf() CompletableFuture allFutures = CompletableFuture.allOf( pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()]) );

Единственная проблема состоит в том, что CompletableFuture.allOf() возвращает тип CompletableFuture . Но эту проблему можно решить, написав несколько дополнительных строк кода:

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list - CompletableFuture> allPageContentsFuture = allFutures.thenApply(v -> < return pageContentFutures.stream() .map(pageContentFuture ->pageContentFuture.join()) .collect(Collectors.toList()); >);

Посмотрите на код выше. Поскольку мы вызываем future.join() тогда, когда все страницы уже будут закачаны, ничего не блокируется.

Метод join() такой же, как get(): единственное отличие в том, что он выбрасывает unchecked exception (если его выбросил CompletableFuture).

Теперь подсчитаем число страниц, содержащих наше слово:

// Count the number of web pages having the "CompletableFuture" keyword. CompletableFuture countFuture = allPageContentsFuture.thenApply(pageContents -> < return pageContents.stream() .filter(pageContent ->pageContent.contains("CompletableFuture")) .count(); >); System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get());

CompletableFuture.anyOf()

Метод CompletableFuture.anyOf() возвращает CompletableFuture, который завершается сразу же, как только завершается первый из переданных ему в качестве аргументов CompletableFuture, и возвращает этот же самый результат, что и первый завершившийся CompletableFuture.

В примере первым завершается future2:

CompletableFuture future1 = CompletableFuture.supplyAsync(() -> < try < TimeUnit.SECONDS.sleep(2); >catch (InterruptedException e) < throw new IllegalStateException(e); >return "Result of Future 1"; >); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> < try < TimeUnit.SECONDS.sleep(1); >catch (InterruptedException e) < throw new IllegalStateException(e); >return "Result of Future 2"; >); CompletableFuture future3 = CompletableFuture.supplyAsync(() -> < try < TimeUnit.SECONDS.sleep(3); >catch (InterruptedException e) < throw new IllegalStateException(e); >return "Result of Future 3"; >); CompletableFuture anyOfFuture = CompletableFuture.anyOf(future1, future2, future3); System.out.println(anyOfFuture.get()); // Result of Future 2

Так что конечный результат:

Метод CompletableFuture.anyOf() принимает varargs, а возвращает CompletableFuture . Проблема в том, что все аргументы —CompletableFuture могут возвращать разный тип, и конечный тип неизвестен.

Обработка исключений CompletableFuture

Рассмотрим, что будет, если в какой-то из функций возникнет исключение:

CompletableFuture.supplyAsync(() -> < // Code which might throw an exception return "Some result"; >).thenApply(result -> < return "processed result"; >).thenApply(result -> < return "result after further processing"; >).thenAccept(result -> < // do something with the final result >);

Если оно возникло в supplyAsync(), то никакой из колбэков вызван не будет.

Если оно возникло в первом thenApply(), то второй (и дальнейшие thenApply()) вызваны не будут. И так далее.

Обработка исключений с помощью колбэка exceptionally()

Integer age = -1; CompletableFuture maturityFuture = CompletableFuture.supplyAsync(() -> < if(age < 0) < throw new IllegalArgumentException("Age can not be negative"); >if(age > 18) < return "Adult"; >else < return "Child"; >>).exceptionally(ex -> < System.out.println("Oops! We have an exception - " + ex.getMessage()); return "Unknown!"; >); System.out.println("Maturity : " + maturityFuture.get());

Здесь в supplyAsync() возникает исключение и сразу выполняется колбэк из exceptionally(), в котором можно вернуть значение по умолчанию.

Обработка исключений с помощью handle()

Коллбэк в handle() вызывается независимо от того, возникло ли исключение:

Integer age = -1; CompletableFuture maturityFuture = CompletableFuture.supplyAsync(() -> < if(age < 0) < throw new IllegalArgumentException("Age can not be negative"); >if(age > 18) < return "Adult"; >else < return "Child"; >>).handle((res, ex) -> < if(ex != null) < System.out.println("Oops! We have an exception - " + ex.getMessage()); return "Unknown!"; >return res; >); System.out.println("Maturity : " + maturityFuture.get());

Если исключение возникает, то аргумент res равен null, если же нет — то ex равен null.

Итог

Код примеров частично доступен на GitHub (код к первой части статьи, сервисов обращения к репозиторию там нет).

CompletableFuture — руководство с примерами: 2 комментария

main go further
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
main Hello Rajeev, Welcome to the CalliCoder Blog Откуда берётся » Welcome to the CalliCoder Blog» в самом первом примере thenApply()? Откуда берётся в цепочке thenApply() понятно) БОЛЬШОЕ спасибо за ваш труд!

Источник

Оцените статью