Java 9 reaktivni tokovi

1. Pregled

U ovom ćemo članku razmotriti Java 9 reaktivni tok. Jednostavno rečeno, moći ćemo koristiti Teći klasa koja obuhvaća primarne građevne blokove za izgradnju logike obrade reaktivnog toka.

Reaktivni tokovi je standard za asinkronu obradu struje s neblokirajućim protutlakom. Ova je specifikacija definirana u Reaktivni manifest, i postoje razne njegove implementacije, na primjer, RxJava ili Akka-potoci.

2. Pregled reaktivnog API-ja

Za izgradnju a Teći, možemo koristiti tri glavne apstrakcije i složiti ih u asinkronu logiku obrade.

Svaki Teći treba obraditi događaje koje mu objavi instanca Publisher; the Izdavač ima jednu metodu - pretplatite se ().

Ako netko od pretplatnika želi primati događaje koje je objavio, mora se pretplatiti na dane Izdavač.

Primatelj poruka mora implementirati Pretplatnik sučelje. To je obično kraj za svakoga Teći obrada jer instanca ne šalje poruke dalje.

Možemo razmišljati o tome Pretplatnik kao Umivaonik. Ovo ima četiri metode koje treba nadjačati - onSubscribe (), onNext (), onError (), i onComplete (). To ćemo pogledati u sljedećem odjeljku.

Ako želimo transformirati dolaznu poruku i proslijediti je dalje sljedećoj Pretplatnik, moramo provesti Procesor sučelje. Ovo djeluje i kao a Pretplatnik jer prima poruke i kao Izdavač jer te poruke obrađuje i šalje na daljnju obradu.

3. Objavljivanje i potrošnja poruka

Recimo da želimo stvoriti jednostavan Teći, u kojem imamo a Izdavač objavljivanje poruka i jednostavan Pretplatnik troše poruke čim stignu - jednu po jednu.

Stvorimo Kraj pretplatnika razred. Moramo provesti Pretplatnik sučelje. Zatim ćemo poništiti tražene metode.

The onSubscribe () metoda se poziva prije početka obrade. Primjer Pretplata se predaje kao argument. To je klasa koja se koristi za kontrolu protoka poruka između Pretplatnik i Izdavač:

javna klasa EndSubscriber implementira Subscriber {pretplata na privatnu pretplatu; javni popis consumedElements = novi LinkedList (); @Override public void onSubscribe (pretplata na pretplatu) {this.subscription = pretplata; pretplata.zahtjev (1); }}

Također smo inicijalizirali prazno Popis od konzumiraniElementi koji će se koristiti u testovima.

Sada moramo implementirati preostale metode iz Pretplatnik sučelje. Ovdje je glavna metoda onNext () - poziva se kad god se Izdavač objavljuje novu poruku:

@Preuzmi javnu prazninu naNext (T stavka) {System.out.println ("Dobio:" + stavka); pretplata.zahtjev (1); }

Imajte na umu da kada smo započeli pretplatu na onSubscribe () metodom i kada smo obradili poruku trebamo nazvati zahtjev() metoda na Pretplata da signalizira da struja Pretplatnik je spreman potrošiti više poruka.

Na kraju, moramo provesti onError () - koji se poziva kad god se u obradu uključi neka iznimka, kao i onComplete () - nazvan kad Izdavač je zatvoreno:

@Preuzmi javnu prazninu onError (Throwable t) {t.printStackTrace (); } @Override public void onComplete () {System.out.println ("Gotovo"); }

Napišimo test za Obradu Teći. Koristit ćemo SubmissionPublisher klasa - konstrukcija iz java.util.concurrent - koji provodi Izdavač sučelje.

Poslat ćemo N elementi na Izdavač - koja naša Kraj pretplatnika primat će:

@Test public void whenSubscribeToIt_thenShouldConsumeAll () baca InterruptedException {// zadati SubmissionPublisher publisher = new SubmissionPublisher (); EndSubscriber pretplatnik = novi EndSubscriber (); publisher.subscribe (pretplatnik); Stavke na popisu = List.of ("1", "x", "2", "x", "3", "x"); // kada assertThat (publisher.getNumberOfSubscribers ()). isEqualTo (1); items.forEach (izdavač :: submit); publisher.close (); // zatim await (). atMost (1000, TimeUnit.MILLISECONDS) .until (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (items)); }

Imajte na umu da zovemo Zatvoriti() metoda na primjeru Kraj pretplatnika. Zazivat će onComplete () povratni poziv ispod svakog Pretplatnik od datog Izdavač.

Pokretanje tog programa rezultirat će sljedećim rezultatima:

Dobio: 1 Dobio: x Dobio: 2 Dobio: x Dobio: 3 Dobio: x Gotovo

4. Transformacija poruka

Recimo da želimo graditi sličnu logiku između a Izdavač i a Pretplatnik, ali i primijeniti neku transformaciju.

Stvorit ćemo TransformProcessor razred koji provodi Procesor i proteže se SubmissionPublisher - jer će ovo biti oboje Strizdavač i Spretplatnik.

Proći ćemo u Funkcija koji će pretvoriti ulaze u izlaze:

javna klasa TransformProcessor proširuje SubmissionPublisher implementira Flow.Processor {funkcija privatne funkcije; privatna pretplata na Flow.Subscription; javni TransformProcessor (funkcija funkcije) {super (); this.function = funkcija; } @Override public void onSubscribe (pretplata na Flow.Subscription) {this.subscription = pretplata; pretplata.zahtjev (1); } @Override public void onNext (T stavka) {submit (function.apply (item)); pretplata.zahtjev (1); } @Override public void onError (Throwable t) {t.printStackTrace (); } @Override public void onComplete () {close (); }}

Idemo sada napiši brzi test s protokom obrade u kojem Izdavač objavljuje Niz elementi.

Naše TransformProcessor analizirat će Niz kao Cijeli broj - što znači da se ovdje mora dogoditi konverzija:

@Test public void whenSubscribeAndTransformElements_thenShouldConsumeAll () baca InterruptedException {// zadati SubmissionPublisher publisher = new SubmissionPublisher (); TransformProcessor transformProcessor = novi TransformProcessor (Integer :: parseInt); EndSubscriber pretplatnik = novi EndSubscriber (); Stavke na popisu = List.of ("1", "2", "3"); Popis očekujeResult = Popis.of (1, 2, 3); // kada publisher.subscribe (transformProcessor); transformProcessor.subscribe (pretplatnik); items.forEach (izdavač :: submit); publisher.close (); // zatim await (). atMost (1000, TimeUnit.MILLISECONDS) .until (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (očekujeResult)); }

Imajte na umu da je pozivanje Zatvoriti() metoda na bazi Izdavač izazvat će onComplete () metoda na TransformProcessor na koje se treba pozivati.

Imajte na umu da na ovaj način moraju biti zatvoreni svi izdavači u lancu obrade.

5. Kontrola potražnje za porukama pomoću Pretplata

Recimo da želimo potrošiti samo prvi element iz pretplate, primijeniti logiku i završiti obradu. Možemo koristiti zahtjev() metoda da se to postigne.

Izmijenimo svoj Kraj pretplatnika potrošiti samo N broja poruka. Taj ćemo broj proslijediti kao howMuchMessagesConsume argument konstruktora:

javna klasa EndSubscriber implementira Subscriber {private AtomicInteger howMuchMessagesConsume; pretplata na privatnu pretplatu; javni popis consumedElements = novi LinkedList (); javni EndSubscriber (Integer howMuchMessagesConsume) {this.howMuchMessagesConsume = novi AtomicInteger (howMuchMessagesConsume); } @Override public void onSubscribe (pretplata na pretplatu) {this.subscription = pretplata; pretplata.zahtjev (1); } @Override public void onNext (T stavka) {howMuchMessagesConsume.decrementAndGet (); System.out.println ("Dobio:" + stavka); consumedElements.add (stavka); if (howMuchMessagesConsume.get ()> 0) {subscription.request (1); }} // ...}

Elemente možemo tražiti sve dok to želimo.

Napišimo test u kojem želimo konzumirati samo jedan element iz zadanog Pretplata:

@Test public void whenRequestForOnlyOneElement_thenShouldConsumeOne () baca InterruptedException {// zadati SubmissionPublisher publisher = new SubmissionPublisher (); EndSubscriber pretplatnik = novi EndSubscriber (1); publisher.subscribe (pretplatnik); Stavke na popisu = List.of ("1", "x", "2", "x", "3", "x"); Očekivani popis = List.of ("1"); // kada assertThat (publisher.getNumberOfSubscribers ()). isEqualTo (1); items.forEach (izdavač :: submit); publisher.close (); // zatim await (). atMost (1000, TimeUnit.MILLISECONDS) .until (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (očekuje se)); }

iako izdavač objavljuje šest elemenata, naš Kraj pretplatnika trošit će samo jedan element jer signalizira potražnju za obradom samo tog pojedinog.

Korištenjem zahtjev() metoda na Pretplata, možemo implementirati sofisticiraniji mehanizam povratnog pritiska za kontrolu brzine potrošnje poruke.

6. Zaključak

U ovom smo članku pogledali reaktivni tok Java 9.

Vidjeli smo kako stvoriti obradu Teći koji se sastoji od Izdavač i a Pretplatnik. Stvorili smo složeniji tijek obrade transformacijom elemenata pomoću Procesori.

Konačno, koristili smo Pretplata za kontrolu potražnje za elementima pomoću Pretplatnik.

Provedbu svih ovih primjera i isječaka koda možete pronaći u projektu GitHub - ovo je Maven projekt, pa bi ga trebalo lako uvesti i pokrenuti kakav jest.


$config[zx-auto] not found$config[zx-overlay] not found