Uvod u Thread Pools u Javi

1. Uvod

Ovaj je članak pogled na spremišta niti u Javi - počevši od različitih implementacija u standardnoj knjižnici Java, a zatim na Googleovu knjižnicu Guava.

2. Bazen niti

U Javi se niti mapiraju u niti na razini sustava koje su resursi operativnog sustava. Ako nekontrolirano stvarate niti, možda ćete brzo ostati bez tih resursa.

Prebacivanje konteksta između niti vrši i operativni sustav - kako bi oponašao paralelizam. Pojednostavljeni pogled je da - što više niti mrijestite, to manje niti provodi obavljajući stvarni posao.

Obrazac Thread Pool pomaže uštedjeti resurse u višenitnoj aplikaciji, a također i sadržavati paralelizam u određenim unaprijed definiranim ograničenjima.

Kada upotrebljavate spremište niti, vi napišite svoj istodobni kôd u obliku paralelnih zadataka i pošaljite ih na izvršenje instanci spremišta niti. Ova instanca kontrolira nekoliko ponovno korištenih niti za izvršavanje ovih zadataka.

Uzorak vam omogućuje kontrolirati broj niti koje aplikacija stvara, njihov životni ciklus, kao i za planiranje izvršenja zadataka i zadržavanje dolaznih zadataka u redu čekanja.

3. Povoji niti u Javi

3.1. Izvršitelji, Izvršitelj i ExecutorService

The Izvršitelji klasa pomoćnika sadrži nekoliko metoda za stvaranje unaprijed konfiguriranih instanci spremišta niti za vas. Te su klase dobro mjesto za početak - upotrijebite ih ako ne trebate primijeniti prilagođeno fino podešavanje.

The Izvršitelj i ExecutorService sučelja se koriste za rad s različitim implementacijama spremišta niti u Javi. Obično biste trebali neka vaš kod bude odvojen od stvarne implementacije spremišta niti i koristite ova sučelja u cijeloj aplikaciji.

The Izvršitelj sučelje ima jedan izvršiti način podnošenja Izvodljivo instance za izvršenje.

Evo kratkog primjera kako možete koristiti Izvršitelji API za stjecanje Izvršitelj instanci potpomognutom spremištem od jedne niti i neograničenim redom za sekvencijalno izvršavanje zadataka. Ovdje izvršavamo jedan zadatak koji jednostavno ispisuje “Pozdrav svijete" na ekranu. Zadatak se predaje kao lambda (značajka Java 8) za koju se pretpostavlja da je Izvodljivo.

Izvršitelj izvršitelj = Izvršitelji.newSingleThreadExecutor (); executor.execute (() -> System.out.println ("Hello World"));

The ExecutorService sučelje sadrži velik broj metoda za kontrolu napretka zadataka i upravljanje prestankom usluge. Korištenjem ovog sučelja možete predati zadatke na izvršenje i također kontrolirati njihovo izvršavanje pomoću vraćenog Budućnost primjer.

U slijedećem primjeru, mi stvaramo ExecutorService, predajte zadatak, a zatim upotrijebite vraćeno Budućnost‘S dobiti metoda da pričekate dok se predani zadatak ne završi i ne vrati vrijednost:

ExecutorService executorService = Izvršitelji.newFixedThreadPool (10); Buduća budućnost = executorService.submit (() -> "Hello World"); // neke operacije Rezultat niza = future.get ();

Naravno, u scenariju iz stvarnog života obično ne želite nazvati future.get () odmah, ali odgodite poziv dok vam zapravo ne zatreba vrijednost izračuna.

The podnijeti metoda je preopterećena da bi uzela bilo Izvodljivo ili Pozivno oba su funkcionalna sučelja i mogu se prenositi kao lambda (počevši od Jave 8).

IzvodljivoJedna metoda ne donosi iznimku i ne vraća vrijednost. The Pozivno sučelje može biti prikladnije, jer nam omogućuje izbacivanje iznimke i vraćanje vrijednosti.

Napokon - dopustiti kompajleru da zaključi Pozivno tipa, jednostavno vratite vrijednost iz lambda.

Za više primjera korištenja ExecutorService sučelje i futures, pogledajte „Vodič za Java ExecutorService“.

3.2. ThreadPoolExeecuter

The ThreadPoolExeecuter je proširiva implementacija spremišta niti s puno parametara i kuka za fino podešavanje.

Glavni konfiguracijski parametri o kojima ćemo ovdje razgovarati su: corePoolSize, maximumPoolSize, i keepAliveTime.

Skup se sastoji od fiksnog broja jezgrenih niti koje se stalno drže unutra i nekih prekomjernih niti koje se mogu stvoriti i prekinuti kad više nisu potrebne. The corePoolSize parametar je broj niti jezgre koje će biti instancirane i zadržane u spremištu. Kada uđe novi zadatak, ako su sve niti jezgre zauzete i unutarnji je red pun, tada se spremište može rasti do maximumPoolSize.

The keepAliveTime parametar je vremenski interval za koji prekomjerni niti (instancirani više od corePoolSize) smiju postojati u stanju mirovanja. Prema zadanim postavkama ThreadPoolExeecuter samo uzima u obzir ne-jezgrene niti za uklanjanje. Da bismo primijenili istu politiku uklanjanja na jezgrene niti, možemo koristiti allowCoreThreadTimeOut (true) metoda.

Ovi parametri pokrivaju širok spektar slučajeva upotrebe, ali najtipičnije konfiguracije su unaprijed definirane u Izvršitelji statičke metode.

Na primjer, newFixedThreadPool metoda stvara a ThreadPoolExeecuter s jednakim corePoolSize i maximumPoolSize vrijednosti parametara i nula keepAliveTime. To znači da je broj niti u ovom spremištu niti uvijek isti:

Izvršitelj ThreadPoolExecutor = (ThreadPoolExecutor) Izvršitelji.newFixedThreadPool (2); executor.submit (() -> {Thread.sleep (1000); return null;}); executor.submit (() -> {Thread.sleep (1000); return null;}); executor.submit (() -> {Thread.sleep (1000); return null;}); assertEquals (2, izvršitelj.getPoolSize ()); assertEquals (1, izvršitelj.getQueue (). size ());

U gornjem primjeru instanciramo a ThreadPoolExeecuter s fiksnim brojem niti 2. To znači da ako je broj istodobno izvršenih zadataka u svakom trenutku manji ili jednak dvama, tada se izvršavaju odmah. Inače, neki od ovih zadataka mogu se staviti u red čekanja da dođu na red.

Stvorili smo tri Pozivno zadatke koji oponašaju težak posao spavanjem 1000 milisekundi. Prva dva zadatka izvršit će se odjednom, a treći će morati pričekati u redu. To možemo provjeriti pozivom na getPoolSize () i getQueue (). size () metode neposredno nakon predaje zadataka.

Još jedan unaprijed konfiguriran ThreadPoolExeecuter može se stvoriti pomoću Izvršitelji.newCachedThreadPool () metoda. Ova metoda uopće ne prima određeni broj niti. The corePoolSize je zapravo postavljeno na 0, a maximumPoolSize postavljeno je na Cijeli broj.MAX_VALUE za ovaj slučaj. The keepAliveTime je 60 sekundi za ovaj.

Te vrijednosti parametara to znače spremište spremišta niti može rasti bez ograničenja kako bi se prilagodilo bilo kojem broju poslanih zadataka. Ali kada niti više ne budu potrebne, uklonit će se nakon 60 sekundi neaktivnosti. Tipičan slučaj upotrebe je kada u svojoj aplikaciji imate puno kratkotrajnih zadataka.

Izvršitelj ThreadPoolExecutor = (ThreadPoolExecutor) Izvršitelji.newCachedThreadPool (); executor.submit (() -> {Thread.sleep (1000); return null;}); executor.submit (() -> {Thread.sleep (1000); return null;}); executor.submit (() -> {Thread.sleep (1000); return null;}); assertEquals (3, izvršitelj.getPoolSize ()); assertEquals (0, executor.getQueue (). size ());

Veličina reda u gornjem primjeru uvijek će biti nula jer je interno a SynchronousQueue primjenjuje se instanca. U SynchronousQueue, parova umetnuti i ukloniti operacije se uvijek odvijaju istodobno, tako da red nikada zapravo ne sadrži ništa.

The Izvršitelji.newSingleThreadExecutor () API stvara još jedan tipičan oblik ThreadPoolExeecuter koji sadrži jednu nit. Izvršitelj s jednom niti idealan je za stvaranje petlje događaja. The corePoolSize i maximumPoolSize parametri su jednaki 1, a keepAliveTime je nula.

Zadaci u gornjem primjeru izvršavat će se sekvencijalno, pa će vrijednost zastavice biti 2 nakon završetka zadatka:

AtomicInteger brojač = novi AtomicInteger (); ExecutorService izvršitelj = Executors.newSingleThreadExecutor (); izvršilac.submit (() -> {counter.set (1);}); izvršilac.submit (() -> {counter.compareAndSet (1, 2);});

Uz to, ovo ThreadPoolExeecuter je ukrašen nepromjenjivim omotom, pa se nakon stvaranja ne može ponovno konfigurirati. Imajte na umu da je to također razlog što ga ne možemo prebaciti na ThreadPoolExeecuter.

3.3. ScheduledThreadPoolExecutor

The ScheduledThreadPoolExecutor proširuje ThreadPoolExeecuter klase i također implementira ScheduledExecutorService sučelje s nekoliko dodatnih metoda:

  • raspored metoda omogućuje izvršavanje zadatka jednom nakon određenog kašnjenja;
  • scheduleAtFixedRate metoda omogućuje izvršavanje zadatka nakon određenog početnog kašnjenja, a zatim izvršavanje uzastopno s određenim vremenskim razdobljem; the razdoblje argument je vrijeme mjereno između vremena početka zadataka, tako da je stopa izvršenja fiksna;
  • rasporedWithFixedDelay metoda je slična scheduleAtFixedRate u tome što opetovano izvršava zadani zadatak, ali navedeno kašnjenje je mjereno između kraja prethodnog zadatka i početka sljedećeg; stopa izvršenja može varirati ovisno o vremenu potrebnom za izvršavanje bilo kojeg zadatka.

The Izvršitelji.newScheduledThreadPool () metoda se obično koristi za stvaranje a ScheduledThreadPoolExecutor s danim corePoolSize, bez ograničenja maximumPoolSize i nula keepAliveTime. Evo kako rasporediti zadatak za izvršenje u 500 milisekundi:

ScheduledExecutorService izvršitelj = Izvršitelji.newScheduledThreadPool (5); executor.schedule (() -> {System.out.println ("Hello World");}, 500, TimeUnit.MILLISECONDS);

Sljedeći kod pokazuje kako izvršiti zadatak nakon kašnjenja od 500 milisekundi, a zatim ga ponoviti svakih 100 milisekundi. Nakon raspoređivanja zadatka, čekamo dok se ne aktivira tri puta pomoću CountDownLatch zaključati, zatim ga otkažite pomoću Future.cancel () metoda.

Zaključavanje CountDownLatch = novo CountDownLatch (3); ScheduledExecutorService izvršitelj = Izvršitelji.newScheduledThreadPool (5); ScheduledFuture future = executor.scheduleAtFixedRate (() -> {System.out.println ("Hello World"); lock.countDown ();}, 500, 100, TimeUnit.MILLISECONDS); lock.await (1000, TimeUnit.MILLISECONDS); future.cancel (true);

3.4. ForkJoinPool

ForkJoinPool je središnji dio račvati / spojiti framework uveden u Javi 7. Riješava uobičajeni problem mrijest više zadataka u rekurzivnim algoritmima. Koristeći jednostavan ThreadPoolExeecuter, brzo ćete ostati bez niti, jer svaki zadatak ili podzadatak zahtijeva vlastitu nit za pokretanje.

U račvati / spojiti bilo koji zadatak može se pojaviti (vilica) nekoliko podzadataka i pričekajte njihovo dovršavanje pomoću pridružiti metoda. Prednost račvati / spojiti okvir je taj ne stvara novu nit za svaki zadatak ili podzadatak, umjesto toga implementirajući algoritam Work Stealing. Ovaj je okvir detaljno opisan u članku "Vodič za Fork / Join Framework u Javi"

Pogledajmo jednostavan primjer korištenja ForkJoinPool za prelazak stabla čvorova i izračunavanje zbroja svih vrijednosti listova. Evo jednostavne implementacije stabla koje se sastoji od čvora, int vrijednost i skup podređenih čvorova:

statička klasa TreeNode {int vrijednost; Postavite djecu; TreeNode (int vrijednost, TreeNode ... djeca) {this.value = value; this.children = Sets.newHashSet (djeca); }}

Sada ako želimo paralelno zbrojiti sve vrijednosti u stablu, moramo implementirati a Rekurzivna zadaća sučelje. Svaki zadatak prima svoj vlastiti čvor i dodaje svoju vrijednost zbroju vrijednosti svojih djeco. Za izračun zbroja djeco vrijednosti, implementacija zadatka čini sljedeće:

  • struje djeco postaviti,
  • mape preko ovog potoka, stvarajući novi Zadatak brojanja za svaki element,
  • izvršava svaki podzadatak račvanjem,
  • prikuplja rezultate pozivom na pridružiti metoda na svakom račvastom zadatku,
  • zbraja rezultate koristeći Kolekcionari.summingInt kolektor.
javna statička klasa CountingTask proširuje RecursiveTask {privatni konačni čvor TreeNode; javni CountingTask (čvor TreeNode) {this.node = node; } @Override protected Integer compute () {return node.value + node.children.stream () .map (childNode -> new CountingTask (childNode) .fork ()) .collect (Collectors.summingInt (ForkJoinTask :: join)) ; }}

Kôd za pokretanje izračuna na stvarnom stablu vrlo je jednostavan:

TreeNode tree = new TreeNode (5, new TreeNode (3), new TreeNode (2, new TreeNode (2), new TreeNode (8))); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool (); int sum = forkJoinPool.invoke (novi CountingTask (stablo));

4. Implementacija Thread Pool-a u Guavi

Guava je popularna Googleova biblioteka komunalnih usluga. Ima mnogo korisnih klasa istodobnosti, uključujući nekoliko praktičnih implementacija ExecutorService. Provedbene klase nisu dostupne za izravnu instanciju ili potklasiranje, pa je jedina ulazna točka za stvaranje njihovih primjera VišeIzvršitelji klasa pomoćnika.

4.1. Dodavanje Guave kao ovisnosti Mavena

Dodajte sljedeću ovisnost u svoju Maven pom datoteku kako biste u svoj projekt uključili biblioteku Guava. Najnoviju verziju biblioteke Guava možete pronaći u spremištu Maven Central:

 com.google.guava guava 19.0 

4.2. Izravni izvršitelj i služba izravnog izvršitelja

Ponekad želite izvršiti zadatak ili u trenutnoj niti ili u spremištu niti, ovisno o nekim uvjetima. Radije biste koristili jedan Izvršitelj sučelje i samo prebacite implementaciju. Iako nije tako teško smisliti implementaciju Izvršitelj ili ExecutorService koji izvršava zadatke u trenutnoj niti, i dalje zahtijeva pisanje nekog uzorka koda.

Drago nam je što Guava nudi unaprijed definirane primjerke.

Evo primjera koji pokazuje izvršavanje zadatka u istoj niti. Iako pruženi zadatak spava 500 milisekundi, to je blokira trenutnu nit, a rezultat je dostupan odmah nakon izvršiti poziv je završen:

Izvršitelj izvršitelj = MoreExecutors.directExecutor (); Izvršeno AtomicBoolean = novo AtomicBoolean (); executor.execute (() -> {try {Thread.sleep (500);} catch (InterruptedException e) {e.printStackTrace ();} execute.set (true);}); assertTrue (izvršeno.get ());

Instance vratio directExecutor () metoda zapravo je statički singleton, pa upotreba ove metode uopće ne pruža režijske troškove za stvaranje objekta.

Trebali biste preferirati ovu metodu nego MoreExecutors.newDirectExecutorService () jer taj API stvara punopravnu implementaciju izvršne usluge na svakom pozivu.

4.3. Izlaz iz izvršnih službi

Još jedan uobičajeni problem je isključivanje virtualnog stroja dok spremište niti još uvijek izvodi svoje zadatke. Čak i s uspostavljenim mehanizmom otkazivanja, ne postoji jamstvo da će se zadaci ponašati lijepo i zaustaviti svoj rad kad se izvršna služba isključi. To može uzrokovati da JVM visi unedogled dok zadaci nastavljaju raditi svoj posao.

Da bi riješio taj problem, Guava uvodi obitelj izvršnih izvršnih službi. Oni se temelje na daemon niti koje se završavaju zajedno s JVM-om.

Te usluge također dodaju kuku za isključivanje pomoću Runtime.getRuntime (). AddShutdownHook () metodu i spriječite da se VM završi konfigurirano vrijeme prije odustajanja od obješenih zadataka.

U sljedećem primjeru predajemo zadatak koji sadrži beskonačnu petlju, ali koristimo izlaznu izvršnu uslugu s konfiguriranim vremenom od 100 milisekundi da bismo čekali zadatke po završetku VM-a. Bez toga exitingExecutorService na mjestu, ovaj bi zadatak uzrokovao da VM visi na neodređeno vrijeme:

Izvršitelj ThreadPoolExecutor = (ThreadPoolExecutor) Izvršitelji.newFixedThreadPool (5); ExecutorService executorService = MoreExecutors.getExitingExecutorService (izvršitelj, 100, TimeUnit.MILLISECONDS); executorService.submit (() -> {while (true) {}});

4.4. Dekorateri koji slušaju

Dekoratori koji slušaju omogućuju vam omotanje ExecutorService i primiti Slušljivo u budućnosti slučajevi nakon predaje zadatka umjesto jednostavnih Budućnost instance. The Slušljivo u budućnosti sučelje se proširuje Budućnost i ima jednu dodatnu metodu addListener. Ova metoda omogućuje dodavanje slušatelja koji se poziva nakon budućeg završetka.

Rijetko ćete poželjeti koristiti ListenableFuture.addListener () metoda izravno, ali je bitan za većinu pomoćnih metoda u Budućnosti klasa korisnosti. Na primjer, s Futures.allAsList () metodu možete kombinirati nekoliko Slušljivo u budućnosti instance u jednom Slušljivo u budućnosti koji završava nakon uspješnog završetka svih budućih kombinacija:

ExecutorService executorService = Izvršitelji.newCachedThreadPool (); ListeningExecutorService ListenExecutorService = MoreExecutors.listeningDecorator (executorService); ListenableFuture future1 = ListenExecutorService.submit (() -> "Zdravo"); Slušljiva buduća budućnost2 = ListenExecutorService.submit (() -> "Svijet"); Pozdrav niza = Futures.allAsList (future1, future2) .get () .stream () .collect (Collectors.joining ("")); assertEquals ("Pozdrav svijetu", pozdrav);

5. Zaključak

U ovom smo članku raspravljali o obrascu Thread Pool i njegovim implementacijama u standardnoj biblioteci Java i Googleovoj knjižnici Guava.

Izvorni kôd članka dostupan je na GitHubu.