Suočavanje s povratnim tlakom s RxJavom

1. Pregled

U ovom ćemo članku pogledati način na koji nam biblioteka RxJava pomaže u podnošenju povratnog tlaka.

Jednostavno rečeno - RxJava uvođenjem koristi koncept reaktivnih struja Uočljive, na koji jedan ili više Promatrači mogu se pretplatiti na. Suočavanje s moguće beskonačnim strujama vrlo je izazovno, jer se moramo suočiti s problemom povratnog tlaka.

Nije teško doći u situaciju u kojoj se Uočljiv emitira stavke brže nego što ih pretplatnik može potrošiti. Razmotrit ćemo različita rješenja problema rastućeg pufera nepotrošenih predmeta.

2. Vruće Vidljive Nasuprot hladnom Vidljive

Prvo, stvorimo jednostavnu potrošačku funkciju koja će se koristiti kao potrošač elemenata iz Vidljive koje ćemo definirati kasnije:

javna klasa ComputeFunction {javno statično void računanje (Integer v) {try {System.out.println ("izračunaj cijeli broj v:" + v); Navoj.spavanje (1000); } catch (InterruptedException e) {e.printStackTrace (); }}}

Naše izračunati () funkcija jednostavno ispisuje argument. Ovdje je važno primijetiti zazivanje a Navoj.spavanje (1000) metoda - radimo to kako bismo oponašali neki dugotrajni zadatak koji će izazvati Uočljiv kako bi se brže napunio stavkama Posmatrač mogu ih konzumirati.

Imamo dvije vrste Vidljive - vruće i Hladno - koji su potpuno različiti kada je u pitanju upravljanje povratnim tlakom.

2.1. Hladno Vidljive

Prehlada Uočljiv emitira određeni slijed predmeta, ali može započeti emitiranje tog niza kada je Posmatrač smatra da je to prikladno i bez obzira na brzinu Posmatrač želje, bez narušavanja integriteta niza. Hladno Uočljiv pruža predmete na lijen način.

The Posmatrač uzima elemente samo kad je spreman za obradu te stavke, a stavke ne trebaju biti međuspremnik u Uočljiv jer se traže na povlačenje.

Na primjer, ako stvorite Uočljiv na temelju statičkog raspona elemenata od jednog do jednog milijuna, to Uočljiv emitirao bi isti slijed predmeta bez obzira na to koliko se često te stavke promatraju:

Observable.range (1, 1_000_000) .observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute);

Kada započnemo naš program, stavke će se izračunati do Posmatrač lijeno i bit će zatraženo na povlačenje. The Planiranje.computacija () metoda znači da želimo pokrenuti svoj Posmatrač u okviru spremišta računalnih niti u sustavu Windows RxJava.

Rezultat programa sastojat će se od rezultata a izračunati () metoda koja se poziva za jednu po jednu stavku iz Uočljiv:

izračunati cijeli broj v: 1 izračunati cijeli broj v: 2 izračunati cijeli broj v: 3 izračunati cijeli broj v: 4 ...

Hladno Vidljive ne trebaju imati bilo kakav oblik povratnog tlaka jer djeluju povučeno. Primjeri predmeta koje emitira prehlada Uočljiv može uključivati ​​rezultate upita baze podataka, pronalaženja datoteka ili web zahtjeva.

2.2. Vruće Vidljive

Vruće Uočljiv počinje generirati predmete i emitira ih odmah kad su stvoreni. Suprotno je prehladi Vidljive povuci model obrade. Vruće Uočljiv emitira predmete vlastitim tempom, a na promatračima je da to prate.

Kada Posmatrač nije u stanju potrošiti predmete tako brzo kao što ih proizvodi Uočljiv treba ih usmjeriti u bafer ili riješiti na neki drugi način, jer će napuniti memoriju i konačno uzrokovati OutOfMemoryException.

Razmotrimo primjer vruće Promatrano, koja proizvodi milijun predmeta krajnjem potrošaču koji ih obrađuje. Kad izračunati () metoda u Posmatrač potrebno je neko vrijeme za obradu svake stavke, Uočljiv počinje puniti memoriju stavkama, što uzrokuje neuspjeh programa:

PublishSubject izvor = PublishSubject.create (); source.observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute, Throwable :: printStackTrace); IntStream.range (1, 1_000_000) .forEach (izvor :: onNext); 

Pokretanje tog programa neće uspjeti s a MissingBackpressureException jer nismo definirali način postupanja s prekomjernom proizvodnjom Uočljiv.

Primjeri predmeta koje emitira vruća Uočljiv mogu uključivati ​​događaje s mišem i tipkovnicom, sistemske događaje ili cijene dionica.

3. Prekomjerna proizvodnja međuspremnika Uočljiv

Prvi način za rješavanje prekomjerne proizvodnje Uočljiv je definirati neku vrstu međuspremnika za elemente koje ne može obraditi Posmatrač.

To možemo učiniti pozivom a pufer() metoda:

PublishSubject izvor = PublishSubject.create (); source.buffer (1024) .observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute, Throwable :: printStackTrace); 

Definiranjem međuspremnika veličine 1024 dat će se Posmatrač neko vrijeme kako bi sustigli izvor prekomjerne proizvodnje. U međuspremnik će se pohraniti stavke koje još nisu obrađene.

Možemo povećati veličinu međuspremnika kako bismo imali dovoljno prostora za proizvedene vrijednosti.

Međutim, imajte na umu da općenito, ovo je možda samo privremeni popravak jer se preljev i dalje može dogoditi ako izvor prekomjerno proizvodi predviđenu veličinu međuspremnika.

4. Doziranje emitiranih predmeta

Prekomjerno proizvedene stavke možemo grupirati u prozorima s N elemenata.

Kada Uočljiv proizvodi elemente brže od Posmatrač možemo ih obraditi, to možemo ublažiti grupiranjem proizvedenih elemenata i slanjem serije elemenata na Posmatrač koji je u stanju obraditi zbirku elemenata umjesto elemenata jedan po jedan:

PublishSubject izvor = PublishSubject.create (); source.window (500) .observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute, Throwable :: printStackTrace); 

Koristeći prozor() metoda s argumentom 500, reći ću Uočljiv za grupiranje elemenata u serije veličine 500. Ova tehnika može smanjiti problem prekomjerne proizvodnje Uočljiv kada Posmatrač sposoban je brže obraditi skup elemenata u usporedbi s obradom elemenata jedan po jedan.

5. Preskakanje elemenata

Ako neke od vrijednosti koje proizvodi Uočljiv mogu se sigurno zanemariti, možemo koristiti uzorkovanje unutar određenog vremena i rukovatelje.

Metode uzorak() i throttleFirst () uzimaju trajanje kao parametar:

  • The sdovoljno () metoda povremeno proučava slijed elemenata i emitira posljednju stavku koja je proizvedena u trajanju navedenom kao parametar
  • The throttleFirst () metoda emitira prvu stavku koja je proizvedena nakon trajanja navedenog kao parametar

Trajanje je vrijeme nakon kojeg se jedan određeni element bira iz niza proizvedenih elemenata. Možemo odrediti strategiju rukovanja povratnim pritiskom preskakanjem elemenata:

PublishSubject izvor = PublishSubject.create (); source.sample (100, TimeUnit.MILLISECONDS) .observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute, Throwable :: printStackTrace);

Precizirali smo da će strategija preskakanja elemenata biti uzorak() metoda. Želimo uzorak niza od 100 milisekundi. Taj će se element emitirati u Posmatrač.

Međutim, imajte na umu da ovi operatori samo smanjuju brzinu prijema vrijednosti od strane nizvodnog Posmatrač a time i dalje mogu dovesti do MissingBackpressureException.

6. Rukovanje punjenjem Uočljiv Pufer

U slučaju da naše strategije uzorkovanja ili doziranja elemenata ne pomažu u popunjavanju međuspremnika, moramo provesti strategiju rješavanja slučajeva kada se pufer puni.

Moramo koristiti onBackpressureBuffer () metoda za sprečavanje BufferOverflowException.

The onBackpressureBuffer () metoda uzima tri argumenta: sposobnost an Uočljiv međuspremnik, metoda koja se poziva kada se međuspremnik puni i strategija za rukovanje elementima koji trebaju biti odbačeni iz međuspremnika. Strategije za prelijevanje su u Povratni tlak Preljev razred.

Postoje 4 vrste radnji koje se mogu izvršiti kada se me uspremnik napuni:

  • ON_OVERFLOW_ERROR - ovo je zadana signalizacija ponašanja a BufferOverflowException kad se me uspremnik napuni
  • ON_OVERFLOW_DEFAULT - trenutno je isto kao ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST - ako se dogodi preljev, trenutna vrijednost jednostavno će se zanemariti i samo će se stare vrijednosti isporučiti jednom nizvodno Posmatrač zahtjevi
  • ON_OVERFLOW_DROP_OLDEST - ispušta najstariji element u međuspremnik i dodaje mu trenutnu vrijednost

Pogledajmo kako odrediti tu strategiju:

Observable.range (1, 1_000_000) .onBackpressureBuffer (16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) .observeOn (Schedulers.computation ()). Pretplata (e -> {}, Throwable :: printStackTrace; 

Ovdje je naša strategija za rukovanje prepunim međuspremnikom ispuštanje najstarijeg elementa u međuspremnik i dodavanje najnovije stavke koju je proizveo Uočljiv.

Imajte na umu da posljednje dvije strategije uzrokuju diskontinuitet u streamu kada ispadaju iz elemenata. Uz to, neće signalizirati BufferOverflowException.

7. Ispuštanje svih prekomjerno proizvedenih elemenata

Kad god nizvodno Posmatrač nije spreman primiti element, možemo koristiti onBackpressureDrop () metoda za ispuštanje tog elementa iz niza.

O toj metodi možemo razmišljati kao o onBackpressureBuffer () metoda s kapacitetom međuspremnika postavljenog na nulu sa strategijom ON_OVERFLOW_DROP_LATEST.

Ovaj je operator koristan kada možemo sigurno zanemariti vrijednosti iz izvora Uočljiv (poput pomicanja miša ili trenutnih GPS signala lokacije) jer će kasnije biti novih podataka:

Observable.range (1, 1_000_000) .onBackpressureDrop () .observeOn (Schedulers.computation ()) .doOnNext (ComputeFunction :: compute) .subscribe (v -> {}, Throwable :: printStackTrace);

Metoda onBackpressureDrop () uklanja problem pretjerane proizvodnje Uočljiv ali ga treba koristiti oprezno.

8. Zaključak

U ovom smo članku razmotrili problem prekomjerne proizvodnje Uočljiv i načini rješavanja povratnog tlaka. Gledali smo strategije međuspremnika, doziranja i preskakanja elemenata kada Posmatrač nije u stanju potrošiti elemente tako brzo kao što ih proizvodi Uočljiv.

Provedbu svih ovih primjera i isječaka koda možete pronaći u projektu GitHub - ovo je Maven projekt, pa bi ga trebalo lako uvesti i pokrenuti kakav jest.


$config[zx-auto] not found$config[zx-overlay] not found