Uvod u jezgru reaktora

1. Uvod

Reactor Core je knjižnica Java 8 koja provodi model reaktivnog programiranja. Izgrađen je na vrhu Specifikacije reaktivnih struja, standarda za izgradnju reaktivnih aplikacija.

Iz pozadine nereaktivnog razvoja Jave, prelazak na reaktivnost može biti prilično strma krivulja učenja. To postaje izazovnije kada ga uspoređujete s Javom 8 Stream API, jer bi se mogli zamijeniti da su iste apstrakcije na visokoj razini.

U ovom ćemo članku pokušati demistificirati ovu paradigmu. Krenut ćemo malim koracima kroz Reactor dok ne izgradimo sliku o tome kako sastaviti reaktivni kod, postavljajući temelje za naprednije članke koji će se pojaviti u kasnijim serijama.

2. Specifikacija reaktivnih struja

Prije nego što pogledamo reaktor, trebali bismo pogledati specifikaciju reaktivnih struja. To je ono što Reactor primjenjuje i postavlja temelje za knjižnicu.

U osnovi je reaktivni tok specifikacija za asinkronu obradu toka.

Drugim riječima, sustav u kojem se puno događaja proizvodi i konzumira asinkrono. Razmislite o toku tisuća ažuriranja dionica u sekundi koje dolaze u financijsku aplikaciju i da bi on trebao pravovremeno odgovoriti na ta ažuriranja.

Jedan od glavnih ciljeva ovoga je rješavanje problema povratnog tlaka. Ako imamo proizvođača koji emitira događaje potrošaču brže nego što ih može obraditi, tada će potrošač na kraju biti preplavljen događajima koji će ostati bez resursa sustava.

Protutlak znači da bi naš potrošač trebao moći proizvođaču reći koliko podataka treba poslati kako bi to spriječio, a to je ono što je navedeno u specifikaciji.

3. Ovisnosti Mavena

Prije nego što započnemo, dodajte naše ovisnosti o Mavenu:

 io.projectreactor reactor-core 3.3.9.Opusti ch.qos.logback logback-classic 1.1.3 

Također dodajemo Povrat kao ovisnost. To je zato što ćemo evidentirati izlaz Reaktora kako bismo bolje razumjeli protok podataka.

4. Stvaranje toka podataka

Da bi aplikacija bila reaktivna, prvo što mora biti u stanju je proizvesti tok podataka.

Ovo bi moglo biti nešto poput primjera ažuriranja dionica koji smo ranije dali. Bez ovih podataka ne bismo imali na što reagirati, zbog čega je ovo logičan prvi korak.

Reactive Core daje nam dvije vrste podataka koje nam to omogućuju.

4.1. Fluks

Prvi način da se to učini pomoću a Fluks. To je tok koji može emitirati 0..n elementi. Pokušajmo stvoriti jednostavan:

Samo protok = protok.pravedan (1, 2, 3, 4);

U ovom slučaju imamo statički tok od četiri elementa.

4.2. Mono

Drugi način za to je pomoću a Mono, koji je potok 0..1 elementi. Pokušajmo instancirati jedan:

Mono just = Mono.just (1);

Ovo izgleda i ponaša se gotovo potpuno isto kao i Fluks, samo što smo ovog puta ograničeni na više od jednog elementa.

4.3. Zašto ne samo protok?

Prije daljnjeg eksperimentiranja vrijedi istaknuti zašto imamo ove dvije vrste podataka.

Prvo, valja napomenuti da su oba a Fluks i Mono su implementacije reaktivnih struja Izdavač sučelje. Obje su klase u skladu sa specifikacijom, a umjesto njih mogli bismo koristiti ovo sučelje:

Izdavač just = Mono.just ("foo");

Ali stvarno, poznavanje ove kardinalnosti je korisno. To je zato što nekoliko operacija ima smisla samo za jednu od dvije vrste i zato što može biti izražajnije (zamislite findOne () u spremištu).

5. Pretplata na stream

Sada imamo pregled na visokoj razini kako stvoriti tok podataka, moramo se pretplatiti na njega kako bi emitirao elemente.

5.1. Prikupljanje elemenata

Iskoristimo pretplatite se () metoda za prikupljanje svih elemenata u toku:

Elementi popisa = novi ArrayList (); Flux.just (1, 2, 3, 4) .log () .subscribe (elements :: add); assertThat (elements) .containsExactly (1, 2, 3, 4);

Podaci neće početi teći dok se ne pretplatimo. Primijetite da smo dodali i neke zapisnike, ovo će biti korisno kada pogledamo što se događa iza kulisa.

5.2. Protok elemenata

S prijavljivanjem na mjestu, možemo ga koristiti za vizualizaciju kako podaci prolaze kroz naš tok:

20: 25: 19.550 [glavna] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 25: 19.553 [glavna] INFO reaktor.Flux.Array.1 - | zahtjev (neograničen) 20: 25: 19.553 [glavna] INFO reaktor.Flux.Array.1 - | onNext (1) 20: 25: 19.553 [glavni] INFO reaktor.Flux.Array.1 - | onNext (2) 20: 25: 19.553 [glavna] INFO reaktor.Flux.Array.1 - | onNext (3) 20: 25: 19.553 [glavni] INFO reaktor.Flux.Array.1 - | onNext (4) 20: 25: 19.553 [glavna] INFO reaktor.Flux.Array.1 - | onComplete ()

Prije svega, sve radi na glavnoj niti. Nemojmo ulaziti u detalje o ovome, jer ćemo podudarnost dalje proučavati kasnije u ovom članku. To ipak čini stvari jednostavnima jer se možemo nositi sa svime po redu.

Ajmo sada proći kroz slijed koji smo bilježili jedan po jedan:

  1. onSubscribe () - To se zove kada se pretplatimo na naš stream
  2. zahtjev (neograničen) - Kad nazovemo pretplatite se, iza kulisa stvaramo Pretplata. Ova pretplata zahtijeva elemente iz streama. U ovom slučaju to je zadano neograničeno, što znači da zahtijeva svaki pojedini element dostupan
  3. onNext () - To se naziva za svaki pojedini element
  4. onComplete () - To se naziva posljednjim, nakon primanja posljednjeg elementa. Zapravo postoji onError () također, što bi se nazvalo ako postoji iznimka, ali u ovom slučaju nema

To je tok izložen u Pretplatnik sučelje kao dio Specifikacije reaktivnih tokova, a u stvarnosti, to je ono što je postavljeno iza kulisa u našem pozivu na onSubscribe (). To je korisna metoda, ali da bismo bolje razumjeli što se događa, navedimo a Pretplatnik sučelje izravno:

Flux.just (1, 2, 3, 4) .log () .subscribe (novi pretplatnik () {@Preuzmi javnu prazninu onSubscribe (Pretplata) {s.request (Long.MAX_VALUE);} @Override public void onNext ( Integer integer) {elements.add (integer);} @Preuzmi javnu prazninu onError (Throwable t) {} @Override public void onComplete () {}});

Možemo vidjeti da se svaka moguća faza u gore navedenom protoku preslikava na metodu u Pretplatnik provedba. Jednostavno se dogodi da Fluks nam je pružio pomoćnu metodu za smanjenje ove opširnosti.

5.3. Usporedba s Javom 8 Potoci

Još uvijek se može činiti da imamo nešto sinonim za Javu 8 Stream radi prikupljanje:

Popis prikupljen = Stream.of (1, 2, 3, 4) .collect (toList ());

Samo mi nemamo.

Suštinska razlika je u tome što je Reactive push model, dok je Java 8 Potoci su pull model. U reaktivnom pristupu događaji su gurnuo pretplatnicima kad uđu.

Sljedeće što treba primijetiti je Potoci terminal operater je upravo to, terminal, povlači sve podatke i vraća rezultat. S Reactiveom bismo mogli imati beskonačni tok koji dolazi iz vanjskog resursa, s više pretplatnika koji su priloženi i uklonjeni ad hoc. Možemo raditi i stvari kao što su kombinirani tokovi, struje leptira za gas i primjenjivati ​​protutlak, koji ćemo sljedeće pokriti.

6. Protutlak

Sljedeće što bismo trebali uzeti u obzir je povratni tlak. U našem primjeru pretplatnik govori proizvođaču da odjednom pritisne svaki pojedini element. To bi na kraju moglo postati pretjerano za pretplatnika, trošeći sve njegove resurse.

Protutlak je kada nizvodni može uzlaznom toku reći da mu pošalje manje podataka kako bi se spriječilo da bude preopterećen.

Možemo izmijeniti svoj Pretplatnik primjena za primjenu povratnog tlaka. Recimo uzvodnom da istovremeno šalje samo dva elementa zahtjev():

Flux.just (1, 2, 3, 4) .log () .subscribe (novi pretplatnik () {privatna pretplata s; int onNextAmount; @Preuzmi javnu prazninu onSubscribe (pretplate) {this.s = s; s.request (2);} @Override public void onNext (Integer integer) {elements.add (integer); onNextAmount ++; if (onNextAmount% 2 == 0) {s.request (2);}} @Override public void onError (Throwable) t) {} @Preuzmi javnu prazninu onComplete () {}});

Sad ako ponovo pokrenemo naš kod, vidjet ćemo zahtjev (2) , a zatim dva onNext () onda poziva zahtjev (2) opet.

23: 31: 15.395 [glavna] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 23: 31: 15.397 [glavna] INFO reaktor.Flux.Array.1 - | zahtjev (2) 23: 31: 15.397 [glavni] INFO reaktor.Flux.Array.1 - | onNext (1) 23: 31: 15.398 [glavna] INFO reaktor.Flux.Array.1 - | onNext (2) 23: 31: 15.398 [glavna] INFO reaktor.Flux.Array.1 - | zahtjev (2) 23: 31: 15.398 [glavna] INFO reaktor.Flux.Array.1 - | onNext (3) 23: 31: 15.398 [glavna] INFO reaktor.Flux.Array.1 - | onNext (4) 23: 31: 15.398 [glavna] INFO reaktor.Flux.Array.1 - | zahtjev (2) 23: 31: 15.398 [glavna] INFO reaktor.Flux.Array.1 - | onComplete ()

U osnovi, ovo je reaktivni povratni pritisak. Zahtijevamo da uzvodno potisne samo određenu količinu elemenata i to samo kad smo spremni.

Ako zamislimo da su nam s Twittera premještani tweetovi, tada bi gornja strana trebala odlučiti što učiniti. Ako su tweetovi dolazili, ali nema zahtjeva od nizvodno, tada bi uzvodno mogli ispustiti predmete, spremiti ih u međuspremnik ili neku drugu strategiju.

7. Rad na streamu

Također možemo izvoditi operacije nad podacima u našem toku, reagirajući na događaje kako smatramo potrebnim.

7.1. Mapiranje podataka u tok

Jednostavna operacija koju možemo izvršiti je primjena transformacije. U ovom slučaju, samo udvostručimo sve brojeve u našem streamu:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .subscribe (elements :: add);

karta() primjenjivat će se kada onNext () Zove se.

7.2. Kombinirajući dva toka

Tada možemo stvari učiniti zanimljivijima kombinirajući drugi tok s ovim. Pokušajmo ovo pomoću ZIP () funkcija:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .zipWith (Flux.range (0, Integer.MAX_VALUE), (one, two) -> String.format ("Prvi tok:% d, Drugi tok:% d", jedan, dva)) .subscribe (elements :: add); assertThat (elements) .containEhactly ("Prvi tok: 2, Drugi tok: 0", "Prvi tok: 4, Drugi tok: 1", "Prvi tok: 6, Drugi tok: 2", "Prvi tok: 8, Drugi" Tok: 3 ");

Evo, stvaramo još jedan Fluks koji se neprestano uvećava za jedan i struji zajedno s našim izvornim. Možemo vidjeti kako ovo zajedno funkcionira pregledavanjem dnevnika:

20: 04: 38.064 [glavna] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 04: 38,065 [glavna] INFO reaktor.Flux.Array.1 - | onNext (1) 20: 04: 38.066 [glavni] INFO reaktor.Flux.Range.2 - | onSubscribe ([Synchronous Fuseable] FluxRange.RangeSubscription) 20: 04: 38,066 [glavna] INFO reaktor.Flux.Range.2 - | onNext (0) 20: 04: 38.067 [glavna] INFO reaktor.Flux.Array.1 - | onNext (2) 20: 04: 38.067 [glavni] INFO reaktor.Flux.Range.2 - | onNext (1) 20: 04: 38.067 [glavna] INFO reaktor.Flux.Array.1 - | onNext (3) 20: 04: 38.067 [glavni] INFO reaktor.Flux.Range.2 - | onNext (2) 20: 04: 38.067 [glavna] INFO reaktor.Flux.Array.1 - | onNext (4) 20: 04: 38.067 [glavni] INFO reaktor.Flux.Range.2 - | onNext (3) 20: 04: 38.067 [glavna] INFO reaktor.Flux.Array.1 - | onComplete () 20: 04: 38.067 [glavna] INFO reaktor.Flux.Array.1 - | cancel () 20: 04: 38.067 [glavna] INFO reaktor.Flux.Range.2 - | otkazati()

Imajte na umu kako sada imamo jednu pretplatu po Fluks. The onNext () pozivi se također izmjenjuju, tako da će se indeks svakog elementa u toku podudarati kada primijenimo ZIP () funkcija.

8. Vrući potoci

Trenutno smo se fokusirali prvenstveno na hladne potoke. To su statični tokovi fiksne duljine s kojima je lako riješiti se. Realističniji slučaj upotrebe reaktivne reakcije može biti nešto što se događa beskonačno.

Na primjer, mogli bismo imati tok pokreta miša na koji stalno treba reagirati ili twitter feed. Te se vrste tokova nazivaju vrućim, jer se uvijek izvode i na njih se može pretplatiti u bilo kojem trenutku, nedostajući početak podataka.

8.1. Stvaranje a Spojivi protok

Jedan od načina za stvaranje vrućeg mlaza je pretvaranjem hladnog mlaza u jedan. Stvorimo a Fluks koji traje vječno, izbacujući rezultate na konzolu, koja bi simulirala beskonačan tok podataka koji dolaze iz vanjskog resursa:

ConnectableFlux objavljivanje = Flux.create (fluxSink -> {while (true) {fluxSink.next (System.currentTimeMillis ());}}) .publish ();

Pozivanjem objaviti() dobivamo a Spojivi protok. To znači da zovete pretplatite se () neće uzrokovati da počne emitirati, što nam omogućuje dodavanje više pretplata:

objaviti. pretplata (System.out :: println); objaviti. pretplata (System.out :: println);

Ako pokušamo pokrenuti ovaj kod, ništa se neće dogoditi. Tek kad nazovemo Spojiti(), da je Fluks počet će emitirati:

objaviti.povezati ();

8.2. Gašenje

Ako pokrenemo naš kod, naša će konzola biti preplavljena bilježenjem. Ovo simulira situaciju kada se previše podataka prenosi našim potrošačima. Pokušajmo to zaobići s prigušivanjem:

ConnectableFlux objavljivanje = Flux.create (fluxSink -> {while (true) {fluxSink.next (System.currentTimeMillis ());}}) .sample (ofSeconds (2)) .publish ();

Evo, uveli smo a uzorak() metoda s razmakom od dvije sekunde. Sada će se vrijednosti pretplatniku slati samo svake dvije sekunde, što znači da će konzola biti puno manje naporna.

Naravno, postoji više strategija za smanjenje količine podataka poslanih nizvodno, poput prozora i međuspremnika, ali oni će ostati izvan opsega ovog članka.

9. Istodobnost

Svi gore navedeni primjeri trenutno rade na glavnoj niti. Međutim, možemo kontrolirati na kojoj se niti pokreće naš kod ako želimo. The Planer sučelje pruža apstrakciju oko asinkronog koda, za što su nam predviđene mnoge implementacije. Pokušajmo se pretplatiti na drugu nit na main:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .subscribeOn (Schedulers.parallel ()) .subscribe (elements :: add);

The Paralelno planer će uzrokovati da se naša pretplata izvodi na drugoj niti, što možemo dokazati gledajući zapisnike. Vidimo da prvi unos dolazi iz glavni nit i Flux radi u drugoj niti koja se zove paralelno-1.

20:03:27.505 [glavni] DEBUG reactor.util.Loggers $ LoggerFactory - Korištenje Slf4j logging framework 20: 03: 27.529 [paralelno-1] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 03: 27.531 [paralelno-1] INFO reaktor.Flux.Array.1 - | zahtjev (neograničen) 20: 03: 27.531 [paralelno-1] INFO reaktor.Flux.Array.1 - | naSljedeće (1) 20: 03: 27.531 [paralelno-1] INFO reaktor.Flux.Array.1 - | naSljedeće (2) 20: 03: 27.531 [paralelno-1] INFO reaktor.Flux.Array.1 - | onNext (3) 20: 03: 27.531 [paralelno-1] INFO reaktor.Flux.Array.1 - | naSljedeće (4) 20: 03: 27.531 [paralelno-1] INFO reaktor.Flux.Array.1 - | onComplete ()

Istodobnost dobivanja zanimljivija je od ove i vrijedi nas istražiti u drugom članku.

10. Zaključak

U ovom smo članku dali visoki, cjeloviti pregled reaktivne jezgre. Objasnili smo kako možemo objavljivati ​​i pretplaćivati ​​se na streamove, primjenjivati ​​povratni tlak, raditi na streamovima i također sinkrono obrađivati ​​podatke. To bi, nadamo se, trebalo postaviti temelje za pisanje reaktivnih aplikacija.

Kasniji članci u ovoj seriji pokrivat će napredniju paralelnost i druge reaktivne koncepte. Tu je i još jedan članak koji pokriva Reactor with Spring.

Izvorni kod za našu aplikaciju dostupan je na više od GitHub; ovo je Maven projekt koji bi trebao biti u stanju pokrenuti kakav jest.


$config[zx-auto] not found$config[zx-overlay] not found