Uvod u RSocket

1. Uvod

U ovom uputstvu prvo ćemo pogledati RSocket i kako on omogućava komunikaciju klijent-poslužitelj.

2. Što je RSocket?

RSocket je binarni komunikacijski protokol od točke do točke namijenjen uporabi u distribuiranim aplikacijama. U tom smislu pruža alternativu drugim protokolima poput HTTP-a.

Potpuna usporedba između RSocket-a i drugih protokola izvan je dosega ovog članka. Umjesto toga, usredotočit ćemo se na ključnu značajku RSocket-a: njegove modele interakcije.

RSocket nudi četiri modela interakcije. Imajući to na umu, istražit ćemo svaku s primjerom.

3. Ovisnosti Mavena

RSocket treba samo dvije izravne ovisnosti za naše primjere:

 io.rsocket rsocket-core 0.11.13 io.rsocket rsocket-transport-netty 0.11.13 

Ovisnosti rsocket-core i rsocket-transport-netty dostupne su na Maven Central.

Važna napomena je da biblioteka RSocket često koristi reaktivne tokove. The Fluks i Mono u ovom se članku koriste predavanja pa će im biti korisno osnovno razumijevanje.

4. Postavljanje poslužitelja

Prvo, kreirajmo Poslužitelj razred:

poslužitelj javne klase {privatni konačni poslužitelj za jednokratnu upotrebu; javni poslužitelj () {this.server = RSocketFactory.receive () .acceptor ((setupPayload, reactiveSocket) -> Mono.just (novi RSocketImpl ())). transport (TcpServerTransport.create ("localhost", TCP_PORT)) .start () .subscribe (); } javna praznina dispose () {this.server.dispose (); } privatna klasa RSocketImpl proširuje AbstractRSocket {}}

Ovdje koristimo RSocketFactory za postavljanje i preslušavanje TCP utičnice. Prolazimo u svom običaju RSocketImpl za obradu zahtjeva klijenata. Mi ćemo dodati metode na RSocketImpl kako idemo.

Dalje, da bismo pokrenuli poslužitelj, samo ga moramo instancirati:

Poslužiteljski poslužitelj = novi poslužitelj ();

Jedna instanca poslužitelja može obraditi više veza. Kao rezultat toga, samo će jedna instanca poslužitelja podržati sve naše primjere.

Kad završimo, raspolagati metoda zaustavit će poslužitelj i otpustiti TCP priključak.

4. Modeli interakcije

4.1. Zahtjev / odgovor

RSocket pruža model zahtjeva / odgovora - svaki zahtjev dobiva jedan odgovor.

Za ovaj ćemo model stvoriti jednostavnu uslugu koja klijentu vraća poruku.

Počnimo s dodavanjem metode u naše proširenje SažetakRSocket, RSocketImpl:

@Preuzmi javni Mono requestResponse (korisni teret) {try {return Mono.just (teret); // odražava korisni teret natrag pošiljatelju} catch (Exception x) {return Mono.error (x); }}

The requestResponse metoda vraća jedan rezultat za svaki zahtjev, kao što vidimo po Mono vrsta odgovora.

Korisni teret je klasa koja sadrži sadržaj poruke i metapodatke. Koriste ga svi modeli interakcije. Sadržaj korisnog tereta je binarni, ali postoje pogodne metode koje podržavaju Nizsadržaj na bazi.

Dalje, možemo stvoriti našu klasu klijenta:

javna klasa ReqResClient {privatna konačna RSocket utičnica; javni ReqResClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } javni String callBlocking (niz niza) {return socket .requestResponse (DefaultPayload.create (string)) .map (Payload :: getDataUtf8) .block (); } javna praznina dispose () {this.socket.dispose (); }}

Klijent koristi RSocketFactory.connect () metoda za pokretanje socket veze s poslužiteljem. Koristimo requestResponse metoda na utičnici za slanje korisnog tereta na poslužitelj.

Naša nosivost sadrži Niz prešlo u klijenta. Kada Mono odgovor stiže možemo koristiti getDataUtf8 () metoda za pristup Niz sadržaj odgovora.

Konačno, možemo pokrenuti test integracije kako bismo vidjeli zahtjev / odgovor na djelu. Poslat ćemo a Niz na poslužitelj i provjerite je li isti Niz se vraća:

@Test public void whenSendingAString_thenRevceiveTheSameString () {ReqResClient client = new ReqResClient (); String string = "Hello RSocket"; assertEquals (string, client.callBlocking (niz)); client.dispose (); }

4.2. Vatra i zaboravi

S modelom vatre i zaborava, klijent neće dobiti odgovor od poslužitelja.

U ovom primjeru klijent će poslati simulirana mjerenja poslužitelju u intervalima od 50 ms. Poslužitelj će objaviti mjerenja.

Dodajmo obrađivač požara i zaborava na naš poslužitelj u RSocketImpl razred:

@Preuzmi javni Mono fireAndForget (korisni teret) {try {dataPublisher.publish (teret); // prosljeđivanje povrata korisnog tereta Mono.empty (); } catch (Iznimka x) {return Mono.error (x); }}

Ovaj rukovatelj izgleda vrlo slično obrađivaču zahtjeva / odgovora. Međutim, fireAndForget vraća se Mono umjesto Mono.

The dataPublisher je instanca org.reactivestreams.Izdavač. Dakle, čini korisni teret dostupnim pretplatnicima. To ćemo iskoristiti u primjeru zahtjeva / prijenosa.

Dalje, izradit ćemo klijenta za vatru i zaborav:

javna klasa FireNForgetClient {privatna konačna RSocket utičnica; privatni konačni podaci popisa; javni FireNForgetClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } / ** Šaljite binarnu brzinu (plutajuće) svakih 50 ms * / javna praznina sendData () {data = Collections.unmodifiableList (generirajData ()); Flux.interval (Duration.ofMillis (50)) .take (data.size ()) .map (this :: createFloatPayload) .flatMap (socket :: fireAndForget) .blockLast (); } // ...}

Postavljanje utičnice potpuno je isto kao i prije.

The sendData () metoda koristi a Fluks stream za slanje više poruka. Za svaku se poruku pozivamo socket :: fireAndForget.

Moramo se pretplatiti na Mono odgovor za svaku poruku. Ako se tada zaboravimo pretplatiti socket :: fireAndForget neće izvršiti.

The flatMap operator osigurava da Poništiti odgovori se prosljeđuju pretplatniku, dok blockLast operater djeluje kao pretplatnik.

Pričekat ćemo sljedeći odjeljak kako bismo pokrenuli test vatre i zaborava. U tom ćemo trenutku stvoriti klijent zahtjeva / streama za primanje podataka koje je gurnuo klijent za vatru i zaboravi.

4.3. Zahtjev / stream

U modelu zahtjev / stream, jedan zahtjev može dobiti više odgovora. Da bismo to vidjeli na djelu, možemo se nadovezati na primjer vatre i zaborava. Da bismo to učinili, zatražimo tok za dohvaćanje mjerenja koje smo poslali u prethodnom odjeljku.

Kao i prije, krenimo dodavanjem novog slušatelja na RSocketImpl na poslužitelju:

@Preuzmi javni Flux requestStream (korisni teret) {return Flux.from (dataPublisher); }

The requestStream rukovatelj vraća a Fluks potok. Kao što se prisjećamo iz prethodnog odjeljka, fireAndForget voditelj objavio dolazne podatke na dataPublisher. Sada ćemo stvoriti Fluks stream koristeći isti dataPublisher kao izvor događaja. Na taj način mjerni podaci prelazit će asinkrono od našeg klijenta za vatrogastvo i zaborav do našeg klijenta zahtjeva / streama.

Stvorimo klijenta zahtjeva / streama sljedeći:

javna klasa ReqStreamClient {privatna konačna RSocket utičnica; javni ReqStreamClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } public Flux getDataStream () {return socket .requestStream (DefaultPayload.create (DATA_STREAM_NAME)) .map (Payload :: getData) .map (buf -> buf.getFloat ()) .onErrorReturn (null); } javna praznina dispose () {this.socket.dispose (); }}

Povezujemo se s poslužiteljem na isti način kao i naši prethodni klijenti.

U getDataStream ()koristimo socket.requestStream () za primanje protoka Flux s poslužitelja. Iz tog potoka izdvajamo Plutati vrijednosti iz binarnih podataka. Konačno, tok se vraća pozivatelju, omogućujući pozivatelju da se pretplati na njega i obrađuje rezultate.

Sada testirajmo. Provjerit ćemo povratno putovanje iz vatre i zaboravi na zahtjev / stream.

Možemo tvrditi da se svaka vrijednost prima istim redoslijedom kao što je i poslana. Tada možemo ustvrditi da primamo isti broj poslanih vrijednosti:

@Test public void whenSendingStream_thenReceiveTheSameStream () {FireNForgetClient fnfClient = novi FireNForgetClient (); ReqStreamClient streamClient = novi ReqStreamClient (); Podaci s popisa = fnfClient.getData (); Popis podatakaReceived = novi ArrayList (); Jednokratna pretplata = streamClient.getDataStream () .index () .subscribe (tuple -> {assertEquals ("Pogrešna vrijednost", data.get (tuple.getT1 (). IntValue ()), tuple.getT2 ()); dataReceived. dodaj (tuple.getT2 ());}, pogreška -> LOG.error (err.getMessage ())); fnfClient.sendData (); // ... raspolagati klijentom i pretplatom assertEquals ("Primljen pogrešan broj podataka", data.size (), dataReceived.size ()); }

4.4. Kanal

Model kanala omogućuje dvosmjernu komunikaciju. U ovom modelu tokovi poruka asinkrono teku u oba smjera.

Stvorimo jednostavnu simulaciju igre da to testiramo. U ovoj će igri svaka strana kanala postati igrač. Kako se igra bude odvijala, ovi će igrači slati poruke drugoj strani u slučajnim vremenskim intervalima. Na poruke će reagirati suprotna strana.

Prvo ćemo stvoriti obrađivač na poslužitelju. Kao i prije, dodajemo na RSocketImpl:

@Preuzmi javni Flux requestChannel (korisnički tereti izdavača) {Flux.from (korisni tereti) .subscribe (gameController :: processPayload); return Flux.from (gameController); }

The requestChannel rukovatelj ima Korisni teret tokovi i za ulaz i za izlaz. The Izdavač ulazni parametar je tok korisnih tereta primljenih od klijenta. Kad stignu, ti se tereti prenose na gameController :: processPayload funkcija.

Kao odgovor vraćamo drugačije Fluks stream natrag do klijenta. Ovaj je tok stvoren od našeg gameController, koji je također a Izdavač.

Evo sažetka GameController razred:

javna klasa GameController implementira Publisher {@Override public void pretplata (pretplatnički pretplatnik) {// slanje poruka korisnog tereta pretplatniku u slučajnim intervalima} public void processPayload (korisni teret korisničkog tereta) {// reagiranje na poruke drugog igrača}}

Kada GameController primi pretplatnika započinje slanje poruka tom pretplatniku.

Dalje, kreirajmo klijenta:

javna klasa ChannelClient {privatna konačna RSocket utičnica; privatno završno GameController gameController; javni ChannelClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); this.gameController = novi GameController ("Client Player"); } javna void playGame () {socket.requestChannel (Flux.from (gameController)) .doOnNext (gameController :: processPayload) .blockLast (); } javna praznina dispose () {this.socket.dispose (); }}

Kao što smo vidjeli u našim prethodnim primjerima, klijent se povezuje s poslužiteljem na isti način kao i ostali klijenti.

Klijent kreira vlastiti primjerak GameController.

Koristimo socket.requestChannel () poslati naše Korisni teret stream na poslužitelj. Poslužitelj odgovara vlastitim streamom tereta.

Kao korisni tereti primljeni od poslužitelja, prosljeđujemo ih našem gameController :: processPayload rukovatelj.

U našoj simulaciji igre klijent i poslužitelj međusobno se zrcale. To je, svaka strana šalje tok Korisni teret i primajući tok Korisni teret s drugog kraja.

Streamovi se izvode neovisno, bez sinkronizacije.

Na kraju, pokrenimo simulaciju u testu:

@Test public void whenRunningChannelGame_thenLogTheResults () {Client ChannelClient = novi ChannelClient (); client.playGame (); client.dispose (); }

5. Zaključak

U ovom uvodnom članku istražili smo modele interakcije koje pruža RSocket. Potpuni izvorni kod primjera može se naći u našem spremištu Github.

Svakako potražite web stranicu RSocket za dublju raspravu. Konkretno, često postavljana pitanja i motivacioni dokumenti pružaju dobru pozadinu.