Java concurrent worker thread

Thread’ом Java не испортишь: Часть V — Executor, ThreadPool, Fork Join

Java-университет

Thread

Итак, мы знаем, что в Java есть потоки, о чём можно прочитать в обзоре «Thread’ом Java не испортишь : Часть I — потоки». Давайте ещё раз посмотрим на типовой код:

 public static void main(String []args) throws Exception < Runnable task = () ->< System.out.println("Task executed"); >; Thread thread = new Thread(task); thread.start(); > 

Как мы видим, код для запуска задачи довольно типовой, но на каждый новый запуск нам его придётся повторять. Одно из решений — вынести его в отдельный метод, например, execute(Runnable runnable) . Но разработчики Java за нас уже побеспокоились и придумали интерфейс Executor :

 public static void main(String []args) throws Exception < Runnable task = () ->System.out.println("Task executed"); Executor executor = (runnable) -> < new Thread(runnable).start(); >; executor.execute(task); > 

Thread

Как видно, код стал лаконичнее и позволил нам просто написать код по запуску Runnable в потоке. Здорово, не так ли? Но это только начало:

Как видно, у интерфейса Executor есть интерфейс-наследник ExecutorService . В JavaDoc данного интерфейса сказано, что ExecutorService является описанием особого Executor ‘а, который предоставляет методы по остановке работы Executor ‘а и позволяет получить java.util.concurrent.Future , чтобы отслеживать процесс выполнения. Ранее, в «Thread’ом Java не испортишь : Часть IV — Callable, Future и друзья» мы уже кратко рассмотрели возможности Future . Кто забыл или не читал — советую освежить в памяти 😉 Что ещё интересного в JavaDoc написано? Что у нас есть специальная фабрика java.util.concurrent.Executors , которая нам позволяет создавать доступные по умолчанию реализации ExecutorService .

Читайте также:  Php пароль по умолчанию

ExecutorService

Ещё раз вспомним. У нас есть Executor для execute (т.е. выполнения) некой задачи в потоке, когда реализация создания потока скрыта от нас. У нас есть ExecutorService — особый Executor , который имеет набор возможностей по управлению ходом выполнения. И у нас есть фабрика Executors , которая позволяет создавать ExecutorService . Давайте теперь это проделаем сами:

 public static void main(String[] args) throws ExecutionException, InterruptedException < Callabletask = () -> Thread.currentThread().getName(); ExecutorService service = Executors.newFixedThreadPool(2); for (int i = 0; i < 5; i++) < Future result = service.submit(task); System.out.println(result.get()); >service.shutdown(); > 
  • поток пытается получить элементы из пустой очереди
  • поток пытается положить элементы в полную очередь
 public static ExecutorService newFixedThreadPool(int nThreads) < return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); > 
 public static ExecutorService newCachedThreadPool() < return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); > 

Thread

Как мы видим, внутри фабричных методов создаются реализации ExecutorService . И в основном это ThreadPoolExecutor . Меняются только атрибуты, которые и влияют на работу.

ThreadPoolExecutor

Как мы ранее увидели, внутри фабричных методов в основном создаётся ThreadPoolExecutor . На функциональность влияет то, какие значения переданы в качестве максимума и минимума потоков, а также какая очередь используется. А использоваться может любая реализация интерфейса java.util.concurrent.BlockingQueue . Говоря о ThreadPoolExecutor ‘ах, стоит отметить интересные особенности при работе. Например, нельзя посылать задачи в ThreadPoolExecutor , если там нет места:

 public static void main(String[] args) throws ExecutionException, InterruptedException < int threadBound = 2; ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound, 0L, TimeUnit.SECONDS, new SynchronousQueue<>()); Callable task = () -> < Thread.sleep(1000); return Thread.currentThread().getName(); >; for (int i = 0; i < threadBound + 1; i++) < threadPoolExecutor.submit(task); >threadPoolExecutor.shutdown(); > 
 Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0] 

То есть task нельзя засабмитить, т.к. SynchronousQueue устроена так, что фактически состоит из одного элемента и не позволяет положить туда больше. Как мы видим, queued tasks здесь 0, и в этом нет ничего странного, т.к. это специфика SynchronousQueue — фактически это очередь в 1 элемент, которая всегда пустая. (!) Когда один поток кладёт в очередь элемент, он будет ждать, пока другой поток не заберёт элемент из очереди. Поэтому, мы можем заменить на new LinkedBlockingQueue<>(1) и в ошибке изменится то, что будет указано queued tasks = 1 . Т.к. очередь всего в 1 элемент, то второй мы уже положить не можем. И упадём на этом. Продолжая тему очереди, стоит отметить, что класс ThreadPoolExecutor обладает дополнительными методами по обслуживанию очереди. Например, метод threadPoolExecutor.purge() удалит из очереди все отменённые задачи, чтобы освободить место в очереди. Ещё одной интересной функцией, связанной с очередью, является обработчик непринятых задач:

 public static void main(String[] args) < ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue()); Callabletask = () -> Thread.currentThread().getName(); threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected")); for (int i = 0; i < 5; i++) < threadPoolExecutor.submit(task); >threadPoolExecutor.shutdown(); > 

Для примера обработчик просто выводит слово Rejected на каждый отказ принимать задачу в очередь. Удобно, не правда ли? Кроме того, ThreadPoolExecutor имеет интересного наследника — ScheduledThreadPoolExecutor , который является ScheduledExecutorService . Он предоставляет возможность выполнять задачу по таймеру.

ScheduledExecutorService

ExecutorService типа ScheduledExecutorService позволяют запускать задачи по расписанию (schedule). Посмотрим на пример:

 public static void main(String[] args) < ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4); Callabletask = () -> < System.out.println(Thread.currentThread().getName()); return Thread.currentThread().getName(); >; scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES); scheduledExecutorService.shutdown(); > 

Тут всё просто. Отправляются задачи, получаем «запланированную задачу» java.util.concurrent.ScheduledFuture . С расписанием может быть полезен также и следующий случай:

 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4); Runnable task = () -> < System.out.println(Thread.currentThread().getName()); >; scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS); 

Здесь мы отправляем Runnable задачу на выполнение с фиксированной частотой (Fixed Rate) с определённой задержкой. В данном случае, через 1 секунду каждые 2 секунды начать выполнять задачу. Есть похожий вариант:

 scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS); 

Thread

Но здесь задачи выполняются с заданным промежутком МЕЖДУ выполнением разных задач. То есть задача task будет выполнена через 1 секунду. Далее, как только она будет завершена, пройдёт 2 секунды, и тогда новая задача task будет запущена. По данной теме можно прочитать следующие материалы:

  • An introduction to thread pools
  • Introduction to Thread Pools
  • Java Multithreading Steeplechase: Cancelling Tasks In Executors
  • Picking correct Java executors for background tasks

WorkStealingPool

Помимо указанных выше пулов потоков, есть ещё один. Можно сказать, что он немного особенный. Имя ему — Work Stealing Pool. Если кратко, то Work Stealing — это такой алгоритм работы, при котором простаивающие потоки начинают забирать задачи других потоков или задачи из общей очереди. Посмотрим на пример:

 public static void main(String[] args) < Object lock = new Object(); ExecutorService executorService = Executors.newCachedThreadPool(); Callabletask = () -> < System.out.println(Thread.currentThread().getName()); lock.wait(2000); System.out.println("Finished"); return "result"; >; for (int i = 0; i < 5; i++) < executorService.submit(task); >executorService.shutdown(); > 

Если мы запустим данный код, то ExecutorService нам создаст 5 потоков, т.к. каждый поток будет вставать в wait очередь по локу объекта lock . Про мониторы и локи по нему мы уже ранее разбирались в «Thread’ом Java не испортишь: Часть II — синхронизация». А теперь мы заменим Executors.newCachedThreadPool на Executors.newWorkStealingPool() . Что поменяется? Мы увидим, что наши задачи выполняются не в 5 потоков, а меньше. Помните, что cachedThreadPool создавал на каждую задачу свой поток? Потому что wait блокировал поток, а следующие задачи хотели выполнятся и в пуле для них создавались новые потоки. В случае со StealingPool потоки не будут вечно простаивать в wait , они начнут выполнять соседние задачи. Чем так отличается от остальных тредпулов этот WorkStealingPool ? Тем, что внутри него живёт на самом деле волшебный ForkJoinPool :

 public static ExecutorService newWorkStealingPool()

На самом деле есть ещё одно отличие. Потоки, которые создаются для ForkJoinPool по умолчанию являются демон потоками, в отличие от потоков, созданных через обычный ThreadPool . Вообще, стоит помнить про демон-потоки, т.к. например при CompletableFuture тоже используются демон-потоки, если не указать свою ThreadFactory , которая будет создавать не демон-потоки. Вот такие вот сюрпризы могут ждать в неожиданном месте!)

Fork/Join Pool

Thread

В этой части мы поговорим про тот самый ForkJoinPool (его ещё называют fork/join framework), который живёт «под капотом» у WorkStealingPool . Вообще, Fork Join Framework появился ещё в Java 1.7. И пусть уже Java 11 на дворе, но вспомнить всё равно стоит. Не самая распространённая задача, но довольно интересная. На просторах сети есть хороший обзор на эту тему: «Fork/Join Framework в Java 7». Fork/JoinPool оперирует в своей работе таким понятием как java.util.concurrent.RecursiveTask . Также есть аналог — java.util.concurrent.RecursiveAction . RecursiveAction не возвращают результат. Таким образом RecursiveTask похож на Callable , а RecursiveAction похож на Runnable . Ну и смотря на название мы видим два ключевых метода — fork и join . Метод fork запускает асинхронно в отдельном потоке некоторую задачу. А метод join позволяет дождаться завершения выполнения работы. Существует несколько способов использования: Данный картинка — это часть слайда доклада Алексея Шипилёва «Fork/Join: реализация, использование, производительность». Чтобы стало понятнее, стоит посмотреть его доклад на JEE CONF: «Fork Join особенности реализации».

Подведение итогов

Итак, вот мы и закончили очередную часть обзора. Мы разобрались, что сначала придумали Executor для выполнения потоков. Потом решили продолжить идею и придумали ExecutorService . ExecutorService позволяет отправлять задачи на выполнение при помощи submit и invoke , а также управлять сервисом, выключая его. Т.к. ExecutorService ‘у нужны реализации, написали класс с фабричными методами и назвали его Executors . Он позволяет создавать пулы потоков ThreadPoolExecutor ‘ы. При этом существуют такие пулы потоков, которые позволяют ещё и указать расписание для выполнения, а за WorkStealingPool прячется ForkJoinPool . Надеюсь, Вам было не только интересно выше написанное, но и понятно ) Всегда рад предложениям и замечаниям. #Viacheslav

Источник

Worker Threads and SwingWorker

When a Swing program needs to execute a long-running task, it usually uses one of the worker threads, also known as the background threads. Each task running on a worker thread is represented by an instance of javax.swing.SwingWorker . SwingWorker itself is an abstract class; you must define a subclass in order to create a SwingWorker object; anonymous inner classes are often useful for creating very simple SwingWorker objects.

SwingWorker provides a number of communication and control features:

  • The SwingWorker subclass can define a method, done , which is automatically invoked on the event dispatch thread when the background task is finished.
  • SwingWorker implements java.util.concurrent.Future . This interface allows the background task to provide a return value to the other thread. Other methods in this interface allow cancellation of the background task and discovering whether the background task has finished or been cancelled.
  • The background task can provide intermediate results by invoking SwingWorker.publish , causing SwingWorker.process to be invoked from the event dispatch thread.
  • The background task can define bound properties. Changes to these properties trigger events, causing event-handling methods to be invoked on the event dispatch thread.

These features are discussed in the following subsections.

The javax.swing.SwingWorker class was added to the Java platform in Java SE 6. Prior to this, another class, also called SwingWorker , was widely used for some of the same purposes. The old SwingWorker was not part of the Java platform specification, and was not provided as part of the JDK.

The new javax.swing.SwingWorker is a completely new class. Its functionality is not a strict superset of the old SwingWorker . Methods in the two classes that have the same function do not have the same names. Also, instances of the old SwingWorker class were reusable, while a new instance of javax.swing.SwingWorker is needed for each new background task.

Throughout the Java Tutorials, any mention of SwingWorker now refers to javax.swing.SwingWorker .

Источник

Оцените статью