Скачать презентацию
Идет загрузка презентации. Пожалуйста, подождите
Презентация была опубликована 10 лет назад пользователемАнна Михалина
1 МНОГОПОТОЧНОЕ ПРОГРАММИРОВАНИЕ В JAVA Пакеты java.lang java.util.concurrent
2 CONCURRENCY UTILITIES Набор классов, облегчающих написание многопоточных программ Пакет java.util.concurrent.locks Работа с блокировками Пакет java.util.concurrent.atomic Атомарные переменные Пакет java.util.concurrent Примитивы синхронизации Многопоточные коллекции Управление заданиями Java Advanced / java.util.concurrent
3 ЧАСТЬ 1 БЛОКИРОВКИ И УСЛОВИЯ
4 БЛОКИРОВКА Только один поток может владеть блокировкой Операции lockполучить блокировку unlockотдать блокировку tryLockпопробовать получить блокировку Java Advanced / java.util.concurrent
5 БЛОКИРОВКИ В JAVA Интерфейс Lock Методы lock() – захватить блокировку lockInterruptibly() – захватить блокировку tryLock(time?) – попытаться захватить блокировку unlock() – отпустить блокировку Передача событий newCondition() – создать условие Java Advanced / java.util.concurrent
6 УСЛОВИЯ Интерфейс Condition await(time?) – ждать условия awaitUntil(deadline) – ждать условия до времени awaitUninterruptibly() – ждать условие signal() – подать сигнал signalAll() – подать сигнал всем Нужно владеть родительской блокировкой Java Advanced / java.util.concurrent
7 ПРОИЗВОДИТЕЛЬ Решение с помощью событий void set(Object data) throws InterruptedException { lock.lock(); try { while (data != null) notFull.await(); this.data = data; notEmpty.signal(); } finally { lock.unlock(); } Java Advanced / java.util.concurrent
8 ОСОБЕННОСТИ Отсутствие «блочности» Разделенные блокировки Необходимость явного отпускания Идиома l.lock() try { … } finally { l.unlock() } Java Advanced / java.util.concurrent
9 РЕАЛИЗАЦИЯ БЛОКИРОВКИ Класс ReentrantLock Дополнительные методы isFair() – «честность» блокировки isLocked() – блокировка занята Статистика getQueuedThreads() / getQueueLength() / hasQueuedThread(thread) / hasQueuedThreads() – потоки, ждущие блокировку getWaitingThreads(condition) / getWaitQueueLength(condition) – потоки, ждущие условие Java Advanced / java.util.concurrent
10 BOUNDED BUFFER class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { … } public Object take() throws InterruptedException { … } Java Advanced / java.util.concurrent
11 BOUNDED BUFFER public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } Java Advanced / java.util.concurrent
12 BOUNDED BUFFER public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } Java Advanced / java.util.concurrent
13 ЗАДАЧА О ЧИТАТЕЛЯХ И ПИСАТЕЛЯХ Читать могут много потоков одновременно Писать может только один поток Читать во время записи нельзя Java Advanced / java.util.concurrent
14 ЧИТАТЕЛИ И ПИСАТЕЛИ Интерфейс ReadWriteLock Методы readLock() – блокировка для читателей writeLock() – блокировка для писателей Реализация ReentrantReadWriteLock Java Advanced / java.util.concurrent
15 ЧАСТЬ 2 УПРАВЛЕНИЕ ЗАДАНИЯМИ Java Advanced / java.util.concurrent
16 ИСПОЛНИТЕЛИ Интерфейс Executor execute(Runnable) – выполнить задание Возможные варианты выполнения В том же потоке Во вновь создаваемом потоке Пул потоков Наращиваемый пул потоков Java Advanced / java.util.concurrent
17 ФУНКЦИИ И РЕЗУЛЬТАТЫ Интерфейс Callable – функция V call() – подсчитать функцию Интерфейс Future – результат get(timeout?) – получить результат isDone() – окончено ли выполнение cancel(mayInterruptWhenRunning) – прервать выполнение isCancelled() – прервано ли выполнение Java Advanced / java.util.concurrent
18 ИСПОЛНИТЕЛИ-2 Интерфейс ExecutorService submit(Runnable) – выполнить задание Future submit(Callable ) – выполнить функцию List invokeAll(List ) – выполнить все функции Future invokeAny(List ) – успешно выполнить функцию Java Advanced / java.util.concurrent
19 ЗАВЕРШЕНИЕ РАБОТЫ shutdown() – прекратить прием заданий List shutdownNow() – прекратить выполнение isShutdown() – прекращен ли прием isTerminated() – окончен ли все задания awaitTermination(timeout) – ожидание завершения Java Advanced / java.util.concurrent
20 КЛАСС EXECUTORS Создание исполнителей newCachedThreadPool() newFixedThreadPool(n) newSingleThreadExecutor() Создание фабрик потоков Класс ThreadFactory Создание привилегированных действий и фабрик потоков Наследую права создавшего Java Advanced / java.util.concurrent
21 ПРИМЕР class Task implements Runnable { private int counter; public Task(int num) { this.counter = num; } public void run() { while (counter-- > 0) { System.out.println(Thread.currentThread() + ": " + counter); Thread.yield(); } public class Main { public static void main(String[] args) { Random rand = new Random(); ExecutorService exec = Executors.newFixedThreadPool(2); for (int i = 0; i < 5; i++) { exec.execute(new Task(Math.abs(rand.nextInt())%10)); } exec.shutdown(); } Java Advanced / java.util.concurrent
22 ПРИМЕР-2 class CallableTask implements Callable { private int counter; private final int number; public CallableTask(int num) { this.counter = num; this.number = num; } public Integer call() { while (counter-- > 0) { System.out.println(Thread.currentThread() + ": " + counter); Thread.yield(); } return number; } Java Advanced / java.util.concurrent
23 ПРИМЕР-2 public class MainCallable { public static void main(String[] args) { ArrayList > results = new ArrayList(); ExecutorService exec = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) { results.add(exec.submit(new CallableTask(i))); } exec.shutdown(); for (Future fi : results) { try { System.out.println(fi.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } try { System.out.println(fi.get(5, TimeUnit.SECONDS)); } catch (InterruptedException | ExecutionException e | TimeoutException e) { e.printStackTrace(); } Java Advanced / java.util.concurrent
24 РЕАЛИЗАЦИЯ ИСПОЛНИТЕЛЕЙ Класс ThreadPoolExecutor corePoolSize – минимальное количество потоков maxPoolSize максимальное количество потоков blockingQueue – очередь заданий keepAliveTime – время жизни потока threadFactory – фабрика потоков … Java Advanced / java.util.concurrent
25 ПРИМЕР class DaemonThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } public class MainThreadFactory { public static void main(String[] args) { ArrayList > results = new ArrayList >(); ExecutorService exec = Executors.newCachedThreadPool(new DaemonThreadFactory()); for (int i = 0; i < 5; i++) { results.add(exec.submit(new CallableTask(i*100))); } exec.shutdown(); for (Future fi : results) { try { System.out.println(fi.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } Java Advanced / java.util.concurrent
26 ОТКЛОНЕНИЕ ЗАДАНИЙ Нет свободного потока и места в очереди Политики отклонения AbortPolicy – бросить RejectedExecutionException CallerRunsPolicy – исполнить в вызывающем потоке DiscardPolicy – проигноировать DiscardOldestPolicy – заменить самое давнее Интерфейс RejectedExecutionHandler Java Advanced / java.util.concurrent
27 ОТЛОЖЕННОЕ ИСПОЛНЕНИЕ Интерфейс ScheduledExecutorService schedule(callable, timeout) – исполнить через timeout schedule(runnable, timeout?) – исполнить через timeout sheduleAtFixedRate(runnable, initialDelay, period) – периодическое исполнение scheduleWithFixedDelay(runnable, initialDelay, delay) – исполнение с равными интервалами Все методы возвращают ScheduledFuture Java Advanced / java.util.concurrent
28 РЕАЛИЗАЦИЯ ОТЛОЖЕННОГО ИСПОЛНЕНИЯ Класс ScheduledThreadPoolExecutor Java Advanced / java.util.concurrent
29 ЧАСТЬ 3 ПРИМИТИВЫ СИНХРОНИЗАЦИИ Java Advanced / java.util.concurrent
30 ПРИМИТИВЫ СИНХРОНИЗАЦИИ Semaphore – семафор CyclicBarrier – многоразовый барьер CountDownLatch – защелка Exchanger – рандеву Java Advanced / java.util.concurrent
31 СЕМАФОР Хранит количество разрешений на вход Операции acquireполучить разрешение releaseдобавить разрешение Доступ к ограниченными ресурсами Java Advanced / java.util.concurrent
32 СЕМАФОРЫ В JAVA Конструкторы Semaphore(n, fair?) – число разрешений и честность Методы acquire(n?) – получить разрешение release(n?) – отдать разрешение tryAquire(n?, time?) – попробовать получить разрешение reducePermits(n) – уменьшить количество разрешений drainPermits() – забрать все разрешения статистика Java Advanced / java.util.concurrent
33 БАРЬЕР Потоки блокируются пока все потоки не прибудут к барьеру Одноразовый Многоразовый Операции arriveприбытие к барьеру Синхронизация потоков Переход к следующему этапу Java Advanced / java.util.concurrent
34 БАРЬЕРЫ В JAVA Конструкторы CyclicBarrier(n, runnable?) – число потоков и действие на барьере Методы await(time?) – барьер. reset() – возвращает барьер в исходное состояние isBroken() – «сломан» ли барьер статистика Java Advanced / java.util.concurrent
35 ПРИМЕР. СКАЧКИ class Horse implements Runnable { private int strides = 0; private static Random rand = new Random(47); private static CyclicBarrier barrier; public Horse(CyclicBarrier b) { barrier = b; } public void run() { try { while(!Thread.interrupted()) { synchronized(this) { strides += rand.nextInt(3); // Produces 0, 1 or 2 } barrier.await(); } } catch(InterruptedException e) {// Приемлемый вариант выхода } catch(BrokenBarrierException e) {throw new RuntimeException(e);} } … } Java Advanced / java.util.concurrent
36 ПРИМЕР. СКАЧКИ public class HorseRace { private List horses = new ArrayList (); private ExecutorService exec = Executors.newCachedThreadPool(); private CyclicBarrier barrier; public HorseRace(int nHorses) { barrier = new CyclicBarrier(nHorses, new Runnable() { public void run() { for(Horse horse : horses) System.out.println(horse.tracks()); for(Horse horse : horses) if(horse.getStrides() >= FINISH_LINE) { System.out.println(horse + "won!"); exec.shutdownNow(); return; } }); for(int i = 0; i < nHorses; i++) { Horse horse = new Horse(barrier); horses.add(horse); exec.execute(horse); } Java Advanced / java.util.concurrent
37 МОНИТОР Разделяемые переменные инкапсулированы в мониторе Код в мониторе исполняется не более чем одним потоком Условия Операции с условиями waitожидание условия notifyсообщение об условии одному потоку notifyAllсообщение об условии всем потокам Java Advanced / java.util.concurrent
38 ЗАЩЕЛКИ Ожидание завершения нескольких работ Операции countDown() – опускает защелку на единицу await() – ждет спуска защелки Java Advanced / java.util.concurrent
39 ЗАЩЕЛКИ В JAVA Конструктор CountDownLatch(n) – высотазащелки Методы await(time?) – ждет спуска защелки countDown() – опускает защелку на единицу getCount() – текущая высота защелки Применение Инициализация Java Advanced / java.util.concurrent
40 ПРИМЕР // Часть основной задачи: class TaskPortion implements Runnable { … private final CountDownLatch latch; TaskPortion(CountDownLatch latch) { this.latch = latch; } public void run() { try { doWork(); latch.countDown(); } catch(InterruptedException ex) { // Приемлемый вариант выхода } // Ожидание по объекту CountDownLatch: class WaitingTask implements Runnable { … private final CountDownLatch latch; WaitingTask(CountDownLatch latch) { this.latch = latch; } public void run() { try { latch.await(); doWrk(); System.out.println("barrier passed"); } catch(InterruptedException ex) { System.out.println(this + " interrupted"); } Java Advanced / java.util.concurrent
41 РАНДЕВУ Позволяет потокам синхронно обмениваться объектами Конструкторы Exchanger() Методы exchange(V x, time?) – обменяться Java Advanced / java.util.concurrent
42 ПРИМЕР class ExchangerProducer implements Runnable { private Generator generator; private Exchanger > exchanger; private List holder; ExchangerProducer(Exchanger > exchg, Generator gen, List holder) { exchanger = exchg; generator = gen; this.holder = holder; } public void run() { try { while(!Thread.interrupted()) { for(int i = 0; i < ExchangerDemo.size; i++) holder.add(generator.next()); holder = exchanger.exchange(holder); } } catch(InterruptedException e) {} } Java Advanced / java.util.concurrent
43 ПРИМЕР class ExchangerConsumer implements Runnable { private Exchanger > exchanger; private List holder; private volatile T value; ExchangerConsumer(Exchanger > ex, List holder){ exchanger = ex; this.holder = holder; } public void run() { try { while(!Thread.interrupted()) { holder = exchanger.exchange(holder); for(T x : holder) { value = x; // Выборка значения holder.remove(x); } } catch(InterruptedException e) {} System.out.println("Final value: " + value); } Java Advanced / java.util.concurrent
44 ПРИМЕР public class ExchangerDemo { static int size = 10; static int delay = 5; // Секунды public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); Exchanger > xc = new Exchanger >(); List producerList = new CopyOnWriteArrayList (), consumerList = new CopyOnWriteArrayList (); exec.execute(new ExchangerProducer (xc, BasicGenerator.create(Fat.class), producerList)); exec.execute( new ExchangerConsumer (xc,consumerList)); TimeUnit.SECONDS.sleep(delay); exec.shutdownNow(); } Java Advanced / java.util.concurrent
45 ЧАСТЬ 4 АТОМАРНЫЕ ОПЕРАЦИИ Java Advanced / java.util.concurrent
46 АТОМАРНАЯ ОПЕРАЦИЯ Операция выполняемая как единое целое Чтение Запись Неатомарные операции Инкремент Декремент Java Advanced / java.util.concurrent
47 ВИДЫ АТОМАРНЫХ ОПЕРАЦИЙ Чтение get Запись set Чтение и запись getAndSet Условная запись compareAndSet Java Advanced / java.util.concurrent
48 УСЛОВНАЯ ЗАПИСЬ compareAndSet(old, new) Если текущее значение равно old Установить значение в new Идиома do { old = v.get(); new = process(old); } while (v.compareAndSet(old, new)); Java Advanced / java.util.concurrent
49 РЕШЕНИЕ ЗАДАЧИ ДОСТУПА К РЕСУРСУ // Получение доступа к ресурсу while(!v.compareAndSet(0, 1)); // Действия с ресурсом // Освобождение ресурса v.set(0); Java Advanced / java.util.concurrent
50 НЕБЛОКИРУЮЩИЙ СЧЕТЧИК public final class Counter { private long value = 0; public synchronized long getValue() { return value; } public synchronized long increment() { return ++value; } public class NonblockingCounter { private AtomicInteger value; public int getValue() { return value.get(); } public int increment() { int v; do { v = value.get(); while (!value.compareAndSet(v, v + 1)); return v + 1; } Java Advanced / java.util.concurrent
51 АТОМАРНЫЕ ОПЕРАЦИИ В JAVA Чтение / запись get() – атомарное чтение set(value) – атомарная запись lazySet(value) – запись без барьера getAndSet(value) – чтение и запись Проверки compareAndSet(expected, value) – сравнение и запись weakCompareAndSet(expected, value) – слабое сравнение и запись Java Advanced / java.util.concurrent
52 ОПЕРАЦИИ НАД ЧИСЛАМИ Пре- операции getAndIncrement() – инкремент getAndDecrement() – декремент addAndGet() – сложение Пост- операции incrementAndGet() – инкремент decrementAndGet() – декремент getAndAdd() – сложение Java Advanced / java.util.concurrent
53 АТОМАРНЫЕ ПЕРЕМЕННЫЕ Типы AtomicBoolean AtomicInteger AtomicLong AtomicReference Операции Обычные Java Advanced / java.util.concurrent
54 АТОМАРНЫЕ МАССИВЫ Типы AtomicIntegerArray AtomicLongArray AtomicReferenceArray Операции Обычные, с указанием индекса length() – число элементов Java Advanced / java.util.concurrent
Еще похожие презентации в нашем архиве:
© 2024 MyShared Inc.
All rights reserved.