Planeri u RxJavi

1. Pregled

U ovom ćemo se članku usredotočiti na različite vrste Planeri koje ćemo koristiti pri pisanju programa s više niti koji se temelje na RxJavi Observable's subscribeOn i promatratiOn metode.

Planeri dati mogućnost da se odredi gdje i vjerojatno kada treba izvršiti zadatke povezane s operacijom Uočljiv lanac.

Možemo dobiti a Planer iz tvorničkih metoda opisanih u klasi Planeri.

2. Zadano ponašanje navoja

Prema zadanim postavkama,Rx je jednonitni što implicira da an Uočljiv a lanac operatora koji se na njega možemo prijaviti obavijestit će svoje promatrače na istoj niti na kojoj je i pretplatite se () metoda se naziva.

The promatratiOn i pretplatiteOn metode uzimaju kao argument a Planer, to je, kao što i samo ime govori, alat koji možemo koristiti za zakazivanje pojedinačnih radnji.

Stvorit ćemo našu implementaciju a Planer pomoću stvoritiRadnik metoda koja vraća a Planer.Radnik. A radnik prihvaća radnje i izvršava ih uzastopno na jednoj niti.

Na neki način, a radnik je Ssam cheduler, ali nećemo ga nazivati ​​a Planer kako bi se izbjegla zabuna.

2.1. Zakazivanje radnje

Možemo zakazati posao za bilo koji Planer stvaranjem nove radnik i zakazivanje nekih radnji:

Planer planera = Schedulers.immediate (); Planer.Worker radnik = planer.createWorker (); worker.schedule (() -> rezultat + = "akcija"); Assert.assertTrue (result.equals ("action"));

Zatim se radnja stavlja u red čekanja na niti kojoj je radnik dodijeljen.

2.2. Otkazivanje radnje

Planer.Radnik proteže se Pretplata. Pozivanje odjaviti se metoda na a radnik rezultirat će ispražnjenjem reda i otkazivanjem svih zadataka na čekanju. To možemo vidjeti na primjeru:

Planer planera = Schedulers.newThread (); Planer.Worker radnik = planer.createWorker (); worker.schedule (() -> {result + = "First_Action"; worker.unsubscribe ();}); worker.schedule (() -> rezultat + = "Druga_akcija"); Assert.assertTrue (result.equals ("First_Action"));

Drugi se zadatak nikada ne izvršava jer je onaj prije njega otkazao cijelu operaciju. Prekinut će se radnje koje su bile u procesu izvršenja.

3. Rasporednici.newThread

Ovaj planer jednostavno pokreće novu nit svaki put kad se zatraži putem pretplatite se na () ili obserOn ().

Teško da je to dobar izbor, ne samo zbog latencije koja je uključena prilikom pokretanja niti, već i zbog toga što se ova nit ne upotrebljava ponovno:

Observable.just ("Hello") .observeOn (Schedulers.newThread ()) .doOnNext (s -> result2 + = Thread.currentThread (). GetName ()) .observeOn (Schedulers.newThread ()) .subscribe (s - > result1 + = Thread.currentThread (). getName ()); Navoj.spavanje (500); Assert.assertTrue (result1.equals ("RxNewThreadScheduler-1")); Assert.assertTrue (result2.equals ("RxNewThreadScheduler-2"));

Kada Radnik je gotovo, nit se jednostavno završava. Ovaj Planer može se koristiti samo kada su zadaci grubozrnati: potrebno je puno vremena za izvršavanje, ali vrlo je malo njih, tako da je malo vjerojatno da će se niti uopće ponovno upotrijebiti.

Planer planera = Schedulers.newThread (); Planer.Worker radnik = planer.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "_Start"; worker.schedule (() -> result + = "_worker_"); result + = "_End";} ); Navoj.spavanje (3000); Assert.assertTrue (result.equals ("RxNewThreadScheduler-1_Start_End_worker_"));

Kad smo zakazali radnik na a NewThreadScheduler, vidjeli smo da je radnik vezan za određenu nit.

4. Planeri.neposredni

Planeri.neposredni je poseban planer koji poziva zadatak unutar klijentske niti na blokirajući način, a ne asinkrono i vraća se kada je radnja dovršena:

Planer planera = Schedulers.immediate (); Planer.Worker radnik = planer.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "_Start"; worker.schedule (() -> result + = "_worker_"); result + = "_End";} ); Navoj.spavanje (500); Assert.assertTrue (result.equals ("main_Start_worker__End"));

Zapravo, pretplatom na Uočljiv preko neposredni Planer obično ima isti učinak kao da se ne pretplatite ni na jedan određeni Scheduler uopće:

Observable.just ("Hello") .subscribeOn (Schedulers.immediate ()) .subscribe (s -> result + = Thread.currentThread (). GetName ()); Navoj.spavanje (500); Assert.assertTrue (result.equals ("main"));

5. Planeri.trampolin

The trampolinPlaner je vrlo sličan neposredna jer također planira zadatke u istoj niti, učinkovito blokirajući.

Međutim, nadolazeći se zadatak izvršava kad se završe svi prethodno zakazani zadaci:

Observable.just (2, 4, 6, 8) .subscribeOn (Schedulers.trampoline ()) .subscribe (i -> rezultat + = "" + i); Observable.just (1, 3, 5, 7, 9) .subscribeOn (Schedulers.trampoline ()) .subscribe (i -> rezultat + = "" + i); Navoj.spavanje (500); Assert.assertTrue (result.equals ("246813579"));

Odmah odmah poziva na zadani zadatak, dok trampolin čeka da trenutni zadatak završi.

The trampolin‘S radnik izvršava svaki zadatak u niti koja je zakazala prvi zadatak. Prvi poziv raspored blokira dok se red ne isprazni:

Planer planera = Schedulers.trampoline (); Planer.Worker radnik = planer.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "Start"; worker.schedule (() -> {result + = "_middleStart"; worker.schedule (() -> rezultat + = "_worker_"); rezultat + = "_middleEnd";}); rezultat + = "_mainEnd";}); Navoj.spavanje (500); Assert.assertTrue (rezultat .equals ("mainStart_mainEnd_middleStart_middleEnd_worker_"));

6. Planeri.od

Planeri su interno složeniji od Izvršitelji iz java.util.concurrent - pa je bila potrebna zasebna apstrakcija.

Ali budući da su konceptualno prilično slični, nije iznenađujuće da postoji omot koji se može okrenuti Izvršitelj u Planer koristiti iz tvornička metoda:

private ThreadFactory threadFactory (uzorak niza) {return new ThreadFactoryBuilder () .setNameFormat (pattern) .build (); } @Test javna praznina givenExecutors_whenSchedulerFrom_thenReturnElements () baca InterruptedException {ExecutorService poolA = newFixedThreadPool (10, threadFactory ("Sched-A-% d")); Planer planeraA = Planeri.from (poolA); ExecutorService poolB = newFixedThreadPool (10, threadFactory ("Sched-B-% d")); Planer planeraB = Planeri.from (poolB); Observable observable = Observable.create (pretplatnik -> {pretplatnik.onNext ("Alfa"); pretplatnik.onNext ("Beta"); pretplatnik.onCompleted ();}) ;; vidljiv .subscribeOn (schedulerA) .subscribeOn (schedulerB) .subscribe (x -> result + = Thread.currentThread (). getName () + x + "_", Throwable :: printStackTrace, () -> result + = "_Completed "); Tema.spavanje (2000.); Assert.assertTrue (result.equals ("Sched-A-0Alfa_Sched-A-0Beta__Completed")); }

PlanerB koristi se kratko vrijeme, ali jedva planira novu akciju na rokovnikA, koji obavlja sav posao. Dakle, višestruka subscribeOn metode ne samo da se zanemaruju, već uvode i male troškove.

7. Planeri.io

Ovaj Planer je sličan newThread osim činjenice da se već započete niti recikliraju i mogu obrađivati ​​buduće zahtjeve.

Ova implementacija djeluje slično kao ThreadPoolExeecuter iz java.util.concurrent s neograničenim bazenom niti. Svaki put novi radnik zatraži se ili se pokrene nova nit (i kasnije neko vrijeme ostane u stanju mirovanja) ili se ona ponovno koristi:

Observable.just ("io") .subscribeOn (Schedulers.io ()) .subscribe (i -> rezultat + = Thread.currentThread (). GetName ()); Assert.assertTrue (result.equals ("RxIoScheduler-2"));

Moramo biti oprezni s neograničenim resursima bilo koje vrste - u slučaju usporenih ili neodgovornih vanjskih ovisnosti poput web usluga, ioplanera može pokrenuti ogroman broj niti, što dovodi do toga da naša vlastita aplikacija postaje neodzivna.

U praksi, slijedeći Planeri.io je gotovo uvijek bolji izbor.

8. Planeri.računavanje

Računanje Scheduler prema zadanim postavkama ograničava broj niti koje se izvode paralelno s vrijednošću availableProcesors (), kako se nalazi u Runtime.getRuntime () klasa korisnosti.

Dakle, trebali bismo koristiti a planer izračunavanja kada su zadaci u potpunosti vezani uz CPU; odnosno zahtijevaju računsku snagu i nemaju kôd za blokiranje.

Koristi neograničeni red ispred svake niti, pa ako je zadatak zakazan, ali su sve jezgre zauzete, stavit će se u red. Međutim, red prije svake niti nastavit će rasti:

Observable.just ("izračun") .subscribeOn (Schedulers.computation ()) .subscribe (i -> rezultat + = Thread.currentThread (). GetName ()); Assert.assertTrue (result.equals ("RxComputationScheduler-1"));

Ako nam iz nekog razloga treba drugačiji broj niti od zadane, uvijek možemo koristiti rx.scheduler.max-computation-niti svojstvo sustava.

Uzimajući manje niti možemo osigurati da uvijek postoji jedna ili više jezgri procesora u praznom hodu, čak i pod velikim opterećenjem, računanje spremište niti ne zasićuje poslužitelj. Jednostavno nije moguće imati više niti računanja od jezgri.

9. Planeri.test

Ovaj Planer koristi se samo u svrhu testiranja i nikada ga nećemo vidjeti u proizvodnom kodu. Njegova glavna prednost je sposobnost pomicanja sata, simulirajući vrijeme koje proizvoljno prolazi:

Slova s ​​popisa = Arrays.asList ("A", "B", "C"); TestScheduler planer = Schedulers.test (); Pretplatnik na TestSubscriber = novi TestSubscriber (); Vidljivi tik = Observable .interval (1, TimeUnit.SECONDS, planer); Observable.from (slova) .zipWith (oznaka, (string, index) -> index + "-" + string) .subscribeOn (planer) .subscribe (pretplatnik); pretplatnik.assertNoValues ​​(); subscriber.assertNotCompleted (); planer.advanceTimeBy (1, TimeUnit.SECONDS); pretplatnik.assertNoErrors (); pretplatnik.assertValueCount (1); pretplatnik.assertValues ​​("0-A"); planer.advanceTimeTo (3, TimeUnit.SECONDS); pretplatnik.assertCompleted (); pretplatnik.assertNoErrors (); pretplatnik.assertValueCount (3); assertThat (subscriber.getOnNextEvents (), hasItems ("0-A", "1-B", "2-C"));

10. Zadani planeri

Neki Uočljiv operatori u RxJavi imaju zamjenske oblike koji nam omogućuju postavljanje kojih Planer operater će koristiti za svoj rad. Drugi ne operiraju nijedno određeno Planer ili operirati na određenom zadanom Planer.

Na primjer, odgoditi operator uzima uzvodne događaje i gura ih nizvodno nakon određenog vremena. Očito je da u tom razdoblju ne može držati izvornu nit, pa mora koristiti drugu Planer:

ExecutorService poolA = newFixedThreadPool (10, threadFactory ("Sched1-")); Raspoređivač planeraA = Planeri.from (poolA); Observable.just ('A', 'B') .delay (1, TimeUnit.SECONDS, schedulerA) .subscribe (i -> rezultat + = Thread.currentThread (). GetName () + i + ""); Tema.spavanje (2000.); Assert.assertTrue (result.equals ("Sched1-A Sched1-B"));

Bez isporuke običaja rokovnikA, svi operateri u nastavku odgoditi bi koristio planer računanja.

Ostali važni operateri koji podržavaju prilagođavanje Planeri jesu pufer, interval, domet, mjerač vremena, preskočiti, uzeti, pauzai nekoliko drugih. Ako ne pružimo a Planer takvim operaterima, računanje koristi se planer, što je u većini slučajeva siguran zadatak.

11. Zaključak

U doista reaktivnim aplikacijama, za koje su sve dugotrajne operacije asinkrone, vrlo malo niti i tako Planeri su potrebni.

Ovladavanje planerom je neophodno za pisanje skalabilnog i sigurnog koda pomoću RxJave. Razlika između pretplatiteOn i promatratiOn je posebno važno pod velikim opterećenjem, gdje se svaki zadatak mora izvršiti upravo onda kada očekujemo.

I na kraju, ali ne najmanje važno, moramo biti sigurni u to Planeri upotrebljava se nizvodno i može pratiti korak oglasa koji generira Planeri upstrea m. Za više informacija postoji članak o povratnom tlaku.

Provedbu svih ovih primjera i isječaka koda možete pronaći u projektu GitHub - ovo je Maven projekt, pa bi ga trebalo lako uvesti i pokrenuti kakav jest.