Vodič za Java SynchronousQueue

1. Pregled

U ovom ćemo članku pogledati SynchronousQueue od java.util.concurrent paket.

Jednostavno rečeno, ova nam implementacija omogućuje razmjenu podataka između niti na siguran način.

2. Pregled API-ja

The SynchronousQueue samo ima dvije podržane operacije: uzeti() i staviti(), i obojica blokiraju.

Na primjer, kada želimo dodati element u red, trebamo pozvati staviti() metoda. Ta će se metoda blokirati dok neka druga nit ne pozove uzeti() metoda, signalizirajući da je spreman uzeti element.

iako SynchronousQueue ima sučelje reda, trebali bismo o tome razmišljati kao o razmjeni za jedan element između dvije niti, u kojem jedna nit predaje element, a druga nit uzima taj element.

3. Implementacija primopredaje pomoću zajedničke varijable

Da vidim zašto SynchronousQueue može biti toliko korisno, implementirat ćemo logiku koristeći zajedničku varijablu između dvije niti, a zatim ćemo tu logiku prepisati pomoću SynchronousQueue čineći naš kod puno jednostavnijim i čitljivijim.

Recimo da imamo dvije niti - proizvođača i potrošača - i kada proizvođač postavlja vrijednost zajedničke varijable, želimo tu činjenicu signalizirati niti potrošača. Dalje, potrošačka nit će dohvatiti vrijednost iz dijeljene varijable.

Koristit ćemo CountDownLatch za koordinaciju te dvije niti, kako bi se spriječila situacija kada potrošač pristupa vrijednosti dijeljene varijable koja još nije postavljena.

Mi ćemo definirati a sharedState varijabla i a CountDownLatch koji će se koristiti za koordinaciju obrade:

ExecutorService izvršitelj = Executors.newFixedThreadPool (2); AtomicInteger sharedState = novi AtomicInteger (); CountDownLatch countDownLatch = novo CountDownLatch (1);

Proizvođač će spremiti slučajni cijeli broj na sharedState varijablu i izvršite countDown () metoda na countDownLatch, signalizirajući potrošaču da može dohvatiti vrijednost iz sharedState:

Izvođač koji se može pokrenuti = () -> {Cjeloviti proizvedeniElement = ThreadLocalRandom .current () .nextInt (); sharedState.set (produceElement); countDownLatch.countDown (); };

Potrošač će pričekati na countDownLatch koristiti čekati() metoda. Kad proizvođač signalizira da je varijabla postavljena, potrošač će je dohvatiti iz sharedState:

Potrošač koji se može pokrenuti = () -> {try {countDownLatch.await (); Cijeli broj ConsumedElement = sharedState.get (); } catch (InterruptedException ex) {ex.printStackTrace (); }};

Posljednje, ali ne najmanje važno, započnimo naš program:

izvršitelj.izvršiti (producent); izvršitelj.izvršiti (potrošač); izvršitelj.awaitTermination (500, TimeUnit.MILLISECONDS); egzekutor.šutdown (); assertEquals (countDownLatch.getCount (), 0);

Proizvest će sljedeći rezultat:

Spremanje elementa: -1507375353 na mjenjačnicu potrošilo je element: -1507375353 s mjenjačnice

Vidimo da je ovo puno koda za implementaciju tako jednostavne funkcionalnosti kao što je razmjena elementa između dvije niti. U sljedećem ćemo odjeljku pokušati to poboljšati.

4. Provedba primopredaje pomoću SynchronousQueue

Primijenimo sada istu funkcionalnost kao u prethodnom odjeljku, ali s a SynchronousQueue. Ima dvostruki učinak jer ga možemo koristiti za razmjenu stanja između niti i za koordinaciju te radnje tako da ne trebamo koristiti ništa osim SynchronousQueue.

Prvo ćemo definirati red:

ExecutorService izvršitelj = Executors.newFixedThreadPool (2); Red SynchronousQueue = novi SynchronousQueue ();

Producent će nazvati a staviti() metoda koja će blokirati sve dok neka druga nit ne preuzme element iz reda:

Izvođač koji se može pokrenuti = () -> {Cjeloviti proizvedeniElement = ThreadLocalRandom .current () .nextInt (); probajte {queue.put (produceElement); } catch (InterruptedException ex) {ex.printStackTrace (); }};

Potrošač će jednostavno dohvatiti taj element pomoću uzeti() metoda:

Izvodljivi potrošač = () -> {try {Integer consumedElement = queue.take (); } catch (InterruptedException ex) {ex.printStackTrace (); }};

Dalje ćemo započeti naš program:

izvršitelj.izvršiti (producent); izvršitelj.izvršiti (potrošač); izvršitelj.awaitTermination (500, TimeUnit.MILLISECONDS); egzekutor.šutdown (); assertEquals (queue.size (), 0);

Proizvest će sljedeći rezultat:

Spremanje elementa: 339626897 na mjenjačnicu potrošilo je element: 339626897 s mjenjačnice

Možemo vidjeti da a SynchronousQueue koristi se kao točka razmjene između niti, što je puno bolje i razumljivije od prethodnog primjera koji je koristio zajedničko stanje zajedno s CountDownLatch.

5. Zaključak

U ovom smo brzom vodiču pogledali SynchronousQueue konstruirati. Stvorili smo program koji razmjenjuje podatke između dviju niti pomoću zajedničkog stanja, a zatim smo taj program prepisali kako bismo iskoristili SynchronousQueue konstruirati. To služi kao točka razmjene koja koordinira nit proizvođača i potrošača.

Provedbu svih ovih primjera i isječaka koda možete pronaći u projektu GitHub - ovo je Maven projekt, pa bi ga trebalo lako uvesti i pokrenuti kakav jest.


$config[zx-auto] not found$config[zx-overlay] not found