Razlika između API-ja RxJava i API-ja Java 9 Flow

1. Uvod

Java Flow API uveden je u Javi 9 kao implementacija Specifikacije reaktivnog toka.

U ovom uputstvu prvo ćemo istražiti reaktivne tokove. Zatim ćemo saznati o njegovoj vezi s RxJava i Flow API.

2. Što su reaktivni tokovi?

Manifest reaktivne reakcije predstavio je reaktivne struje kako bi odredio standard za asinkronu obradu struje s neblokirajućim protutlakom.

Opseg Specifikacije reaktivnog toka je definirati minimalni skup sučelja za postizanje tih ciljeva:

  • org.reactivestreams.Izdavač je davatelj podataka koji pretplatnicima objavljuje podatke na temelju njihove potražnje

  • org.reactivestreams.Pretplatnik je potrošač podataka - može primati podatke nakon pretplate na izdavača

  • org.reactivestreams.Pretplata stvara se kada izdavač prihvati pretplatnika

  • org.reactivestreams.Procesor je i pretplatnik i izdavač - pretplaćuje se na izdavača, obrađuje podatke i zatim obrađene podatke prosljeđuje pretplatniku

API protoka potječe iz specifikacije. RxJava mu prethodi, ali od 2.0, RxJava podržava i specifikaciju.

Ući ćemo duboko u oboje, ali prvo, pogledajmo praktični slučaj.

3. Slučaj upotrebe

Za ovaj ćemo vodič koristiti uslugu prijenosa uživo kao svoj slučaj.

Prijenos videozapisa uživo, suprotno streamingu videozapisa na zahtjev, ne ovisi o potrošaču. Stoga poslužitelj objavljuje stream vlastitim tempom i na potrošaču je odgovornost da se prilagodi.

U najjednostavnijem obliku, naš se model sastoji od izdavača video strima i video uređaja kao pretplatnika.

Provedimo VideoFrame kao naša podatkovna stavka:

javna klasa VideoFrame {privatni dugi broj; // dodatna polja podataka // konstruktor, getteri, postavljači}

Onda idemo pojedinačno kroz naše implementacije API-ja Flow i RxJava.

4. Implementacija s API-jem protoka

API-ji protoka u JDK 9 odgovaraju Specifikaciji reaktivnih struja. S API-jem protoka, ako aplikacija u početku zatraži N stavki, izdavač pretplatniku gura najviše N stavki.

Sučelja Flow API su u java.util.concurrent.Flow sučelje. Oni su semantički ekvivalentni njihovim kolegama s reaktivnim tokovima.

Provedimo VideoStreamServer kao izdavač VideoFrame.

javna klasa VideoStreamServer proširuje SubmissionPublisher {public VideoStreamServer () {super (Executors.newSingleThreadExecutor (), 5); }}

Proširili smo svoje VideoStreamServer iz SubmissionPublisher umjesto da izravno provodi Tok :: Izdavač. SubmissionPublisher je JDK implementacija Tok :: Izdavač za asinkronu komunikaciju s pretplatnicima, tako da nam omogućuje VideoStreamServer emitirati vlastitim tempom.

Također je korisno za povratni pritisak i rukovanje odbojnikom, jer kada SubmissionPublisher :: pretplatite se zove, stvara instancu Pretplaćena pretplata, a zatim dodaje novu pretplatu u svoj lanac pretplata. Pretplaćena pretplata može spremiti izdane predmete do SubmissionPublisher # maxBufferCapacity.

Sada definirajmo VideoPlayer, koja troši struju VideoFrame. Stoga se mora provoditi Flow :: Pretplatnik.

javna klasa VideoPlayer implementira Flow.Subscriber {Flow.Subscription pretplata = null; @Preuzmi javnu prazninu na pretplati (pretplata na Flow.Subscription) {this.subscription = pretplata; pretplata.zahtjev (1); } @Override public void onNext (VideoFrame item) {log.info ("play # {}", item.getNumber ()); pretplata.zahtjev (1); } @Override public void onError (bacanje moguće je baciti) {log.error ("Pogreška je u streamingu video zapisa: {}", throwable.getMessage ()); } @Override public void onComplete () {log.error ("Video je završio"); }}

VideoPlayer pretplaćuje se na VideoStreamServer, zatim nakon uspješne pretplate VideoPlayer::onSubscribe poziva se metoda koja zahtijeva jedan okvir. VideoPlayer:: onNext prima okvir i zahtjeve za novim. Broj traženih okvira ovisi o slučaju upotrebe i Pretplatnik implementacije.

Na kraju, spojimo stvari:

VideoStreamServer streamServer = novi VideoStreamServer (); streamServer.subscribe (novi VideoPlayer ()); // podnošenje videookvira ScheduledExecutorService izvršitelj = Izvršitelji.newScheduledThreadPool (1); AtomicLong frameNumber = novi AtomicLong (); executor.scheduleWithFixedDelay (() -> {streamServer.offer (novi VideoFrame (frameNumber.getAndIncrement ()), (pretplatnik, videoFrame) -> {subscriber.onError (novi RuntimeException ("Frame #" + videoFrame.getNumber () + ispušteno zbog povratnog tlaka ")); return true;});}, 0, 1, TimeUnit.MILLISECONDS); spavanje (1000);

5. Implementacija s RxJavom

RxJava je Java implementacija ReactiveX-a. Projekt ReactiveX (ili Reactive Extensions) želi pružiti koncept reaktivnog programiranja. To je kombinacija uzorka Observer, uzorka Iterator i funkcionalnog programiranja.

Najnovija glavna verzija RxJave je 3.x. RxJava podržava reaktivni tok od verzije 2.x sa svojim Tekuće osnovna klasa, ali to je značajniji skup od reaktivnih tokova s ​​nekoliko osnovnih klasa poput Tekuće, Uočljiv, Singl, Izvršljivo.

Tekuće kao komponenta usklađenosti reaktivnog toka predstavlja protok od 0 do N predmeta s rukovanjem povratnim tlakom. Tekuće proteže se Izdavač iz reaktivnih struja. Stoga mnogi operateri RxJava prihvaćaju Izdavač izravno i dopuštaju izravnu interakciju s ostalim realizacijama reaktivnih tokova.

Sada, napravimo naš generator video tokova koji je beskrajno lijen tok:

Stream videoStream = Stream.iterate (novi VideoFrame (0), videoFrame -> {// spavanje od 1 ms; vrati novi VideoFrame (videoFrame.getNumber () + 1);});

Tada definiramo a Tekuće instanca za generiranje okvira u zasebnoj niti:

Tekući .fromStream (videoStream) .subscribeOn (Schedulers.from (Executors.newSingleThreadExecutor ()))

Važno je napomenuti da nam je dovoljan beskonačan tok, ali ako trebamo fleksibilniji način generiranja svog toka, tada Tekuće.stvari je dobar izbor.

Tekući .create (novi FlowableOnSubscribe () {AtomicLong frame = new AtomicLong (); @Override public void subscribe (@NonNull FlowableEmitter emitter) {while (true) {emitter.onNext (new VideoFrame (frame.incrementAndGet ())); / sleep 1 ms za simultano kašnjenje}}}, / * Ovdje postavite strategiju povratnog tlaka * /)

Zatim se u sljedećem koraku VideoPlayer pretplaćuje na ovaj protočni i promatra stavke u zasebnoj niti.

videoFlowable .observeOn (Schedulers.from (Executors.newSingleThreadExecutor ())) .subscribe (item -> {log.info ("play #" + item.getNumber ()); // spavanje 30 ms da simulira prikaz okvira}) ;

I na kraju, konfigurirat ćemo strategiju za povratni tlak. Ako želimo zaustaviti video u slučaju gubitka kadra, stoga ga moramo koristiti BackpressureOverflowStrategy :: POGREŠKA kad se me uspremnik napuni.

Tekući .fromStream (videoStream) .subscribeOn (Schedulers.from (Executors.newSingleThreadExecutor ())) .onBackpressureBuffer (5, null, BackpressureOverflowStrategy.ERROR) .observeOn (Schedulers.frehreserebseresereresereresereresereresereresererereserereserereserererereserereхreadreserererereserevracreadrequire4.blogspot.com) > {log.info ("play #" + item.getNumber ()); // spavanje 30 ms za simulaciju prikaza okvira});

6. Usporedba RxJava i Flow API

Čak i u ove dvije jednostavne implementacije možemo vidjeti kako je API RxJave bogat, posebno za upravljanje međuspremnikom, rukovanje pogreškama i strategiju povratnog tlaka. Pruža nam više mogućnosti i manje redaka koda sa svojim tečnim API-jem. Sada razmotrimo složenije slučajeve.

Pretpostavimo da naš uređaj ne može prikazati video okvire bez kodeka. Stoga s Flow API-jem moramo implementirati Procesor da simulira kodek i sjedne između poslužitelja i playera. S RxJavom to možemo učiniti s Tekuće :: flatMap ili Tekuće :: karta.

Ili zamislimo da će i naš uređaj emitirati audio prijevod uživo, pa moramo kombinirati video i audio zapise zasebnih izdavača. S RxJavom možemo koristiti Tekuće :: combLatest, ali s API-jem protoka to nije lak zadatak.

Iako je moguće napisati običaj Procesor koji se pretplaćuje na oba toka i šalje kombinirane podatke našem VideoPlayer. Međutim, provedba je glavobolja.

7. Zašto Flow API?

U ovom trenutku možemo imati pitanje koja je filozofija Flow API-ja?

Ako u JDK pretražujemo upotrebe API-ja protoka, možemo pronaći nešto u java.net.http i jdk.internal.net.http.

Nadalje, adaptore možemo pronaći u projektu reaktora ili paketu reaktivnog toka. Na primjer, org.reactivestreams.FlowAdapters ima metode za pretvaranje Flow API sučelja u reaktivni tok i obratno. Stoga pomaže interoperabilnost između API-ja protoka i knjižnica s podrškom za reaktivni tok.

Sve ove činjenice pomažu nam da shvatimo svrhu protoka API: stvoren je da bude skupina sučelja reaktivne specifikacije u JDK bez releja na trećim stranama. Štoviše, Java očekuje da API protoka bude prihvaćen kao standardno sučelje za reaktivnu specifikaciju i da se koristi u JDK-u ili drugim Java-baziranim knjižnicama koje implementiraju reaktivnu specifikaciju za srednji softver i uslužne programe.

8. Zaključci

U ovom uputstvu imamo uvod u Specifikacije reaktivnog toka, API protoka i RxJava.

Nadalje, vidjeli smo praktični primjer implementacije API-ja Flow i RxJava za video prijenos uživo.

Ali svi aspekti Flow API-ja i RxJave vole Protok :: Procesor, Tekuće :: karta i Tekuće :: flatMap ili ovdje nisu obrađene strategije povratnog tlaka.

Kao i uvijek, cjelovit kod tutorijala možete pronaći na GitHubu.


$config[zx-auto] not found$config[zx-overlay] not found