Vodič za biblioteku paralelnih kolektora Java

1. Uvod

Parallel-collectors je mala knjižnica koja pruža skup Java Stream API kolektora koji omogućuju paralelnu obradu - dok istodobno zaobilaze glavne nedostatke standardnih Parallel Streams.

2. Ovisnosti Mavena

Ako želimo početi koristiti knjižnicu, trebamo dodati jedan unos u Maven's pom.xml datoteka:

 com.pivovarit paralelni kolektori 1.1.0 

Ili jedan redak u graditeljskoj datoteci Gradle:

prevesti 'com.pivovarit: paralelni sakupljači: 1.1.0'

Najnoviju verziju možete pronaći na Maven Central.

3. Upozorenja paralelnih tokova

Parallel Streams bili su jedan od glavnih naglasaka Java 8, ali pokazalo se da su primjenjivi isključivo na tešku procesorsku obradu.

Razlog tome bila je činjenica da Paralelni tokovi interno su podržani zajedničkim dijelom JVM-a ForkJoinPool, koji je pružao ograničeni paralelizam i koristili su ga svi paralelni tokovi koji se izvode na jednoj JVM instanci.

Na primjer, zamislimo da imamo popis ID-ova i želimo ih koristiti za dohvat popisa korisnika i da je ova operacija skupa.

Za to bismo mogli upotrijebiti paralelne tokove:

Popis id = Arrays.asList (1, 2, 3); Navesti rezultate = ids.parallelStream () .map (i -> fetchById (i)) // svaka operacija traje jednu sekundu .collect (Collectors.toList ()); System.out.println (rezultati); // [korisnik-1, korisnik-2, korisnik-3]

I doista, možemo primijetiti da je primjetno ubrzanje. Ali postaje problematično ako započnemo izvoditi više paralelnih operacija blokiranja ... paralelno. To bi moglo brzo zasititi bazen i rezultiraju potencijalno velikim kašnjenjima. Zbog toga je važno izgraditi pregrade stvaranjem zasebnih spremišta niti - kako bi se spriječilo da nepovezani zadaci utječu na međusobno izvršavanje.

Kako bi se pružio običaj ForkJoinPool Na primjer, mogli bismo iskoristiti ovdje opisani trik, ali ovaj se pristup oslanjao na nedokumentirano hakiranje i bio je neispravan do JDK10. Više možemo pročitati u samom izdanju - [JDK8190974].

4. Paralelni sakupljači u akciji

Paralelni kolektori, kao što i samo ime govori, samo su standardni Stream API kolektori koji omogućuju paralelno izvođenje dodatnih operacija na prikupiti() faza.

ParallelCollectors (koje se zrcali Kolekcionari class) klasa je fasada koja pruža pristup čitavoj funkcionalnosti knjižnice.

Ako bismo željeli ponoviti gornji primjer, mogli bismo jednostavno napisati:

ExecutorService izvršitelj = Executors.newFixedThreadPool (10); Popis id = Arrays.asList (1, 2, 3); CompletableFuture rezultati = ids.stream () .collect (ParallelCollectors.parallelToList (i -> fetchById (i), izvršitelj, 4)); System.out.println (results.join ()); // [korisnik-1, korisnik-2, korisnik-3]

Rezultat je isti, međutim, uspjeli smo pružiti svoj prilagođeni bazen niti, odrediti našu prilagođenu razinu paralelizma i rezultat je stigao umotan u CompletableFuture instance bez blokiranja trenutne niti.

S druge strane, standardni paralelni tokovi nisu mogli postići ništa od toga.

4.1. ParallelCollectors.parallelToList / ToSet ()

Koliko god intuitivno postajalo, ako želimo obraditi a Stream paralelno i prikupljati rezultate u a Popis ili Postavi, možemo jednostavno koristiti ParallelCollectors.parallelToList ili ParalelnoToSet:

Popis id = Arrays.asList (1, 2, 3); Navesti rezultate = ids.stream () .collect (parallelToList (i -> fetchById (i), izvršitelj, 4)) .join ();

4.2. ParallelCollectors.parallelToMap ()

Ako želimo sakupljati Stream elementi u a Karta primjerice, baš kao i kod Stream API-ja, moramo pružiti dva mapera:

Popis id = Arrays.asList (1, 2, 3); Rezultati mape = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), izvršitelj, 4)) .join (); // {1 = korisnik-1, 2 = korisnik-2, 3 = korisnik-3}

Možemo pružiti i običaj Karta primjer Dobavljač:

Rezultati mape = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), TreeMap :: new, izvršitelj, 4)) .join (); 

I prilagođena strategija rješavanja sukoba:

Popis id = Arrays.asList (1, 2, 3); Rezultati mape = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), TreeMap :: new, (s1, s2) -> s1, izvršitelj, 4)) .join ();

4.3. ParallelCollectors.parallelToCollection ()

Slično gore navedenom, možemo prenijeti svoj običaj Dobavljač kolekcije ako želimo dobiti rezultate upakirane u naš prilagođeni spremnik:

Navedi rezultate = ids.stream () .collect (parallelToCollection (i -> fetchById (i), LinkedList :: new, izvršitelj, 4)) .join ();

4.4. ParallelCollectors.parallelToStream ()

Ako gore navedeno nije dovoljno, zapravo možemo dobiti a Stream instancu i tamo nastavite prilagođenu obradu:

Karta results = ids.stream () .collect (parallelToStream (i -> fetchById (i), izvršitelj, 4)). thenApply (stream -> stream.collect (Collectors.groupingBy (i -> i.length ()))) .pridružiti();

4.5. ParallelCollectors.parallel ()

Ovaj nam omogućuje strujanje rezultata po redoslijedu dovršetka:

ids.stream () .collect (paralelno (i -> fetchByIdWithRandomDelay (i), izvršitelj, 4)) .forEach (System.out :: println); // korisnik-1 // korisnik-3 // korisnik-2 

U ovom slučaju možemo očekivati ​​da će sakupljač svaki put vraćati različite rezultate otkako smo uveli slučajno kašnjenje obrade.

4.6. ParallelCollectors.parallelOrdered ()

Ova mogućnost omogućuje strujanje rezultata baš kao gore, ali zadržava izvorni redoslijed:

ids.stream () .collect (parallelOrdered (i -> fetchByIdWithRandomDelay (i), izvršitelj, 4)) .forEach (System.out :: println); // korisnik-1 // korisnik-2 // korisnik-3 

U ovom slučaju, sakupljač će uvijek održavati redoslijed, ali može biti sporiji od gore navedenog.

5. Ograničenja

U trenutku pisanja, paralelni sakupljači ne rade s beskonačnim strujama čak i ako se koriste operacije kratkog spoja - to je ograničenje dizajna koje nameću unutarnji dijelovi Stream API-ja. Jednostavno rečeno, Streams kolektorima tretiraju kao operacije kratkog spoja, tako da tok mora obraditi sve uzvodne elemente prije nego što se zaustavi.

Drugo ograničenje je to operacije kratkog spoja ne prekidaju preostale zadatke nakon kratkog spoja.

6. Zaključak

Vidjeli smo kako nam biblioteka paralelnih kolektora omogućuje izvođenje paralelne obrade pomoću prilagođenog Java Stream API-ja Kolekcionari i CompletableFutures za korištenje prilagođenih spremišta niti, paralelizma i neblokirajućeg stila CompletableFutures.

Kao i uvijek, isječci koda dostupni su na GitHubu.

Za daljnje čitanje pogledajte biblioteku paralelnih kolektora na GitHubu, autorski blog i autorov Twitter račun.


$config[zx-auto] not found$config[zx-overlay] not found