Paralelní programování v Javě a java.util.concurrent

Výpisky ze stejnojmenné přednášky Petra Nejedlého z firmy Sun

1.Memory model

Definuje sémantiku práce s pamětí.

i)Strong

Intuitivní a přirozený pro programátora zvyklého na jednovláknové aplikace, všechny zápisy do paměti jsou viditelné v pořadí, v jakém proběhly. Přerovnání instrukci kompilátorem nemají vliv na sémantiku programu, pro vícevláknové aplikace je ale velmi nevýhodný, protože výrazně omezuje optimalizační možnosti procesoru nebo JVM a tím i výkon.

ii)Weak

Využívá ho Java, od verze 5 je revidovaný. V případě jednovláknového programu se chová jako strong. Pro více vláken je situace složitější, např. se může stát, že zápis do proměnných A a B jedním threadem je v druhém threadu viděn v opačném pořadí nebo dokonce vůbec. Proto jsou k dispozici nástroje pro zajištění konzistence (bariéry, kritické sekce – synchronized bloky, atomické instrukce ).

2.Nástroje Javy

i)Synchronizace

Každý objekt v Javě má se sebou svázaný monitor. Ten má tři funkce:

  1. Zámek – zajišťuje exkluzivitu přístupu.

  2. Wait condition – jde o čekání wait() (výstup z wait je opět read-barrier) a notifikace threadů notify() a notifyAll().

  3. Memory barrier. – při použití synchronized bloku se před něj do bytekódu vkládá instrukce monitorenter (chová se jako read-barrier, což znamená, že po této instrukci veškerá čtení z paměti musí číst aktuální hodnoty) a za něj monitorexit (chová se jako write-barrier, což znamená, že veškeré v kritické sekci do paměti zapsané hodnoty budou od teď viditelné ostatním threadům).

Synchronizace obvykle stačí pro většinu použití.

ii)Volatile a final proměnné

Volatile proměnné mají několik vlastnosti:

  1. Kompilátor je nesmí držet v registru, musí je vždy propsat do hlavní paměti.

  2. Čtení volatile fieldu implikuje provedení read-barrier před tímto čtením, zápis do volatile fieldu implikuje provedení write-barrier po tomto zápisu.

  3. Kompilátor sice může přerovnávat přístupy k paměti, nicméně toto nesmí provést přes hranici přístupu k volatile fieldu (to je novinka od Javy 5).

  4. Čtení a zápis volatile fieldu je vždy atomický (i pro long a double).

  5. Tvoří happens before hrany. Pokud zapíšu do volatile fieldu v jednom threadu a z druhého threadu přečtu již tuto novou hodnotu, pak vše co se stalo v prvním threadu před tímto zápisem se určitě stalo před čtením z druhého threadu a je tedy z tohoto druhého threadu po tomto čtení viditelné.

Final proměnné

  1. Lze je číst bez synchronizace z jakéhokoliv threadu a je zaručen zisk správné hodnoty (před tím ovšem musí doběhnout konstruktor objektu s daným final fieldem, aniž by dal někomu referenci sám na sebe, tzn. reference na this nesmí opustit konstruktor. Toho nejsnáze dosáhneme vytvořením Factory a jednoduchým konstruktorem).

  2. Immutable objekty mají mnoho dalších výhod. Mohu s nimi pracovat z libovolného threadu bez synchronizace, lze je znovu použít.

iii)Double-checking pattern

Používá se při lazy inicializaci singletonu, v Javě 4 je vždy rozbitý, za nepříznivých okolností mohou vzniknout dvě instance, a nebo hůře, vrácená instance může být ne zcela inicializovaná.

01 Class A {
02 static A inst;
03 public static A getInst() {
04 if (inst == null) {
05 synchronized (A.class) {
06 if (inst == null) {
07 inst = new A();
08 }
09 }
10 }
11 return inst;
12 }
13 }

První myšlený průběh: první vlákno na řádku 4 zjistí, že instance neexistuje, vstoupí tedy do kritické sekce (druhé vlákno čeká) a na řádku 6 znovu zkontroluje, zda mezitím někdo instanci nevyrobil, to se nestalo, vytvoří tedy instanci a opustí kritickou sekci. Druhé vlákno vstoupí do kritické sekce, zjistí že instance už existuje, takže vystoupí ven a vrátí instanci. To se zdá býti v pořádku.

Druhý myšlený průběh: první vlákno se dostane až na řádek 7 a začne provádět výraz „new A()“. Kompilátor ale provede optimalizaci, zinlinuje konstruktor, přerovná fieldy, pak nejprve alokuje paměť pro objekt a referenci na něj propíše do paměti (to může udělat, určitě to musí udělat při opuštění kritické sekce, ale nikdo mu nezakazuje udělat to dříve) a teprve pak začne provádět konstruktor, který inicializuje další fieldy v singletonu. V tomto okamžiku se přepne kontext na druhé vlákno a to na řádku 4 zjistí, že instance již existuje a vrátí ji, ta ale nebude zcela zkonstruovaná!!!

V Javě 5 je možno kód snadno opravit. Stačí field inst udělat volatile. Tím je zakázáno přerovnávání a bude to fungovat. Lepším způsobem je ale použít statický inicializátor. U něj je zaručeno že se volá právě jednou a většinou až když o třídu někdo „zavadí“, JVM inteligentně odkládá inicializaci.

01 Class A {
02 static A inst;
03 static {
04 inst = new A();
05 }
06 public static getA() {
07 return inst;
08 }
09 }

3.Java.util.concurrent

i)Atomic

Balík obsahuje 9 tříd {int, long, ref} x {value, array, field}, tj. AtomicInteger, AtomicIntegerArray, AtomicIntegerFieldUpdater a to samé pro typ long a referenci. Dále 3 třídy AtomicBoolean, AtomicMarkableReference a AtomicStampedReference.

  1. AtomicXXX třídy podporují atomické operací jako např. CompareAndSet. Příkladem využití může být kód generující po sobě jdoucích id. Naivní implementace pomocí obalení Longu a generování id v synchronized bloku je správná, ale není škálovatelná, neboť kritická sekce se v případě mnoha vláken žádajících id stane úzkým hrdlem celého programu. Implementace pomocí AtomicLong:

    01 AtomicLong v;
    02 long nextValue() {
    03         for (;;) {
    04                 long x = v.get();
    05                 long n = x + 1;
    06                 if (v.compareAndSet(x,n)) returnn;
    07         }
    08 }

    Na řádku 4 získám poslední id, na řádku 5 si spočítám, jak by mělo vypadat následující id. Volání na řádku 6 může dopadnout úspěšně (compareAndSet vrací true), což znamená že v okamžiku volání atomické instrukce mi žádný jiný thread id nezvýšil, hodnota v je zvýšena a vracím ji jako nové id. Pokud ale volání dopadne neúspěšně, znamená to, že mezi řádkem 4 a 6 jiný thread již id zvýšil, instrukce compareAndSet hodnotu v nezměnila a opakuji pokus o získání id.
    Tato implementace je mnohem efektivnější. Proto existuje přímo metoda getAndIncrement(), která dělá něco velmi podobného.

  2. AtomicXXXArray zajišťuje pole, jehož položky (nikoliv celé pole) lze atomicky modifikovat. Plní stejnou funkci, jakou by plnilo pole AtomicXXX objektů, za nižší spotřeby paměti (AtomicXXXArray jsou dva objekty – pole a obal, zatímco pole AtomicXXX objektů by bylo mnoho, dle počtu položek pole).

  3. AtomicXXXFieldUpdater umožňuje atomicky modifikovat member fieldy třídy (musí být volatile a musí se modifikovat vždy přes tento updater, nikdy přímo) bez overheadu, který by vznikl, kdybych všechny tyto fieldy deklaroval jako AtomicXXX. Je to tedy podobné jako v případě AtomicXXXArray, ale jde o fieldy.

  4. Protože nejsem schopen atomicky modifikovat dva fieldy najednou (v jedné atomické operaci), což je pro některé algoritmy důležité, existují AtomicMarkableReference (pár reference a boolean) a AtomicStampedReference (pár reference a integer), které toto umožňují. Zatím to v JVM není příliš efektivně implementováno (jde o obalení vnitřní immutable třídy, která se nahrazuje jako celek), v budoucnu se to ale může změnit.

ii)Locks

Interface Lock nabízí mocnější sémantiku pro zámky než klasické synchronized bloky. Rozdíly:

  1. Nový zámek je objekt s metodami acquire() a release(), takže ho nejsme nuceni získat a uvolnit v rámci jedné metody. Je samozřejmě třeba dát pozor na korektní uvolnění při výjimkách apod.

  2. Podpora tzv. hand over hand algoritmů. Například při práci s double linked list mohu zamknout první a druhou položku, pak druhý odemknout a třetí zamknout atd. Toto opět není s klasickým synchronized blokem možné.

  3. Je možno čekat na více různých podmínek vytvořením objektů Condition (klasický synchronized blok má jedinou podmínku, na kterou čekají všechny thready).

  4. Thread čekající na zámek při použití klasického synchronized bloku nelze nijak probudit, ani mu říct aby to vzdal. Nový zámek má metody tryAcquire(), kterou lze specifikovat timeout, a také přerušitelnost pomocí Thread.interrupt(), kdy toto volání čekající thread probudí a vyhodí InterruptedException.

  5. Možnost udělat „spravedlivé“ zámky. Čekajícím threadům může být zámek přidělován v pořadí, v němž o něj požádali.

  6. Klasické synchronized bloky jsou reentrantní zámky. To znamená, že thread, který vlastní zámek může do bloku vstupovat znovu. Nové zámky mohou být reentrantní nebo nereentrantní. V případě nereentrantního zámku thread deadlockne sám sebe, pokud se pokusí opakovaně vstoupit do chráněného bloku.

iii)Synchronizers

Implementace známých synchronizačních primitiv:

  1. Třída Semaphore reprezentuje klasický Dijkstrův semafor s parametrem (počtem slotů), kolik threadů může být v jednu chvíli v kritické sekci. Jakmile thread uvolní slot, může ho získat někdo jiný. Počtem slotů jedna degeneruje semafor na nereentrantní mutex.

  2. Třída CountDownLatch je podobná semaforu, ale uvolněný slot již nikdo nezíská, thready mohou pomocí await() čekat, než se vyčerpají všechny sloty (tj. než počítadlo klesne na nulu). Potom již zůstává zámek odemčen a další volání await() vždy projde bez čekání.

  3. Třída CyclicBarrier je jakési rendez vous n threadů. Každý thread může požádat o bariéru a tím je zablokován, než se všechny thready „sejdou“ na této bariéře. Jakmile se tak stane, máme volitelně k dispozici Runnable pro provedení nějaké akce a až doběhne, jsou všechny thready odblokovány a mohou pokračovat.

  4. Třída Exchanger umožňuje dvěma threadům si v určitém okamžiku vyměnit data (systém se postará o příslušná propsání do paměti atd.) a pokračovat.

iv)Concurrent collections

Kolekce ConcurrentHashMap – od synchronizované se liší:

  1. Nejen že je thread-safe, ale je i zároveň neblokující. Umožňuje konkurenční čtení i zápisy a není tedy úzkým hrdlem v případě paralelního přístupu mnoha threadů.

  2. Lze specifikovat concurrency level, což je stupeň vnitřní segmentace. Vyšší stupeň znamená jemnější dělení, tím vyšší paralelismus operací (i zápisů), ale i vyšší overhead.

  3. Iterátory nejsou statické (nejsou snapshot) a nikdy nevyhazují výjimku ConcurrentModificationException. Vždy reprezentují konzistentní (nějakým způsobem paralelním výpočtem dosažitelný) pohled na položky v kolekci, takže nikdy neukazuje kolekci ve stavu, v kterém v žádném čase nebyla (nemohla být). Například provádí-li jeden thread postupně +A, +B, +C, -B, +D a druhý thread tuto kolekci iteruje, pak nikdy neuvidí A, B, C, D. Ale může např. vidět jen A. Nebo A, B. Nebo A, B, C. Nebo A, C, D.

Kolekce CopyOnWriteArrayList při modifikaci zkopíruje vše do nového pole a staré visí na heapu, než je odstraněno garbage collectorem. Vhodné pro pole, které se často čte a zřídka modifikuje. Iterátor je snapshot stavu kolekce v okamžiku jeho získání, takže také nikdy nevyhazuje výše popsanou výjimku.

Fronta ConcurrentLinkedQueue může mít více producentů i konzumentů. Je opět nejen thread-safe, ale i paralelní.

Fronty XXXBlockingQueue umožňují čekat na položku (i např. s timeoutem). Je několik implementací – LinkedBlockingQueue (neomezená délka), PriorityBlockingQueue (přerovnává položky dle priority), ArrayBlockingQueue (pevná délka, o to efektivnější), SynchronousQueue (fronta nulové délky, nikdy neobsahuje žádnou položku, k výměně jedné položky se musí sejít producent i konzument, přijde-li dříve producent, čeká na konzumenta a naopak), DelayedQueue (položky přerovnává podle jejich timeoutu, ven je pustí až timeout doběhne, metoda size() sice může vrátit nějaké číslo, jakožto počet položek, ale neznamená to že už vytimeoutovali a jsou tedy k dispozici).

v)Executors

Executor je služba, která vykoná předaný úkol (Runnable). Typicky v jiném threadu, např. v AWT threadu nebo v thread poolu dané velikosti (i s frontou, může být omezené délky a při naplnění volat definovaný callback, jenž může např. zahodit nejnovější/ nejstarší požadavek nebo vykonat požadavek v threadu, kterým byl zaslán a tím ho „zbrzdit“).

ExecutorService má ještě mocnější sémantiku než Executor. Umí např. přestat přijímat úkoly apod.

ScheduledExecuterService vykonává odložené a periodické úkoly.

Callable je jako Runnable, ale po předání do ExecutorService vrací jako výsledek Future, pomocí nějž mohu úkol zrušit nebo si počkat na výsledek (blokujíce i neblokujíce).

Do ExecutorCompletionService předáváme Callables a získáváme Futures, ale tak, že až mají výsledek, tak se tyto ukládají do fronty, odkud je můžeme získávat.

2 Responses to “Paralelní programování v Javě a java.util.concurrent”

  1. radek says:

    dobry den,
    mohl byste prosim zmenit odkaz z
    http://avc.sh.cvut.cz/archiv/index.php?id=1041&rid=436&offset=0&select=NetBeans

    na odkaz z novych stranek http://avc-cvut.cz ?

    bohuzel jsme kvuli politickemu natlaku museli klub sh opustit a zalozili jsme obc. sdruzeni AVC studentu CVUT a na tvorbe videi pokracujeme tam…

    S pozdravem

    Radek Novotny
    Audiovizualni centrum studentu CVUT
    radek.novotny@avc-cvut.cz

  2. ludek says:

    Link opraven