RxJava 2 - Tekuća

1. Uvod

RxJava je reaktivna ekstenzija Java implementacija koja nam omogućuje pisanje asinkronih aplikacija vođenih događajima. Više informacija o korištenju RxJave možete pronaći u uvodnom članku ovdje.

RxJava 2 prepisana je ispočetka, što je donijelo više novih značajki; od kojih su neki stvoreni kao odgovor na probleme koji su postojali u prethodnoj verziji okvira.

Jedna od takvih značajki je io.reactivex.Tekuće.

2. Uočljiv nasuprot. Tekuće

U prethodnoj verziji RxJave postojala je samo jedna osnovna klasa za bavljenje izvorima koji nisu svjesni povratnog tlaka - Uočljiv.

RxJava 2 uveo je jasnu razliku između ove dvije vrste izvora - izvori svjesni povratnog tlaka sada su predstavljeni pomoću posebne klase - Tekuće.

Uočljiv izvori ne podržavaju povratni tlak. Zbog toga bismo ga trebali koristiti za izvore koje samo konzumiramo i na koje ne možemo utjecati.

Također, ako imamo posla s velikim brojem elemenata, mogu se dogoditi dva moguća scenarija povezana s protutlakom, ovisno o vrsti Uočljiv.

U slučaju korištenja tzv hladno Uočljiv“, događaji se emitiraju lijeno, tako da smo sigurni da ne možemo preplaviti promatrača.

Kada koristite vruće Uočljivmeđutim, to će i dalje emitirati događaje, čak i ako potrošač ne može pratiti.

3. Stvaranje a Tekuće

Postoje različiti načini za stvaranje a Tekuće. Nama je povoljno, te metode izgledaju slično metodama u Uočljiv u prvoj verziji RxJave.

3.1. Jednostavan Tekuće

Možemo stvoriti Tekuće koristiti samo() metoda slično kao što bismo mogli Uočljivo:

Tekući cijeli brojFlowable = Tekući.just (1, 2, 3, 4);

Iako se koristi samo() je vrlo jednostavno, nije baš uobičajeno stvarati Tekuće iz statičkih podataka i koristi se u svrhu testiranja.

3.2. Tekuće iz Uočljiv

Kad imamo Uočljiv možemo ga lako transformirati u Tekuće koristiti doFlowable () metoda:

Observable integerObservable = Observable.just (1, 2, 3); Tekući cjelobrojniFlowable = integerObservable .toFlowable (BackpressureStrategy.BUFFER);

Primijetite da da bismo mogli izvršiti pretvorbu, moramo obogatiti Uočljiv s Strategija povratnog tlaka. Dostupne strategije opisat ćemo u sljedećem odjeljku.

3.3. Tekuće iz FlowableOnSubscribe

RxJava 2 predstavio je funkcionalno sučelje FlowableOnSubscribe, koji predstavlja a Tekuće koji počinje emitirati događaje nakon što se potrošač na njega pretplati.

Zbog toga će svi klijenti dobiti isti skup događaja, što čini FlowableOnSubscribe siguran protiv povratnog tlaka.

Kad imamo FlowableOnSubscribe možemo ga koristiti za stvaranje Tekuće:

FlowableOnSubscribe flowableOnSubscribe = tekuće -> protočno.onNext (1); Tekući cijeli brojFlowable = Tekući .create (flowableOnSubscribe, BackpressureStrategy.BUFFER);

Dokumentacija opisuje još mnogo metoda za stvaranje Tekuće.

4. TekućeStrategija povratnog tlaka

Neke metode poput doFlowable () ili stvoriti() uzeti a Strategija povratnog tlaka kao argument.

The Strategija povratnog tlaka je nabrajanje, koje definira ponašanje povratnog tlaka koje ćemo primijeniti na svoje Tekuće.

Može predmemorirati ili ispustiti događaje ili uopće ne implementirati bilo kakvo ponašanje, u posljednjem ćemo slučaju biti odgovorni za njegovo definiranje pomoću operatora povratnog tlaka.

Strategija povratnog tlaka je sličan Režim povratnog tlaka prisutan u prethodnoj verziji RxJave.

U RxJava 2 dostupno je pet različitih strategija.

4.1. Pufer

Ako koristimo BackpressureStrategy.BUFFER, izvor će međuspremiti sve događaje dok ih pretplatnik ne može potrošiti:

javna void thenAllValuesAreBufferedAndReceived () {Popis testList = IntStream.range (0, 100000) .boxed () .collect (Collectors.toList ()); Observable observable = Observable.fromIterable (testList); TestSubscriber testSubscriber = vidljivo .toFlowable (BackpressureStrategy.BUFFER) .observeOn (Schedulers.computation ()). Test (); testSubscriber.awaitTerminalEvent (); Popis receivedInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (objekt -> (int) objekt) .boxed () .collect (Collectors.toList ()); assertEquals (testList, receivedInts); }

Slično je pozivanju onBackpressureBuffer () metoda na Tekuće, ali ne dopušta eksplicitno definiranje veličine međuspremnika ili akcije onOverflow.

4.2. Pad

Možemo koristiti BackpressureStrategy.DROP da odbaci događaje koji se ne mogu konzumirati umjesto da ih spremi u međuspremnik.

Opet je ovo slično korištenju onBackpressureDrop() na Tekuće:

javna praznina whenDropStrategyUsed_thenOnBackpressureDropped () {Observable observable = Observable.fromIterable (testList); TestSubscriber testSubscriber = vidljivo .toFlowable (BackpressureStrategy.DROP) .observeOn (Schedulers.computation ()) .test (); testSubscriber.awaitTerminalEvent (); Popis receivedInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (objekt -> (int) objekt) .boxed () .collect (Collectors.toList ()); assertThat (полученInts.size () <testList.size ()); assertThat (! receivedInts.contens (100000)); }

4.3. Najnoviji

Koristiti BackpressureStrategy.LATEST prisilit će izvor da zadrži samo najnovije događaje, prepisujući tako sve prethodne vrijednosti ako potrošač ne može pratiti:

javna praznina whenLatestStrategyUsed_thenTheLastElementReceived () {Observable observable = Observable.fromIterable (testList); TestSubscriber testSubscriber = vidljivo .toFlowable (BackpressureStrategy.LATEST) .observeOn (Schedulers.computation ()) .test (); testSubscriber.awaitTerminalEvent (); Popis receivedInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (objekt -> (int) objekt) .boxed () .collect (Collectors.toList ()); assertThat (полученInts.size () <testList.size ()); assertThat (полученInts.contens (100000)); }

BackpressureStrategy.LATEST i BackpressureStrategy.DROP izgledaju vrlo slično kad gledamo kod.

Međutim, BackpressureStrategy.LATEST prepisat će elemente s kojima naš pretplatnik ne može rukovati i zadržati samo one najnovije, pa otuda i naziv.

BackpressureStrategy.DROP, s druge strane, odbacit će elemente s kojima se ne može rukovati. To znači da najnoviji elementi neće nužno biti emitirani.

4.4. Pogreška

Kada koristimo BackpressureStrategy.ERROR, mi to jednostavno kažemo ne očekujemo povratni pritisak. Slijedom toga, a MissingBackpressureException treba baciti ako potrošač ne može pratiti izvor:

javna praznina whenErrorStrategyUsed_thenExceptionIsThrown () {Observable observable = Observable.range (1, 100000); Pretplatnik na TestSubscriber = vidljivo .toFlowable (BackpressureStrategy.ERROR) .observeOn (Schedulers.computation ()) .test (); pretplatnik.awaitTerminalEvent (); pretplatnik.assertError (MissingBackpressureException.class); }

4.5. Nedostaje

Ako koristimo BackpressureStrategy.NEDOSTAJE, izvor će gurati elemente bez odbacivanja ili međuspremnika.

Nizvodni će se u ovom slučaju morati nositi s preljevima:

javna praznina whenMissingStrategyUsed_thenException () {Observable observable = Observable.range (1, 100000); Pretplatnik na TestSubscriber = vidljiv .toFlowable (BackpressureStrategy.MISSING) .observeOn (Schedulers.computation ()) .test (); pretplatnik.awaitTerminalEvent (); pretplatnik.assertError (MissingBackpressureException.class); }

U našim testovima, izuzeti smo MissingbackpressureException za oboje POGREŠKA i NEDOSTAJE strategije. Kako će obojica izuzeti takvu iznimku kada se unutarnji međuspremnik izvora prelije.

Međutim, vrijedi napomenuti da obojica imaju drugačiju svrhu.

Prvu bismo trebali koristiti kad uopće ne očekujemo povratni tlak, a želimo da izvor izuzme iznimku u slučaju da se dogodi.

Potonji bi se mogao koristiti ako ne želimo odrediti zadano ponašanje pri izradi Tekuće. I koristit ćemo operatore povratnog tlaka da bismo ga kasnije definirali.

5. Sažetak

U ovom uputstvu predstavili smo novi razred predstavljen u RxJavi 2 pozvao Tekuće.

Da biste pronašli više informacija o Tekuće sami i to je API možemo se pozvati na dokumentaciju.

Kao i uvijek, svi uzorci koda mogu se naći na GitHubu.