- CompletableFuture — руководство с примерами
- Запуск параллельного потока без получения результата — метод runAsync()
- Запуск параллельного потока с получением результата — метод supplyAsync()
- Callback-функции в CompletableFuture
- thenApply()
- thenRun() и thenAccept()
- Комбинация двух CompletableFutures
- thenCompose() для двух зависимых друг от друга CompletableFutures
- thenCombine() для двух независимых друг от друга CompletableFutures
- Комбинация нескольких CompletableFutures
- CompletableFuture.allOf()
- CompletableFuture.anyOf()
- Обработка исключений CompletableFuture
- Обработка исключений с помощью колбэка exceptionally()
- Обработка исключений с помощью handle()
- Итог
- CompletableFuture — руководство с примерами: 2 комментария
CompletableFuture — руководство с примерами
Чтобы получить результат вычислений, можно использовать блокирующий метод get(). Как и в Future, он блокирует текущий тред до тех пор, пока результат не будет готов. А поскольку он у нас и не готовится, в нашем примере данная строка просто заблокирует поток:
String result = completableFuture.get()
Но можно и вручную завершить выполнение:
completableFuture.complete("Future's Result")
Все клиенты, ожидающие результата вычислений, получат результат. Дальнейшие вызовы completableFuture.complete() будут проигнорированы.
Запуск параллельного потока без получения результата — метод runAsync()
С помощью runAsync() можно запустить в отдельном треде задачу, для которой не требуется возвращать результат. Будет возвращен CompletableFuture.
В примере мы просто ждем секунду и печатаем значение, но в отдельном треде. А в основном треде блокируем код до получения «результата», но поскольку тип результата Void, то просто ждем завершения:
// Run a task specified by a Runnable Object asynchronously. CompletableFuture future = CompletableFuture.runAsync(new Runnable() < @Override public void run() < // Simulate a long-running Job try < TimeUnit.SECONDS.sleep(1); >catch (InterruptedException e) < throw new IllegalStateException(e); >System.out.println("I'll run in a separate thread than the main thread."); > >); // Block and wait for the future to complete future.get()
I'll run in a separate thread than the main thread.
Если убрать последнюю строчку, то ничего не напечатается, поскольку программа завершится, не дожидаясь завершения параллельного потока (он же не демон).
То же самое с использованием лямбда-выражения:
CompletableFuture future = CompletableFuture.runAsync(() -> < // Simulate a long-running Job try < TimeUnit.SECONDS.sleep(1); >catch (InterruptedException e) < throw new IllegalStateException(e); >System.out.println("I'll run in a separate thread than the main thread."); >);
Запуск параллельного потока с получением результата — метод supplyAsync()
А теперь вернем результат из параллельного треда. Для этого надо использовать метод supplyAsync():
// Using Lambda Expression CompletableFuture future = CompletableFuture.supplyAsync(() -> < try < TimeUnit.SECONDS.sleep(1); >catch (InterruptedException e) < throw new IllegalStateException(e); >return "Result of the asynchronous computation"; >); // Block and get the result of the Future String result = future.get(); System.out.println(result);
Result of the asynchronous computation
Callback-функции в CompletableFuture
Метод get() блокирует текущий поток. Обычно нам требуется другое. Надо не блокировать текущий поток для получения значения, а задать функцию, которая сделает что-то со значением сразу после того, как оно будет вычислено, в том же параллельном потоке. Так называемую callback-функцию.
thenApply()
Допустим, нам надо закончить вычисления в supplyAsync() и далее сделать что-то с результатом, не блокируя текущие поток. Вот supplyAsync():
// Create a CompletableFuture CompletableFuture whatsYourNameFuture = CompletableFuture.supplyAsync(() -> < try < TimeUnit.SECONDS.sleep(1); >catch (InterruptedException e) < throw new IllegalStateException(e); >System.out.println(Thread.currentThread().getName()); return "Rajeev"; >);
Возвращает она строку. Нам надо по окончании наших фиктивных вычислений длительностью 1 секунда присоединить к результату «Rajeev» приветствие «hello».
После thenApply() мы печатаем строку «go further», чтобы убедиться, что thenApply() не блокирует код. Только в конце мы блокируем код, чтобы получить конечный результат методом get() и вывести его:
// Attach a callback to the Future using thenApply() CompletableFuture greetingFuture = whatsYourNameFuture.thenApply(name -> < System.out.println(Thread.currentThread().getName()); return "Hello " + name; >); System.out.println(Thread.currentThread().getName()+" "+go further"); // Block and wait for the future to complete System.out.println(Thread.currentThread().getName()+" "+greetingFuture.get());
Обратите внимание, что везде мы выводим имя потока. В консоли у нас такой результат:
main go further ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-1 main Hello Rajeev
Обратите внимание, что функция внутри supplyAsync() и заданный для нее коллбэк внутри thenApply() не блокируют текущий поток.
Можно сделать целую цепочку методов thenApply():
CompletableFuture welcomeText = CompletableFuture.supplyAsync(() -> < try < TimeUnit.SECONDS.sleep(1); >catch (InterruptedException e) < throw new IllegalStateException(e); >System.out.println(Thread.currentThread().getName()); return "Rajeev"; >).thenApply(name -> < System.out.println(Thread.currentThread().getName()); return "Hello " + name; >).thenApply(greeting -> < System.out.println(Thread.currentThread().getName()); return greeting + ", Welcome to the CalliCoder Blog"; >); System.out.println(Thread.currentThread().getName()+" go further"); // Block and wait for the future to complete System.out.println(Thread.currentThread().getName()+" "+welcomeText .get());
main go further ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-1 main Hello Rajeev, Welcome to the CalliCoder Blog
thenRun() и thenAccept()
Если из callback-функции значение возвращать не надо, а надо просто что-то в ней сделать, то подойдут методы thenRun() и thenAccept(). Они возвращают CompletableFuture.
// thenAccept() example CompletableFuture.supplyAsync(() -> < return ProductService.getProductDetail(productId); >).thenAccept(product -> < System.out.println("Got product detail from remote service " + product.getName()) >);
Тут сервис выдает продукт (ProductDetail) и когда ProductDetail получен, мы печатаем имя продукта.
thenAccept() имеет доступ к тому, что получено в supplyAsync(), а вот thenAccept() уже результат не возвращает.
thenRun() отличается от thenAccept() тем, что даже не имеет доступа к тому, что вычислил supplyAsync():
// thenRun() example CompletableFuture.supplyAsync(() -> < // Run some computation >).thenRun(() -> < // Computation Finished. >);
Комбинация двух CompletableFutures
thenCompose() для двух зависимых друг от друга CompletableFutures
Допустим, мы хотим получить данные пользователя с помощью удаленного сервиса API, а потом получить данные его кредитной карты с помощью другого сервиса.
У нас есть соответствующие сервисы:
CompletableFuture getUsersDetail(String userId) < return CompletableFuture.supplyAsync(() ->< UserService.getUserDetails(userId); >); > CompletableFuture getCreditRating(User user) < return CompletableFuture.supplyAsync(() ->< CreditRatingService.getCreditRating(user); >); >
Логично применить thenApply(), как и раньше. Но. В предыдущих примерах callback-функция, которая запускалась в thenApply(), возвращала обычное значение, а не CompletableFuture. А теперь у нас эта callback-функция возвращает CompletableFuture (сервис getCreditRating() — нам же его надо вызвать):
CompletableFuture> result = getUserDetail(userId) .thenApply(user -> getCreditRating(user));
Таким образом конечный результат — это вложенный CompletableFuture.
Если же мы хотим избавиться от вложенности, то надо использовать метод thenCompose():
CompletableFuture result = getUserDetail(userId) .thenCompose(user -> getCreditRating(user));
Короче говоря, метод thenCompose() надо использовать, когда callback-функция возвращает не простое значение, а CompletableFuture, а надо избавиться от вложенности.
thenCombine() для двух независимых друг от друга CompletableFutures
С помощью thenCompose() мы комбинировали два Future, в которых один зависел от другого.
thenCombine() используется, когда надо запустить два независимых Future и сделать нечто после того, как оба они завершатся.
Вот пример. Первый CompletableFuture возвращает вес, второй — рост. Когда оба значения вычислятся, мы рассчитываем индекс bmi. Он вычисляется в callback-функции, переданной в метод thenCombine():
System.out.println("Retrieving weight."); CompletableFuture weightInKgFuture = CompletableFuture.supplyAsync(() -> < try < TimeUnit.SECONDS.sleep(1); >catch (InterruptedException e) < throw new IllegalStateException(e); >return 65.0; >); System.out.println("Retrieving height."); CompletableFuture heightInCmFuture = CompletableFuture.supplyAsync(() -> < try < TimeUnit.SECONDS.sleep(1); >catch (InterruptedException e) < throw new IllegalStateException(e); >return 177.8; >); System.out.println("Calculating BMI."); CompletableFuture combinedFuture = weightInKgFuture .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> < Double heightInMeter = heightInCm/100; return weightInKg/(heightInMeter*heightInMeter); >); System.out.println("Your BMI is - " + combinedFuture.get());
Комбинация нескольких CompletableFutures
Пока что мы комбинировали два CompletableFuture. А что если их много? Для этого есть методы:
static CompletableFuture allOf(CompletableFuture. cfs) static CompletableFuture anyOf(CompletableFuture. cfs)
CompletableFuture.allOf()
Допустим, есть список независимых CompletableFuture. Мы хотим их запустить параллельно, а по окончании выполнения их всех сделать кое-что еще. Вот тут то и пригодится CompletableFuture.allOf().
Допустим, мы хотим загрузить 100 веб-страниц. Загружать мы их хотим параллельно, то есть каждая страница грузится асинхронно:
CompletableFuture downloadWebPage(String pageLink) < return CompletableFuture.supplyAsync(() ->< // Code to download and return the web page's content >); >
А когда все страницы загрузятся, мы хотим посчитать количество страниц, текст которых содержит слово «CompletableFuture»
List webPageLinks = Arrays.asList(. ) // A list of 100 web page links // Download contents of all the web pages asynchronously ListpageContentFutures = webPageLinks.stream() .map(webPageLink -> downloadWebPage(webPageLink)) .collect(Collectors.toList()); // Create a combined Future using allOf() CompletableFuture allFutures = CompletableFuture.allOf( pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()]) );
Единственная проблема состоит в том, что CompletableFuture.allOf() возвращает тип CompletableFuture . Но эту проблему можно решить, написав несколько дополнительных строк кода:
// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list - CompletableFuture> allPageContentsFuture = allFutures.thenApply(v -> < return pageContentFutures.stream() .map(pageContentFuture ->pageContentFuture.join()) .collect(Collectors.toList()); >);
Посмотрите на код выше. Поскольку мы вызываем future.join() тогда, когда все страницы уже будут закачаны, ничего не блокируется.
Метод join() такой же, как get(): единственное отличие в том, что он выбрасывает unchecked exception (если его выбросил CompletableFuture).
Теперь подсчитаем число страниц, содержащих наше слово:
// Count the number of web pages having the "CompletableFuture" keyword. CompletableFuture countFuture = allPageContentsFuture.thenApply(pageContents -> < return pageContents.stream() .filter(pageContent ->pageContent.contains("CompletableFuture")) .count(); >); System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get());
CompletableFuture.anyOf()
Метод CompletableFuture.anyOf() возвращает CompletableFuture, который завершается сразу же, как только завершается первый из переданных ему в качестве аргументов CompletableFuture, и возвращает этот же самый результат, что и первый завершившийся CompletableFuture.
В примере первым завершается future2:
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> < try < TimeUnit.SECONDS.sleep(2); >catch (InterruptedException e) < throw new IllegalStateException(e); >return "Result of Future 1"; >); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> < try < TimeUnit.SECONDS.sleep(1); >catch (InterruptedException e) < throw new IllegalStateException(e); >return "Result of Future 2"; >); CompletableFuture future3 = CompletableFuture.supplyAsync(() -> < try < TimeUnit.SECONDS.sleep(3); >catch (InterruptedException e) < throw new IllegalStateException(e); >return "Result of Future 3"; >); CompletableFuture anyOfFuture = CompletableFuture.anyOf(future1, future2, future3); System.out.println(anyOfFuture.get()); // Result of Future 2
Так что конечный результат:
Метод CompletableFuture.anyOf() принимает varargs, а возвращает CompletableFuture . Проблема в том, что все аргументы —CompletableFuture могут возвращать разный тип, и конечный тип неизвестен.
Обработка исключений CompletableFuture
Рассмотрим, что будет, если в какой-то из функций возникнет исключение:
CompletableFuture.supplyAsync(() -> < // Code which might throw an exception return "Some result"; >).thenApply(result -> < return "processed result"; >).thenApply(result -> < return "result after further processing"; >).thenAccept(result -> < // do something with the final result >);
Если оно возникло в supplyAsync(), то никакой из колбэков вызван не будет.
Если оно возникло в первом thenApply(), то второй (и дальнейшие thenApply()) вызваны не будут. И так далее.
Обработка исключений с помощью колбэка exceptionally()
Integer age = -1; CompletableFuture maturityFuture = CompletableFuture.supplyAsync(() -> < if(age < 0) < throw new IllegalArgumentException("Age can not be negative"); >if(age > 18) < return "Adult"; >else < return "Child"; >>).exceptionally(ex -> < System.out.println("Oops! We have an exception - " + ex.getMessage()); return "Unknown!"; >); System.out.println("Maturity : " + maturityFuture.get());
Здесь в supplyAsync() возникает исключение и сразу выполняется колбэк из exceptionally(), в котором можно вернуть значение по умолчанию.
Обработка исключений с помощью handle()
Коллбэк в handle() вызывается независимо от того, возникло ли исключение:
Integer age = -1; CompletableFuture maturityFuture = CompletableFuture.supplyAsync(() -> < if(age < 0) < throw new IllegalArgumentException("Age can not be negative"); >if(age > 18) < return "Adult"; >else < return "Child"; >>).handle((res, ex) -> < if(ex != null) < System.out.println("Oops! We have an exception - " + ex.getMessage()); return "Unknown!"; >return res; >); System.out.println("Maturity : " + maturityFuture.get());
Если исключение возникает, то аргумент res равен null, если же нет — то ex равен null.
Итог
Код примеров частично доступен на GitHub (код к первой части статьи, сервисов обращения к репозиторию там нет).
CompletableFuture — руководство с примерами: 2 комментария
main go further
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
main Hello Rajeev, Welcome to the CalliCoder Blog Откуда берётся » Welcome to the CalliCoder Blog» в самом первом примере thenApply()? Откуда берётся в цепочке thenApply() понятно) БОЛЬШОЕ спасибо за ваш труд!