Асинхронное программирование на примерах: реконструкция методов java.util.concurrent.CompletableFuture
Для чего нужна реконструкция, если исходный код этого класса открыт?
Хотя бы потому, что там под капотом высокооптимизированный, трудночитаемый код, изучение которого мало что дает в педагогическом плане.
Поэтому мы воссоздадим семантику операций по их спецификациям, и напишем функционально эквивалентный, понятный и читаемый код, хотя, возможно, и не самый экономный по расходу памяти и времени процессора.
Начнем с относительно простого метода:
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor) Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier. Type Parameters: U - the function's return type Parameters: supplier - a function returning the value to be used to complete the returned CompletableFuture executor - the executor to use for asynchronous execution Returns: the new CompletableFuture
Прочитаем внимательно спецификацию:
Returns a new CompletableFuture
То есть, создается объект типа CompletableFuture либо его подкласса и возвращается в качестве результата.
that is asynchronously completed by a task running in the given executor`
Кроме того, создается задача, исполняемая на Executor ‘e.
Как мы знаем, Executor принимает только объекты типа Runnable .
Runnable это интерфейс, и первый объект вполне может его реализовывать — так мы совместим две функции в одном объекте.
completed . with the value obtained by calling the given Supplier.
Этот Runnable должен вызвать данный Supplier и полученным значением завершить созданный CompletableFuture .
Supplier — это функция без параметров, так что закодировать все это очень просто:
class CompletableFutureForSupplyAsync extends CompletableFuture implements Runnable < Suppliersupplier; public CompletableFutureForSupplyAsync(Supplier supplier) < this.supplier = supplier; >public void run() < try < U result = supplier.get(); super.complete(result); >catch (Throwable e) < super.completeExceptionally(e); >> > public static CompletableFuture supplyAsync(Supplier supplier, Executor executor) < CompletableFutureForSupplyAsynctask = new CompletableFutureForSupplyAsync<>(supplier); executor.execute(task); return task; >
Следующий пример несколько сложнее:
public CompletableFuture thenApplyAsync(Function fn, Executor executor) Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function. See the CompletionStage documentation for rules covering exceptional completion. Specified by: thenApplyAsync in interface CompletionStage Type Parameters: U - the function's return type Parameters: fn - the function to use to compute the value of the returned CompletionStage executor - the executor to use for asynchronous execution Returns: the new CompletionStage
Returns a new CompletionStage that. is executed using the supplied Executor
Здесь нам прямо предлагают оформить создаваемый объект оформить в виде Runnable .
. with this stage’s result as the argument to the supplied function.
а вот это уже интереснее. Передаваемая нам функция имеет параметр, и значением этого параметра служит значение, завершающее текущий CompletionStage . В момент вызова thenApplyAsync это значение может быть неизвестно, поэтому сразу запустить задачу на Executor мы не можем. Вместо этого мы должны договориться с текущим CompletionStage ,
чтобы в момент своего завершения она передала свое значение в задачу. Среди многочесленных методов CompletionStage имеются один, в точности подходщий для этой цели, это whenComplete :
public CompletableFuture whenComplete(BiConsumer action) Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes.
То есть, во вновь создаваемом объекте-задаче достаточно реализовать еще интерфейс BiConsumer для приема аргумента:
class CompletableFutureForApplyAsync extends CompletableFuture implements Runnable, BiConsumer < Function super T,? extends U>fn; Executor executor; T arg; Throwable throwable; public CompletableFutureForApplyAsync(Function super T,? extends U>fn, Executor executor) < this.fn = fn; this.executor = executor; >@Override // implementation of BiConsumer interface public void accept(T argument, Throwable throwable) < if (throwable != null) < this.throwable = throwable; >else < this.arg = argument; >executor.execute(this); > @Override public void run() < if (throwable == null) < try < U result = fn.apply(arg); super.complete(result); >catch (Throwable e) < super.completeExceptionally(e); >> else < super.completeExceptionally(throwable); >> > public CompletableFuture thenApplyAsync(Function super T,? extends U>fn, Executor executor ) < CompletableFutureForApplyAsynctask = new CompletableFutureForApplyAsync<>(fn, executor); this.whenComplete(task); return task; > >
Этот пример очень важен для понимания природы асинхронного программирования, поэтому еще раз перечислим его основные шаги:
1) создается асинхронная процедура:
CompletableFutureForApplyAsync task = new CompletableFutureForApplyAsync<>(fn, executor);
2) она пока не готова к исполнению, поэтому мы просим поставщика недостающего аргумента передать нам этот аргумент в будущем, вызвав поданный нами метод:
3) в этом методе мы не только сохраняем полученный аргумент, но и запускаем задачу на исполнение (см метод accept ()).
4) исполнение задачи сводится к выполнению поданой нам функции и сохранении результата.
Этот реультат может быть точно также затребован другими процедурами с помощью метода whenComplete (), примененного уже к нашему вновь построенному объекту, так что мы можем построить цепочку асинхронных процедур произвольной длины. Но исполнятся эта цепочка будет строго последовательно, без всякого параллелизма.
A как же изобразить более сложную диаграмму вычислений, содержащую параллельные ветви?
Для этого служит метод thenCombineAsync .
Если в предыдущем примере мы запускали асинхронную процедуру с одним аргументом, то в этом — с двумя.
При этом вычисление обоих аргументов может происходить параллельно.
ublic CompletableFuture thenCombineAsync(CompletionStage other, BiFunction fn, Executor executor) Description copied from interface: CompletionStage Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using the supplied executor, with the two results as arguments to the supplied function.
Здесь все как в предыдущем примере с thenApplyAsync , но параметр-функция уже от двух аргументов, и добавлен параметр CompletionStage other , являющийся асинхронным поставщиком второго аргумента.
Как же нам обеспечить обработку второго аргумента?
Ну во-первых, вместо одной переменной T arg описать две: T arg1; U arg2; , a вместо одного метода public void accept(T argument, Throwable throwable) описать два — accept1 и accept2 ,
каждый из которых работает со своим аргументом.
При этом наш строящийся объект уже не имплементирует интерфейс BiConsumer и мы не можем уже написать ключевое предложение для связи узлов графа асинхронных вычислений
К счастью, объект функционального интерфейса может быть изображен ссылкой на метод, без заключения его в отдельный класс:
this.whenComplete(task::accept1); other.whenComplete(task::accept2);
То есть, текущий объект this поставляет первый аргумент, а объект other — второй.
Вот только коды методов придется изменить, чтобы они не запускали задачу сразу при приходе своего аргумента, но также проверяли поступление и второго:
public synchronized void accept1(T argument, Throwable throwable) < if (throwable != null) < this.throwable = throwable; executor.execute(this); >else < this.arg1 = argument; if (arg2 != null) < executor.execute(this); >> >
Аналогично описывается метод accept2.
- методы стали синхронизованными (работаем с общими данными)
- в случае передачи ошибки ждать второго аргумента не нужно.
- проверка поступления аргумента сравнением на null — не самый лучший способ, может, надо завести на каждый аргумент булевскую переменную.
Таким способом можно сделать асинхронные процедуры и от большего числа аргументов, чем два, но сразу приходит мысль — может все же сделать отдельный класс для параметров, чтобы не писать для приема каждого параметра свой метод, а обходится динамическим созданием параметров?
Parameter arg1 = new Parameter<>(); Parameter arg2 = new Parameter<>(); . future1.whenComplete(arg1); future2.whenComplete(arg2);
Да, такой класс создать можно, но об этом в следующий раз.
Краткое резюме из вышеизложенного:
- асинхронная программа — это сеть связанных между собой асинхронных процедур,
точно также, как мультипоточная программа — сеть связанных между собой потоков исполнения (threads).
Но средства связи потоков и асинхронных процедур кардинально отличаются.
Потоки связываются с помощью семафоров, блокирующих очередей и прочих подобных объектов,
которые блокируют поток получателя, если информация еще не поступила, но поток уже пытается ее извлечь с помощью pull-based операции.
Асинхронные процедуры — получатели просто не заходят на исполнение, пока на будет готова вся необходимая им информация.
Они пассивно ждут, пока поставщики информации сами не передадут ее с помощью push-based операции.
Благодаря этому они не тратят память на стек во время ожидания, и, следовательно, занимают гораздо меньше памяти, чем потоки исполнения.
- построение сети асинхронных процедур сводится к созданию объектов и связыванию их между собой, точнее, к связыванию их подобъектов — параметров: выходному параметру поставщика информации передается адрес входного параметра получателя.
Набор методов CompletableFuture ровно это и делает, и в принципе, без этих методов можно обойтись, создавая объекты явно, как показано в приведенных выше примерах.
Но для этого надо иметь классы, аналогичные тем, что были описаны в этих примерах.
По какой-то причине создатели java.util.concurrent предпочли не давать пользователям доступ к этим классам и скрыли их в глубине кода CompletableFuture .
Те, кто хочет иметь перед глазами наглядное представление создаваемой асинхронной сети, могут реконструировать эти классы, продолжив приведенные примеры. Исходный код примеров доступен на Гитхабе.