Paralelnà programovánà v Javě a java.util.concurrent
Výpisky ze stejnojmenné přednášky Petra Nejedlého z firmy Sun
1.Memory model
Definuje sémantiku práce s pamÄ›tÃ.
i)Strong
Intuitivnà a pÅ™irozený pro programátora zvyklého na jednovláknové aplikace, vÅ¡echny zápisy do pamÄ›ti jsou viditelné v poÅ™adÃ, v jakém probÄ›hly. PÅ™erovnánà instrukci kompilátorem nemajà vliv na sémantiku programu, pro vÃcevláknové aplikace je ale velmi nevýhodný, protože výraznÄ› omezuje optimalizaÄnà možnosti procesoru nebo JVM a tÃm i výkon.
ii)Weak
VyužÃvá ho Java, od verze 5 je revidovaný. V pÅ™ÃpadÄ› jednovláknového programu se chová jako strong. Pro vÃce vláken je situace složitÄ›jÅ¡Ã, napÅ™. se může stát, že zápis do promÄ›nných A a B jednÃm threadem je v druhém threadu vidÄ›n v opaÄném poÅ™adà nebo dokonce vůbec. Proto jsou k dispozici nástroje pro zajiÅ¡tÄ›nà konzistence (bariéry, kritické sekce - synchronized bloky, atomické instrukce ).
2.Nástroje Javy
i)Synchronizace
Každý objekt v Javě má se sebou svázaný monitor. Ten má tři funkce:
-
Zámek - zajišťuje exkluzivitu pÅ™Ãstupu.
-
Wait condition - jde o Äekánà wait() (výstup z wait je opÄ›t read-barrier) a notifikace threadů notify() a notifyAll().
-
Memory barrier. - pÅ™i použità synchronized bloku se pÅ™ed nÄ›j do bytekódu vkládá instrukce monitorenter (chová se jako read-barrier, což znamená, že po této instrukci veÅ¡kerá Ätenà z pamÄ›ti musà ÄÃst aktuálnà hodnoty) a za nÄ›j monitorexit (chová se jako write-barrier, což znamená, že veÅ¡keré v kritické sekci do pamÄ›ti zapsané hodnoty budou od teÄ viditelné ostatnÃm threadům).
Synchronizace obvykle staÄà pro vÄ›tÅ¡inu použitÃ.
ii)Volatile a final proměnné
Volatile proměnné majà několik vlastnosti:
-
Kompilátor je nesmà držet v registru, musà je vždy propsat do hlavnà paměti.
-
ÄŒtenà volatile fieldu implikuje provedenà read-barrier pÅ™ed tÃmto ÄtenÃm, zápis do volatile fieldu implikuje provedenà write-barrier po tomto zápisu.
-
Kompilátor sice může pÅ™erovnávat pÅ™Ãstupy k pamÄ›ti, nicménÄ› toto nesmà provést pÅ™es hranici pÅ™Ãstupu k volatile fieldu (to je novinka od Javy 5).
-
Čtenà a zápis volatile fieldu je vždy atomický (i pro long a double).
-
Tvořà happens before hrany. Pokud zapÃÅ¡u do volatile fieldu v jednom threadu a z druhého threadu pÅ™eÄtu již tuto novou hodnotu, pak vÅ¡e co se stalo v prvnÃm threadu pÅ™ed tÃmto zápisem se urÄitÄ› stalo pÅ™ed ÄtenÃm z druhého threadu a je tedy z tohoto druhého threadu po tomto Ätenà viditelné.
Final proměnné
-
Lze je ÄÃst bez synchronizace z jakéhokoliv threadu a je zaruÄen zisk správné hodnoty (pÅ™ed tÃm ovÅ¡em musà dobÄ›hnout konstruktor objektu s daným final fieldem, aniž by dal nÄ›komu referenci sám na sebe, tzn. reference na this nesmà opustit konstruktor. Toho nejsnáze dosáhneme vytvoÅ™enÃm Factory a jednoduchým konstruktorem).
-
Immutable objekty majà mnoho dalÅ¡Ãch výhod. Mohu s nimi pracovat z libovolného threadu bez synchronizace, lze je znovu použÃt.
iii)Double-checking pattern
PoužÃvá se pÅ™i lazy inicializaci singletonu, v JavÄ› 4 je vždy rozbitý, za nepÅ™Ãznivých okolnostà mohou vzniknout dvÄ› instance, a nebo hůře, vrácená instance může být ne zcela inicializovaná.
01 Class A {
02 static A inst;
03 public static A getInst() {
04 if (inst == null) {
05 synchronized (A.class) {
06 if (inst == null) {
07 inst = new A();
08 }
09 }
10 }
11 return inst;
12 }
13 }
Prvnà myÅ¡lený průbÄ›h: prvnà vlákno na řádku 4 zjistÃ, že instance neexistuje, vstoupà tedy do kritické sekce (druhé vlákno Äeká) a na řádku 6 znovu zkontroluje, zda mezitÃm nÄ›kdo instanci nevyrobil, to se nestalo, vytvořà tedy instanci a opustà kritickou sekci. Druhé vlákno vstoupà do kritické sekce, zjistà že instance už existuje, takže vystoupà ven a vrátà instanci. To se zdá býti v pořádku.
Druhý myÅ¡lený průbÄ›h: prvnà vlákno se dostane až na řádek 7 a zaÄne provádÄ›t výraz „new A()“. Kompilátor ale provede optimalizaci, zinlinuje konstruktor, pÅ™erovná fieldy, pak nejprve alokuje paměť pro objekt a referenci na nÄ›j propÃÅ¡e do pamÄ›ti (to může udÄ›lat, urÄitÄ› to musà udÄ›lat pÅ™i opuÅ¡tÄ›nà kritické sekce, ale nikdo mu nezakazuje udÄ›lat to dÅ™Ãve) a teprve pak zaÄne provádÄ›t konstruktor, který inicializuje dalšà fieldy v singletonu. V tomto okamžiku se pÅ™epne kontext na druhé vlákno a to na řádku 4 zjistÃ, že instance již existuje a vrátà ji, ta ale nebude zcela zkonstruovaná!!!
V JavÄ› 5 je možno kód snadno opravit. StaÄà field inst udÄ›lat volatile. TÃm je zakázáno pÅ™erovnávánà a bude to fungovat. LepÅ¡Ãm způsobem je ale použÃt statický inicializátor. U nÄ›j je zaruÄeno že se volá právÄ› jednou a vÄ›tÅ¡inou až když o tÅ™Ãdu nÄ›kdo „zavadÓ, JVM inteligentnÄ› odkládá inicializaci.
01 Class A {
02 static A inst;
03 static {
04 inst = new A();
05 }
06 public static getA() {
07 return inst;
08 }
09 }
3.Java.util.concurrent
i)Atomic
BalÃk obsahuje 9 tÅ™Ãd {int, long, ref} x {value, array, field}, tj. AtomicInteger, AtomicIntegerArray, AtomicIntegerFieldUpdater a to samé pro typ long a referenci. Dále 3 tÅ™Ãdy AtomicBoolean, AtomicMarkableReference a AtomicStampedReference.
- AtomicXXX tÅ™Ãdy podporujà atomické operacà jako napÅ™. CompareAndSet. PÅ™Ãkladem využità může být kód generujÃcà po sobÄ› jdoucÃch id. Naivnà implementace pomocà obalenà Longu a generovánà id v synchronized bloku je správná, ale nenà škálovatelná, neboÅ¥ kritická sekce se v pÅ™ÃpadÄ› mnoha vláken žádajÃcÃch id stane úzkým hrdlem celého programu. Implementace pomocà AtomicLong:
01 AtomicLong v; 02 long nextValue() { 03 for (;;) { 04 long x = v.get(); 05 long n = x + 1; 06 if (v.compareAndSet(x,n)) returnn; 07 } 08 }Na řádku 4 zÃskám poslednà id, na řádku 5 si spoÄÃtám, jak by mÄ›lo vypadat následujÃcà id. Volánà na řádku 6 může dopadnout úspěšnÄ› (compareAndSet vracà true), což znamená že v okamžiku volánà atomické instrukce mi žádný jiný thread id nezvýšil, hodnota v je zvýšena a vracÃm ji jako nové id. Pokud ale volánà dopadne neúspěšnÄ›, znamená to, že mezi řádkem 4 a 6 jiný thread již id zvýšil, instrukce compareAndSet hodnotu v nezmÄ›nila a opakuji pokus o zÃskánà id.
Tato implementace je mnohem efektivnÄ›jÅ¡Ã. Proto existuje pÅ™Ãmo metoda getAndIncrement(), která dÄ›lá nÄ›co velmi podobného. -
AtomicXXXArray zajišťuje pole, jehož položky (nikoliv celé pole) lze atomicky modifikovat. Plnà stejnou funkci, jakou by plnilo pole AtomicXXX objektů, za nižšà spotÅ™eby pamÄ›ti (AtomicXXXArray jsou dva objekty - pole a obal, zatÃmco pole AtomicXXX objektů by bylo mnoho, dle poÄtu položek pole).
-
AtomicXXXFieldUpdater umožňuje atomicky modifikovat member fieldy tÅ™Ãdy (musà být volatile a musà se modifikovat vždy pÅ™es tento updater, nikdy pÅ™Ãmo) bez overheadu, který by vznikl, kdybych vÅ¡echny tyto fieldy deklaroval jako AtomicXXX. Je to tedy podobné jako v pÅ™ÃpadÄ› AtomicXXXArray, ale jde o fieldy.
-
Protože nejsem schopen atomicky modifikovat dva fieldy najednou (v jedné atomické operaci), což je pro nÄ›které algoritmy důležité, existujà AtomicMarkableReference (pár reference a boolean) a AtomicStampedReference (pár reference a integer), které toto umožňujÃ. ZatÃm to v JVM nenà pÅ™ÃliÅ¡ efektivnÄ› implementováno (jde o obalenà vnitÅ™nà immutable tÅ™Ãdy, která se nahrazuje jako celek), v budoucnu se to ale může zmÄ›nit.
ii)Locks
Interface Lock nabÃzà mocnÄ›jšà sémantiku pro zámky než klasické synchronized bloky. RozdÃly:
-
Nový zámek je objekt s metodami acquire() a release(), takže ho nejsme nuceni zÃskat a uvolnit v rámci jedné metody. Je samozÅ™ejmÄ› tÅ™eba dát pozor na korektnà uvolnÄ›nà pÅ™i výjimkách apod.
-
Podpora tzv. hand over hand algoritmů. NapÅ™Ãklad pÅ™i práci s double linked list mohu zamknout prvnà a druhou položku, pak druhý odemknout a tÅ™età zamknout atd. Toto opÄ›t nenà s klasickým synchronized blokem možné.
-
Je možno Äekat na vÃce různých podmÃnek vytvoÅ™enÃm objektů Condition (klasický synchronized blok má jedinou podmÃnku, na kterou Äekajà vÅ¡echny thready).
-
Thread ÄekajÃcà na zámek pÅ™i použità klasického synchronized bloku nelze nijak probudit, ani mu Å™Ãct aby to vzdal. Nový zámek má metody tryAcquire(), kterou lze specifikovat timeout, a také pÅ™eruÅ¡itelnost pomocà Thread.interrupt(), kdy toto volánà ÄekajÃcà thread probudà a vyhodà InterruptedException.
-
Možnost udÄ›lat „spravedlivé“ zámky. ÄŒekajÃcÃm threadům může být zámek pÅ™idÄ›lován v poÅ™adÃ, v nÄ›mž o nÄ›j požádali.
-
Klasické synchronized bloky jsou reentrantnà zámky. To znamená, že thread, který vlastnà zámek může do bloku vstupovat znovu. Nové zámky mohou být reentrantnà nebo nereentrantnÃ. V pÅ™ÃpadÄ› nereentrantnÃho zámku thread deadlockne sám sebe, pokud se pokusà opakovanÄ› vstoupit do chránÄ›ného bloku.
iii)Synchronizers
Implementace známých synchronizaÄnÃch primitiv:
-
TÅ™Ãda Semaphore reprezentuje klasický Dijkstrův semafor s parametrem (poÄtem slotů), kolik threadů může být v jednu chvÃli v kritické sekci. Jakmile thread uvolnà slot, může ho zÃskat nÄ›kdo jiný. PoÄtem slotů jedna degeneruje semafor na nereentrantnà mutex.
-
TÅ™Ãda CountDownLatch je podobná semaforu, ale uvolnÄ›ný slot již nikdo nezÃská, thready mohou pomocà await() Äekat, než se vyÄerpajà vÅ¡echny sloty (tj. než poÄÃtadlo klesne na nulu). Potom již zůstává zámek odemÄen a dalšà volánà await() vždy projde bez ÄekánÃ.
-
TÅ™Ãda CyclicBarrier je jakési rendez vous n threadů. Každý thread může požádat o bariéru a tÃm je zablokován, než se vÅ¡echny thready „sejdou“ na této bariéře. Jakmile se tak stane, máme volitelnÄ› k dispozici Runnable pro provedenà nÄ›jaké akce a až dobÄ›hne, jsou vÅ¡echny thready odblokovány a mohou pokraÄovat.
-
TÅ™Ãda Exchanger umožňuje dvÄ›ma threadům si v urÄitém okamžiku vymÄ›nit data (systém se postará o pÅ™ÃsluÅ¡ná propsánà do pamÄ›ti atd.) a pokraÄovat.
iv)Concurrent collections
Kolekce ConcurrentHashMap - od synchronizované se liÅ¡Ã:
-
Nejen že je thread-safe, ale je i zároveň neblokujÃcÃ. Umožňuje konkurenÄnà Ätenà i zápisy a nenà tedy úzkým hrdlem v pÅ™ÃpadÄ› paralelnÃho pÅ™Ãstupu mnoha threadů.
-
Lze specifikovat concurrency level, což je stupeň vnitÅ™nà segmentace. Vyššà stupeň znamená jemnÄ›jšà dÄ›lenÃ, tÃm vyššà paralelismus operacà (i zápisů), ale i vyššà overhead.
-
Iterátory nejsou statické (nejsou snapshot) a nikdy nevyhazujà výjimku ConcurrentModificationException. Vždy reprezentujà konzistentnà (nÄ›jakým způsobem paralelnÃm výpoÄtem dosažitelný) pohled na položky v kolekci, takže nikdy neukazuje kolekci ve stavu, v kterém v žádném Äase nebyla (nemohla být). NapÅ™Ãklad provádÃ-li jeden thread postupnÄ› +A, +B, +C, -B, +D a druhý thread tuto kolekci iteruje, pak nikdy neuvidà A, B, C, D. Ale může napÅ™. vidÄ›t jen A. Nebo A, B. Nebo A, B, C. Nebo A, C, D.
Kolekce CopyOnWriteArrayList pÅ™i modifikaci zkopÃruje vÅ¡e do nového pole a staré visà na heapu, než je odstranÄ›no garbage collectorem. Vhodné pro pole, které se Äasto Äte a zÅ™Ãdka modifikuje. Iterátor je snapshot stavu kolekce v okamžiku jeho zÃskánÃ, takže také nikdy nevyhazuje výše popsanou výjimku.
Fronta ConcurrentLinkedQueue může mÃt vÃce producentů i konzumentů. Je opÄ›t nejen thread-safe, ale i paralelnÃ.
Fronty XXXBlockingQueue umožňujà Äekat na položku (i napÅ™. s timeoutem). Je nÄ›kolik implementacà – LinkedBlockingQueue (neomezená délka), PriorityBlockingQueue (pÅ™erovnává položky dle priority), ArrayBlockingQueue (pevná délka, o to efektivnÄ›jÅ¡Ã), SynchronousQueue (fronta nulové délky, nikdy neobsahuje žádnou položku, k výmÄ›nÄ› jedné položky se musà sejÃt producent i konzument, pÅ™ijde-li dÅ™Ãve producent, Äeká na konzumenta a naopak), DelayedQueue (položky pÅ™erovnává podle jejich timeoutu, ven je pustà až timeout dobÄ›hne, metoda size() sice může vrátit nÄ›jaké ÄÃslo, jakožto poÄet položek, ale neznamená to že už vytimeoutovali a jsou tedy k dispozici).
v)Executors
Executor je služba, která vykoná pÅ™edaný úkol (Runnable). Typicky v jiném threadu, napÅ™. v AWT threadu nebo v thread poolu dané velikosti (i s frontou, může být omezené délky a pÅ™i naplnÄ›nà volat definovaný callback, jenž může napÅ™. zahodit nejnovÄ›jÅ¡Ã/ nejstaršà požadavek nebo vykonat požadavek v threadu, kterým byl zaslán a tÃm ho „zbrzdit“).
ExecutorService má jeÅ¡tÄ› mocnÄ›jšà sémantiku než Executor. Umà napÅ™. pÅ™estat pÅ™ijÃmat úkoly apod.
ScheduledExecuterService vykonává odložené a periodické úkoly.
Callable je jako Runnable, ale po pÅ™edánà do ExecutorService vracà jako výsledek Future, pomocà nÄ›jž mohu úkol zruÅ¡it nebo si poÄkat na výsledek (blokujÃce i neblokujÃce).
Do ExecutorCompletionService pÅ™edáváme Callables a zÃskáváme Futures, ale tak, že až majà výsledek, tak se tyto ukládajà do fronty, odkud je můžeme zÃskávat.
2007-05-29 at 12.29 pm
dobry den,
mohl byste prosim zmenit odkaz z
http://avc.sh.cvut.cz/archiv/index.php?id=1041&rid=436&offset=0&select=NetBeans
na odkaz z novych stranek http://avc-cvut.cz ?
bohuzel jsme kvuli politickemu natlaku museli klub sh opustit a zalozili jsme obc. sdruzeni AVC studentu CVUT a na tvorbe videi pokracujeme tam…
S pozdravem
Radek Novotny
Audiovizualni centrum studentu CVUT
radek.novotny@avc-cvut.cz
2007-05-30 at 10.29 pm
Link opraven