В повседневной работе нечасто встретишь многопоточный пакет java.util.concurrent. Иногда существуют конструктивные ограничения на использование Java 1.4.2, в которой нет этого пакета, но чаще всего достаточно обычной синхронизации и ничего особенного не требуется.
К счастью, время от времени возникают задачи, которые заставляют немного подумать и либо написать велосипед, либо покопаться в javadocs и найти что-то более подходящее.
С велосипедом проблем нет — просто берешь и пишешь, благо в многопоточности нет ничего сверхсложного.
С другой стороны, меньше кода означает меньше ошибок.
Более того, никто в здравом уме не пишет юнит-тесты для многопоточности, потому что.
Это уже полноценные интеграционные тесты со всеми вытекающими последствиями.
Что выбрать для конкретного случая? В условиях загруженности и сроков охватить весь java.util.concurrent довольно сложно.
Выбирайте что-то похожее и вперед! Так постепенно в коде появляются ArrayBlockingQueue, ConcurrentHashMap, AtomicInteger, Collections.synchronizedList(new LinkedList()) и другие интересные вещи.
Иногда это правильно, иногда нет. В какой-то момент начинаешь понимать, что более 95% стандартных классов в Java вообще не используются при разработке продукта.
Коллекции, примитивы, перемещение байтов из одного места в другое, hibernate, Spring или EJB, еще какая-нибудь библиотека и, вуаля, приложение готово.
Чтобы как-то систематизировать знания и облегчить понимание темы, ниже представлен обзор классов для работы с многопоточностью.
Пишу в первую очередь как шпаргалку для себя.
И если это сработает для кого-то еще, это здорово.
Для грунтования Сразу дам пару интересных ссылок.
Первый — для тех, кто немного владеет многопоточностью.
Второй для «продвинутых» программистов — возможно, вы найдете здесь что-то полезное.
Немного об авторе пакета java.util.concurrent Если кто-нибудь когда-либо открывал исходный код классов java.util.concurrent, он не мог не заметить, что авторами был Дуг Ли, профессор Освего в Университете штата Нью-Йорк.
В список его самых известных разработок входит Java коллекции И util.concurrent , которые в той или иной форме отражены в существующих JDK. Он также написал dlmalloc реализация динамического распределения памяти.
Среди литературы отмечена книга о многопоточности Параллельное программирование на Java: принципы и шаблоны проектирования, 2-е издание .
Более подробную информацию можно найти у него домашняя страница .
Дуг Ли выступает на Языковой саммит JVM в 2010 году.
По верху У многих людей, вероятно, возникло ощущение некоторого хаоса, когда они бегло взглянули на java.util.concurrent. В одном пакете содержатся разные классы с совершенно разным функционалом, что несколько затрудняет понимание того, что к чему относится и как это работает. Поэтому можно схематично разделить классы и интерфейсы по функционалу, а затем заняться реализацией конкретных частей.
Параллельные коллекции — набор коллекций, которые более эффективно работают в многопоточной среде, чем стандартные универсальные коллекции из пакета java.util. Вместо базовой обёртки Collections.synchronizedList с блокировкой доступа ко всей коллекции используются блокировки на сегментах данных или оптимизируется работа для параллельного чтения данных по всей коллекции.
без ожидания алгоритмы.
Очереди - неблокирующие и блокирующие очереди с поддержкой многопоточности.
Неблокирующие очереди предназначены для обеспечения скорости и работы без блокировки потоков.
Блокирующие очереди используются, когда нужно «замедлить» потоки «Производитель» или «Потребитель», если не выполняются какие-то условия, например, очередь пуста или переполнена, или нет свободного «Потребителя».
Синхронизаторы — вспомогательные утилиты для синхронизации потоков.
Они являются мощным оружием в «параллельных» вычислениях.
Исполнители — содержит отличные фреймворки для создания пулов потоков, планирования асинхронных задач и получения результатов.
Замки — представляет собой альтернативные и более гибкие механизмы синхронизации потоков по сравнению с базовыми синхронизированными, ожидающими, уведомляющими, уведомляющимиВсе.
Атомика — классы с поддержкой атомарных операций над примитивами и ссылками.
1. Параллельные коллекции
Коллекции CopyOnWrite
Имя говорит само за себя.
Все операции, изменяющие коллекцию (добавление, установка, удаление), приводят к созданию новой копии внутреннего массива.
Это гарантирует, что при обходе коллекции итератором не будет выброшено исключение ConcurrentModificationException. Следует помнить, что при копировании массива копируются только ссылки (ссылки) на объекты (shallow copy), в т.ч.
доступ к полям элементов не является потокобезопасным.
Коллекции CopyOnWrite удобно использовать, когда операции записи происходят довольно редко, например, при реализации механизма подписки на слушателей и прохождения через них.
КопироватьонзаписьArrayList
— Потокобезопасный аналог ArrayList, реализованный с помощью алгоритма CopyOnWrite. Дополнительные методы и конструктор
CopyOnWriteArrayList(E[] toCopyIn) | Конструктор, который принимает на вход массив.
|
int indexOf(E e, int index) | Возвращает индекс первого найденного элемента, начиная поиск с данного индекса.
|
intlastIndexOf(E e, int index) | Возвращает индекс первого элемента, найденного при обратном поиске, начиная с заданного индекса.
|
логическое значение addIfAbsent(E e) | Добавьте элемент, если его нет в коллекции.
Метод равенства используется для сравнения элементов.
|
int addAllAbsent(Коллекция<? extends E> в) | Добавляйте элементы, если их нет в коллекции.
Возвращает количество добавленных элементов.
|
— Реализация интерфейса Set на основе CopyOnWriteArrayList. В отличие от CopyOnWriteArrayList, здесь нет дополнительных методов.
Масштабируемые карты
Улучшенные реализации HashMap и TreeMap с лучшей поддержкой многопоточности и масштабируемости.
Конкурентная карта
— Интерфейс, расширяющий Map несколькими дополнительными атомарными операциями.
Дополнительные методы
V putIfAbsent (ключ K, значение V) | Добавляет новую пару ключ-значение, только если ключ отсутствует в коллекции.
Возвращает предыдущее значение для данного ключа.
|
логическое удаление (ключ объекта, значение объекта) | Удаляет пару ключ-значение, только если данный ключ соответствует заданному значению на карте.
Возвращает true, если элемент был успешно удален.
|
логическая замена (ключ K, V oldValue, V newValue) | Заменяет старое значение новым по ключу только в том случае, если старое значение соответствует указанному значению на карте.
Возвращает true, если значение было заменено новым.
|
Замена V (ключ K, значение V) | Заменяет старое значение ключа новым только в том случае, если ключ связан с каким-либо значением.
Возвращает предыдущее значение для данного ключа.
|
— В отличие от Hashtable и синхронизированных блоков в HashMap, данные представлены в виде сегментов, разделенных хэшами ключей.
В результате доступ к данным осуществляется по сегментам, а не по одному объекту.
Кроме того, итераторы представляют данные за определенный период времени и не вызывают ConcurrentModificationException. Подробнее ConcurrentHashMap описан в хабратопике здесь .
Дополнительный конструктор
ConcurrentHashMap (int InitialCapacity, float loadFactor, int concurrencyLevel) | Третий параметр конструктора — ожидаемое количество одновременно записывающих потоков.
Значение по умолчанию — 16. Влияет на размер коллекции в памяти и производительность.
|
— Расширяет интерфейс NavigableMap и принудительно использует объекты ConcurrentNavigableMap в качестве возвращаемых значений.
Все итераторы объявлены безопасными для использования и не вызывают ConcurrentModificationException. ConcurrentSkipListMap
— Это аналог TreeMap с поддержкой многопоточности.
Данные также сортируются по ключу, и средняя производительность log(N) гарантируется для операций containsKey, get, put, delete и других подобных операций.
Алгоритм работы SkipList описан на странице Вики И хабре .
Конкурентскиплистсет
— Реализация интерфейса Set на основе ConcurrentSkipListMap.
2. Очереди
Неблокирующие очереди
Потокобезопасные и неблокирующие реализации очередей на связанных узлах.
ConcurrentLinkedQueue
— В реализации используется алгоритм wait-free от Michael & Scott, адаптированный для работы со сборщиком мусора.
Этот алгоритм достаточно эффективен и, что самое главное, очень быстр, потому что… построен на КАС .
Метод size() может занять много времени, в т.ч.
Лучше не тянуть его постоянно.
Подробное описание алгоритма можно найти здесь.
здесь .
Конкурентлинкеддек
- Deque означает «Двойная очередь» и читается как «Колода».
Это означает, что данные можно добавлять и извлекать с обеих сторон.
Соответственно, класс поддерживает оба режима работы: FIFO (First In First Out) и LIFO (Last In First Out).
На практике ConcurrentLinkedDeque следует использовать только в том случае, если LIFO абсолютно необходим, т.к.
из-за двунаправленности узлов этот класс теряет производительность на 40% по сравнению с ConcurrentLinkedQueue.
Блокировка очередей
Блокирующая очередь
— При обработке больших потоков данных через очереди использования ConcurrentLinkedQueue явно недостаточно.
Если потоки, разгребающие очередь, уже не справляются с наплывом данных, то можно быстро исчерпать память или перегрузить IO/Net настолько, что производительность существенно упадет, пока система не выйдет из строя из-за таймаутов или нехватки свободных дескрипторы в системе.
Для таких случаев нужна очередь с возможностью установки размера очереди или с условной блокировкой.
Именно здесь на помощь приходит интерфейс BlockingQueue, открывающий путь к целому набору полезных классов.
Помимо возможности задавать размер очереди, добавлены новые методы, которые по-разному реагируют на то, пуста или переполнена очередь.
Так, например, при добавлении элемента в переполненную очередь один метод выдаст исключение IllegalStateException, другой вернет false, третий заблокирует поток до появления пробела, а четвертый заблокирует поток с таймаутом и вернет false. если пространство никогда не появляется.
Также стоит отметить, что блокирующие очереди не поддерживают нулевые значения, поскольку это значение используется в методе опроса как индикатор таймаута.
МассивБлокированиеОчередь
— Класс блокирующей очереди, построенный на основе классического кольцевого буфера.
Помимо размера очереди доступна возможность контроля «справедливости» блокировок.
Если fair=false (по умолчанию), порядок потоков не гарантируется.
Более подробную информацию о «честности» можно найти в описании ReentrantLock. DelayQueue
— Достаточно специфический класс, позволяющий вытягивать элементы из очереди только после определенной задержки, определенной в каждом элементе через метод getDelay интерфейса Delayed. LinkedBlockingQueue
— Очередь блокировки на подключенных узлах, реализованная по алгоритму «очереди двух блокировок»: одна блокировка на добавление, другая на удаление элемента.
За счет двух блокировок, по сравнению с ArrayBlockingQueue, этот класс показывает более высокую производительность, но и потребление памяти у него выше.
Размер очереди задается через конструктор и по умолчанию равен Integer.MAX_VALUE. ПриоритетБлокировкаОчередь
— Является многопоточной оберткой над PriorityQueue. Когда элемент вставляется в очередь, его порядок определяется в соответствии с логикой Компаратора или реализацией интерфейса Comparable для элементов.
Первым из очереди выходит наименьший элемент. Синхронная очередь
— Эта очередь работает по принципу «один вошел — один вышел».
Каждая операция вставки блокирует поток «Производитель» до тех пор, пока поток «Потребитель» не вытащит элемент из очереди, и наоборот, «Потребитель» будет ждать, пока «Производитель» не вставит элемент. БлокированиеDeque
— Интерфейс, описывающий дополнительные методы для двунаправленной очереди блокировки.
Данные можно вставлять и извлекать с обеих сторон очереди.
LinkedBlockingDeque
— Двунаправленная блокирующая очередь на подключенных узлах, реализованная в виде простого двунаправленного списка с одной блокировкой.
Размер очереди задается через конструктор и по умолчанию равен Integer.MAX_VALUE. TransferQueue
— Этот интерфейс может быть интересен тем, что при добавлении элемента в очередь можно заблокировать вставляющий поток «Производитель» до тех пор, пока другой поток «Потребитель» не вытащит элемент из очереди.
Блокировка может быть как с таймаутом, так и полностью заменяться проверкой наличия ожидающих «Потребителей».
Это позволяет реализовать механизм передачи сообщений, поддерживающий как синхронные, так и асинхронные сообщения.
LinkedTransferQueue
— Реализация TransferQueue на основе алгоритма Dual Queues в Slack. Активно использует КАС И стоянка потоки, когда они находятся в режиме ожидания.
3. Синхронизаторы
В этом разделе представлены классы для активного управления потоками.
Семафор
— Семафоры чаще всего используется для ограничения количества потоков при работе с аппаратными ресурсами или файловой системой.
Доступ к общему ресурсу контролируется с помощью счетчика.
Если оно больше нуля, то доступ разрешается и счетчик уменьшается.
Если счетчик равен нулю, текущий поток блокируется до тех пор, пока другой поток не освободит ресурс.
Количество разрешений и «справедливость» освобождения потоков задается через конструктор.
Узким местом при использовании семафоров является установка количества разрешений, т.к.
зачастую это количество приходится выбирать в зависимости от мощности железа.
Обратный отсчетЗащелка
— Позволяет одному или нескольким потокам ждать, пока не завершится указанное количество операций, выполняемых в других потоках.
Классический пример драйвера достаточно хорошо описывает логику класса: потоки, вызывающие драйвер, будут зависать в методе await (с таймаутом или без него), пока поток с драйвером не инициализируется, а затем не вызовет метод countDown. Этот метод уменьшает счетчик обратного отсчета на единицу.
Как только счетчик достигнет нуля, все ожидающие потоки продолжат свою работу, и все последующие вызовы await будут выполняться без ожидания.
Счетчик обратного отсчета предназначен для одноразового использования и не может быть сброшен в исходное состояние.
ЦиклическийБарьер
— Может использоваться для синхронизации указанного количества потоков в одной точке.
Барьер достигается, когда N потоков вызывают await(.
) и блокируются.
После чего счетчик сбрасывается в исходное значение, а ожидающие потоки освобождаются.
Дополнительно при необходимости можно запустить специальный код до разблокировки потоков и сброса счетчика.
Для этого через конструктор передается объект с реализацией интерфейса Runnable. Обменник
— Как следует из названия, основная цель этого класса — обмен объектами между двумя потоками.
При этом поддерживаются и нулевые значения, что позволяет использовать этот класс для передачи только одного объекта или просто в качестве синхронизатора для двух потоков.
Первый поток, вызывающий метод обмена(.
), будет блокироваться до тех пор, пока второй поток не вызовет тот же метод. Как только это произойдет, потоки обменяются значениями и продолжат свою работу.
Фазер
— Улучшенная реализация барьера синхронизации потоков, сочетающая в себе функционал CyclicBarrier и CountDownLatch, вобравшая в себя лучшее из них.
Таким образом, количество потоков строго не определено и может изменяться динамически.
Класс можно использовать повторно и указать, что поток готов, не блокируя его.
Подробнее можно прочитать в хабратопике здесь .
4.Исполнители Теперь мы подошли к самой большой части пакета.
Здесь мы опишем интерфейсы для запуска асинхронных задач с возможностью получения результатов через интерфейсы Future и Callable, а также сервисы и фабрики для создания пулов потоков: ThreadPoolExecutor, ScheduledPoolExecutor, ForkJoinPool. Для лучшего понимания проведем небольшую декомпозицию интерфейсов и классов.
Будущее и вызываемое
Будущее
— Замечательный интерфейс для получения результатов асинхронной операции.
Ключевым методом здесь является метод get, который блокирует текущий поток (с таймаутом или без него) до завершения асинхронной операции в другом потоке.
Также существуют дополнительные методы отмены операции и проверки текущего статуса.
В качестве реализации часто используется класс FutureTask. RunnableFuture
— Если Future — это интерфейс для Client API, то интерфейс RunnableFuture уже используется для запуска асинхронной части.
Успешное завершение метода run() завершает асинхронную операцию и позволяет получить результаты с помощью метода get. возможность вызова
— Расширенный аналог интерфейса Runnable для асинхронных операций.
Позволяет вернуть типизированное значение и выдать проверенное исключение.
Хотя в этом интерфейсе отсутствует метод run(), многие классы java.util.concurrent поддерживают его вместе с Runnable. Будущаязадача
Теги: #java #параллелизм #руководство #программирование #java
-
Дорогие Курсы: Стоит Ли Оно Того?
19 Oct, 24 -
Сборник Упражнений Typescript
19 Oct, 24 -
На Одну Виртуальную Сеть Станет Меньше!
19 Oct, 24 -
Пакман Навсегда
19 Oct, 24 -
Respondu.net Угадывает Мысли
19 Oct, 24 -
Как Фанаты Аниме Добывают Криптовалюту
19 Oct, 24