Uvod u Netty

1. Uvod

U ovom ćemo članku pogledati Netty - asinhroni mrežni aplikacijski okvir vođen događajima.

Glavna svrha Netty-a je izgradnja protokolarnih poslužitelja visokih performansi temeljenih na NIO (ili možda NIO.2) s razdvajanjem i labavim spajanjem komponenata mreže i poslovne logike. Može implementirati nadaleko poznati protokol, kao što je HTTP, ili vaš vlastiti protokol.

2. Temeljni koncepti

Netty je neblokirajući okvir. To dovodi do velike propusnosti u usporedbi s blokiranjem IO-a. Razumijevanje neblokirajućeg IO-a presudno je za razumijevanje Nettyjevih glavnih komponenata i njihovih odnosa.

2.1. Kanal

Kanal je osnova Java NIO. Predstavlja otvorenu vezu koja je sposobna za IO operacije poput čitanja i pisanja.

2.2. Budućnost

Svaka IO operacija na a Kanal u Nettyju ne blokira.

To znači da se svaka operacija vraća odmah nakon poziva. Tamo je Budućnost sučelje u standardnoj Java knjižnici, ali nije prikladno za Nettyjeve svrhe - možemo samo pitati Budućnost o završetku operacije ili o blokiranju trenutne niti dok se operacija ne završi.

Zato Netty ima svoje ChannelFuture sučelje. Možemo proslijediti povratni poziv na ChannelFuture koji će se pozvati po završetku operacije.

2.3. Događaji i voditelji

Netty koristi paradigmu aplikacija vođenu događajima, pa je cjevovod obrade podataka lanac događaja koji prolazi kroz rukovatelje. Događaji i obrađivači mogu se povezati s ulaznim i izlaznim protokom podataka. Ulazni događaji mogu biti sljedeći:

  • Aktiviranje i deaktiviranje kanala
  • Pročitajte operativne događaje
  • Iznimke
  • Korisnički događaji

Odlazni su događaji jednostavniji i uglavnom su povezani s otvaranjem / zatvaranjem veze i upisivanjem / ispiranjem podataka.

Netty aplikacije sastoje se od nekoliko događaja umrežavanja i logike aplikacija i njihovih rukovatelja. Osnovna sučelja za obrađivače događaja kanala su ChannelHandler i njegovih predaka ChannelOutboundHandler i ChannelInboundHandler.

Netty pruža veliku hijerarhiju implementacija ChannelHandler. Vrijedno je napomenuti adaptere koji su samo prazne implementacije, na pr. ChannelInboundHandlerAdapter i ChannelOutboundHandlerAdapter. Te bismo adaptere mogli proširiti kada trebamo obraditi samo podskup svih događaja.

Također, postoje mnoge implementacije specifičnih protokola kao što je HTTP, na pr. HttpRequestDecoder, HttpResponseEncoder, HttpObjectAggregator. Bilo bi dobro upoznati se s njima u Nettyjevom Javadocu.

2.4. Koderi i dekoderi

Dok radimo s mrežnim protokolom, moramo izvršiti serializaciju i deserializaciju podataka. U tu svrhu, Netty uvodi posebna proširenja ChannelInboundHandler za dekoderi koji su sposobni za dekodiranje dolaznih podataka. Osnovna klasa većine dekodera je ByteToMessageDecoder.

Za kodiranje odlaznih podataka, Netty ima proširenja ChannelOutboundHandler pozvao enkoderi. MessageToByteEncoder je osnova za većinu implementacija kodera. Poruku iz sekvence bajtova možemo pretvoriti u Java objekt i obratno enkoderima i dekoderima.

3. Primjer poslužiteljske aplikacije

Stvorimo projekt koji predstavlja jednostavan poslužitelj protokola koji prima zahtjev, vrši izračun i šalje odgovor.

3.1. Ovisnosti

Prije svega, moramo osigurati ovisnost o Nettyju u našem pom.xml:

 io.netty netty-all 4.1.10.Final 

Najnoviju verziju možemo pronaći na Maven Central.

3.2. Model podataka

Klasa podataka zahtjeva imala bi sljedeću strukturu:

javna klasa RequestData {private int intValue; private String stringValue; // standardni geteri i postavljači}

Pretpostavimo da poslužitelj prima zahtjev i vraća intValue pomnoženo s 2. Odgovor bi imao jednu vrijednost int:

javna klasa ResponseData {private int intValue; // standardni geteri i postavljači}

3.3. Zatraži dekoder

Sada moramo stvoriti kodere i dekodere za naše protokolarne poruke.

Treba napomenuti da Netty radi s međuspremnikom za primanje soketa, koji nije predstavljen kao red već samo kao gomila bajtova. To znači da se naš ulazni rukovatelj može pozvati kada poslužitelj ne primi cjelovitu poruku.

Prije obrade moramo biti sigurni da smo primili cijelu poruku a postoji mnogo načina za to.

Prije svega, možemo stvoriti privremeni ByteBuf i dodajte mu sve ulazne bajtove dok ne dobijemo potrebnu količinu bajtova:

javna klasa SimpleProcessingHandler proširuje ChannelInboundHandlerAdapter {private ByteBuf tmp; @Override public void handlerAdded (ChannelHandlerContext ctx) {System.out.println ("Handler added"); tmp = ctx.alloc (). međuspremnik (4); } @Override javni void handlerUklonjen (ChannelHandlerContext ctx) {System.out.println ("Rukovatelj uklonjen"); tmp.release (); tmp = nula; } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) {ByteBuf m = (ByteBuf) msg; tmp.writeBytes (m); m.release (); if (tmp.readableBytes ()> = 4) {// obrada zahtjeva RequestData requestData = nova RequestData (); requestData.setIntValue (tmp.readInt ()); ResponseData responseData = novi ResponseData (); responseData.setIntValue (requestData.getIntValue () * 2); ChannelFuture future = ctx.writeAndFlush (responseData); future.addListener (ChannelFutureListener.CLOSE); }}}

Gornji primjer izgleda pomalo čudno, ali pomaže nam da shvatimo kako Netty radi. Svaka metoda našeg obrađivača poziva se kada se dogodi njezin odgovarajući događaj. Stoga inicijaliziramo međuspremnik kad se doda rukovatelj, ispunjavamo ga podacima o primanju novih bajtova i započinjemo s obradom kad dobijemo dovoljno podataka.

Namjerno nismo koristili stringValue - dekodiranje na takav način bilo bi nepotrebno složeno. Zbog toga Netty nudi korisne klase dekodera koje su implementacije ChannelInboundHandler: ByteToMessageDecoder i ReplayingDecoder.

Kao što smo gore napomenuli, s Nettyem možemo stvoriti cjevovod za obradu kanala. Tako svoj dekoder možemo staviti kao prvi rukovatelj, a obrađivač logike obrade može doći nakon njega.

Dekoder za RequestData prikazan je sljedeći:

javna klasa RequestDecoder proširuje ReplayingDecoder {private final Charset charset = Charset.forName ("UTF-8"); @Override zaštićeno void dekodiranje (ChannelHandlerContext ctx, ByteBuf in, List out) baca izuzetak {RequestData data = new RequestData (); data.setIntValue (in.readInt ()); int strLen = in.readInt (); data.setStringValue (in.readCharSequence (strLen, charset) .toString ()); out.add (podaci); }}

Ideja ovog dekodera prilično je jednostavna. Koristi implementaciju ByteBuf što baca iznimku kada u međuspremniku nema dovoljno podataka za operaciju čitanja.

Kad se uhvati iznimka, međuspremnik se premotava na početak i dekoder čeka novi dio podataka. Dekodiranje se zaustavlja kad se van popis nije prazan nakon dekodirati izvršenje.

3.4. Kodiranje odgovora

Osim dekodiranja RequestData moramo šifrirati poruku. Ova je operacija jednostavnija jer imamo pune podatke o poruci kada se dogodi operacija pisanja.

Možemo upisivati ​​podatke u Kanal u našem glavnom handleru ili možemo odvojiti logiku i stvoriti handler koji se proteže MessageToByteEncoder koji će uhvatiti zapis ResponseData operacija:

javna klasa ResponseDataEncoder proširuje MessageToByteEncoder {@Override zaštićeno void kodiranje (ChannelHandlerContext ctx, ResponseData msg, ByteBuf out) baca Exception {out.writeInt (msg.getIntValue ()); }}

3.5. Obrada zahtjeva

Budući da smo dekodiranje i kodiranje izveli u odvojenim rukovateljima, moramo promijeniti svoj ProcessingHandler:

javna klasa ProcessingHandler proširuje ChannelInboundHandlerAdapter {@Override public void channelRead (ChannelHandlerContext ctx, Object msg) baca izuzetak {RequestData requestData = (RequestData) msg; ResponseData responseData = novi ResponseData (); responseData.setIntValue (requestData.getIntValue () * 2); ChannelFuture future = ctx.writeAndFlush (responseData); future.addListener (ChannelFutureListener.CLOSE); System.out.println (requestData); }}

3.6. Bootstrap poslužitelja

A sada spojimo sve i pokrenimo naš poslužitelj:

javna klasa NettyServer {private int port; // konstruktor public static void main (String [] args) baca iznimku {int port = args.length> 0? Integer.parseInt (args [0]); : 8080; novi NettyServer (port) .run (); } public void run () baca izuzetak {EventLoopGroup bossGroup = new NioEventLoopGroup (); EventLoopGroup workerGroup = novi NioEventLoopGroup (); isprobajte {ServerBootstrap b = novi ServerBootstrap (); b.group (bossGroup, workerGroup) .channel (NioServerSocketChannel.class) .childHandler (novi ChannelInitializer () {@Override public void initChannel (SocketChannel ch) baca izuzetak {ch.pipeline (). addLast (new RequestDetancoder (New RequestDetancoder) (), novi ProcessingHandler ());}}). opcija (ChannelOption.SO_BACKLOG, 128) .childOption (ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind (port) .sync (); f.channel (). closeFuture (). sync (); } napokon {workerGroup.shutdownGracefully (); bossGroup.shutdownGracefully (); }}}

Pojedinosti klasa korištenih u gornjem primjeru poslužitelja za pokretanje poslužitelja mogu se naći u njihovom Javadocu. Najzanimljiviji dio je ovaj redak:

ch.pipeline (). addLast (novi RequestDecoder (), novi ResponseDataEncoder (), novi ProcessingHandler ());

Ovdje definiramo ulazne i izlazne rukovatelje koji će obrađivati ​​zahtjeve i izlaz u ispravnom redoslijedu.

4. Prijava klijenta

Klijent bi trebao izvršiti obrnuto kodiranje i dekodiranje, pa moramo imati RequestDataEncoder i ResponseDataDecoder:

javna klasa RequestDataEncoder proširuje MessageToByteEncoder {private final Charset charset = Charset.forName ("UTF-8"); @Override zaštićeno void kodiranje (ChannelHandlerContext ctx, RequestData msg, ByteBuf out) baca iznimku {out.writeInt (msg.getIntValue ()); out.writeInt (msg.getStringValue (). length ()); out.writeCharSequence (msg.getStringValue (), charset); }}
javna klasa ResponseDataDecoder proširuje ReplayingDecoder {@Override zaštićeno void dekodiranje (ChannelHandlerContext ctx, ByteBuf in, List out) baca izuzetak {ResponseData data = new ResponseData (); data.setIntValue (in.readInt ()); out.add (podaci); }}

Također, moramo definirati a ClientHandler koji će poslati zahtjev i primiti odgovor od poslužitelja:

javna klasa ClientHandler proširuje ChannelInboundHandlerAdapter {@Override public void channelActive (ChannelHandlerContext ctx) baca izuzetak {RequestData msg = new RequestData (); msg.setIntValue (123); msg.setStringValue ("sve što radi i bez igre Jack čini dosadnim dječakom"); ChannelFuture future = ctx.writeAndFlush (msg); } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) baca izuzetak {System.out.println ((ResponseData) msg); ctx.close (); }}

Sada pokrenimo klijenta:

javna klasa NettyClient {public static void main (String [] args) baca iznimku {String host = "localhost"; int port = 8080; EventLoopGroup workerGroup = novi NioEventLoopGroup (); pokušajte {Bootstrap b = novi Bootstrap (); b.group (workerGroup); b.kanal (NioSocketChannel.class); b.option (ChannelOption.SO_KEEPALIVE, true); b.handler (novi ChannelInitializer () {@Override public void initChannel (SocketChannel ch) baca iznimku {ch.pipeline (). addLast (new RequestDataEncoder (), new ResponseDataDecoder (), new ClientHandler ());}}); ChannelFuture f = b.connect (host, port) .sync (); f.channel (). closeFuture (). sync (); } napokon {workerGroup.shutdownGracefully (); }}}

Kao što možemo vidjeti, mnogo je zajedničkih detalja s posluživanjem sustava za pokretanje sustava.

Sada možemo pokrenuti glavnu metodu klijenta i pogledati izlaz konzole. Kao što smo i očekivali, dobili smo ResponseData s intValue jednak 246.

5. Zaključak

U ovom smo članku kratko upoznali Netty. Pokazali smo njegove ključne komponente kao što su Kanal i ChannelHandler. Također, napravili smo jednostavan poslužitelj protokola koji ne blokira i klijent za njega.

Kao i uvijek, svi uzorci koda dostupni su na GitHub-u.