Pouzdano slanje poruka s JGroups

1. Pregled

JGroups je Java API za pouzdanu razmjenu poruka. Sadrži jednostavno sučelje koje pruža:

  • fleksibilni stog protokola, uključujući TCP i UDP
  • fragmentacija i ponovno sastavljanje velikih poruka
  • pouzdani unicast i multicast
  • otkrivanje kvara
  • kontrola protoka

Kao i mnoge druge značajke.

U ovom uputstvu stvorit ćemo jednostavnu aplikaciju za razmjenu Niz poruke između aplikacija i pružanje zajedničkog stanja novim aplikacijama kad se pridruže mreži.

2. Postavljanje

2.1. Ovisnost Mavena

Moramo dodati jednu ovisnost o našoj pom.xml:

 org.jgroups jgroups 4.0.10.Final 

Najnoviju verziju knjižnice možete provjeriti na Maven Central.

2.2. Umrežavanje

JGroups će pokušati koristiti IPV6 prema zadanim postavkama. Ovisno o konfiguraciji našeg sustava, to može dovesti do toga da aplikacije ne mogu komunicirati.

Da bismo to izbjegli, postavićemo java.net.preferIPv4Stack do pravi svojstvo pri pokretanju naših aplikacija ovdje:

java -Djava.net.preferIPv4Stack = true com.baeldung.jgroups.JGroupsMessenger 

3. JChannels

Naša veza s mrežom JGroups je JChannel. Kanal se pridružuje klasteru i šalje i prima poruke, kao i informacije o stanju mreže.

3.1. Stvaranje kanala

Mi stvaramo JChannel s putem do konfiguracijske datoteke. Ako izostavimo naziv datoteke, tražit će udp.xml u trenutnom radnom direktoriju.

Stvorit ćemo kanal s eksplicitno imenovanom konfiguracijskom datotekom:

JChannel kanal = novi JChannel ("src / main / resources / udp.xml"); 

Konfiguracija JGroups može biti vrlo komplicirana, ali zadane UDP i TCP konfiguracije dovoljne su za većinu aplikacija. Datoteku za UDP uključili smo u naš kod i koristit ćemo je za ovaj vodič.

Za više informacija o konfiguriranju prijevoza pogledajte ovdje priručnik JGroups.

3.2. Povezivanje kanala

Nakon što stvorimo svoj kanal, moramo se pridružiti grupi. Klaster je skupina čvorova koji razmjenjuju poruke.

Za pridruživanje klasteru potrebno je ime klastera:

channel.connect ("Baeldung"); 

Prvi čvor koji se pokuša pridružiti klasteru stvorit će ga ako ne postoji. U nastavku ćemo vidjeti ovaj postupak na djelu.

3.3. Imenovanje kanala

Čvorovi se identificiraju imenom tako da vršnjaci mogu slati usmjerene poruke i primati obavijesti o tome tko ulazi i napušta klaster. JGroups će automatski dodijeliti ime ili ga možemo postaviti sami:

channel.name ("user1");

Upotrijebit ćemo ova imena u nastavku za praćenje kada čvorovi ulaze i napuštaju klaster.

3.4. Zatvaranje kanala

Čišćenje kanala neophodno je ako želimo da vršnjaci pravovremeno dobivaju obavijest da smo izašli.

Zatvaramo a JChannel sa svojom bliskom metodom:

channel.close ()

4. Promjene prikaza klastera

S kreiranim JChannelom sada smo spremni vidjeti stanje vršnjaka u klasteru i razmjenjivati ​​poruke s njima.

JGroups održava stanje klastera unutar Pogled razred. Svaki kanal ima jedan Pogled mreže. Kada se prikaz promijeni, isporučuje se putem viewAccepted () uzvratiti poziv.

Za ovaj ćemo vodič proširiti ReceiverAdaptor API klasa koja implementira sve metode sučelja potrebne za aplikaciju.

To je preporučeni način primjene povratnih poziva.

Dodajmo viewAccepted našoj aplikaciji:

public void viewAccepted (View newView) {private View lastView; if (lastView == null) {System.out.println ("Primljeni početni prikaz:"); newView.forEach (System.out :: println); } else {System.out.println ("Primljen novi prikaz."); Popis newMembers = View.newMembers (lastView, newView); System.out.println ("Novi članovi:"); newMembers.forEach (System.out :: println); Popis bivših članova = View.leftMembers (lastView, newView); System.out.println ("Izašli članovi:"); exMembers.forEach (System.out :: println); } lastView = newView; } 

Svaki Pogled sadrži a Popis od Adresa objekata koji predstavljaju svakog člana klastera. JGroups nudi praktične metode za usporedbu jednog pogleda s drugim, koje koristimo za otkrivanje novih ili izašlih članova klastera.

5. Slanje poruka

Obrada poruka u JGroupsu je jednostavna. A Poruka sadrži a bajt niz i Adresa objekti koji odgovaraju pošiljatelju i primatelju.

Za ovaj tutorial koji koristimo Žice čitati iz naredbenog retka, ali lako je vidjeti kako aplikacija može razmjenjivati ​​druge vrste podataka.

5.1. Emitirane poruke

A Poruka kreira se s odredištem i nizom bajtova; JChannel postavlja nam pošiljatelja. Ako je meta null, cijeli će klaster primiti poruku.

Prihvatit ćemo tekst iz naredbenog retka i poslati ga klasteru:

System.out.print ("Unesite poruku:"); Linija niza = in.readLine (). ToLowerCase (); Poruka poruke = nova poruka (null, line.getBytes ()); channel.send (poruka); 

Ako pokrenemo više instanci našeg programa i pošaljemo ovu poruku (nakon što implementiramo primiti () dolje), svi bi je dobili, uključujući pošiljatelja.

5.2. Blokiranje naših poruka

Ako ne želimo vidjeti svoje poruke, možemo za to postaviti svojstvo:

channel.setDiscardOwnMessages (true); 

Kada pokrenemo prethodni test, pošiljatelj poruke ne prima svoju emitiranu poruku.

5.3. Direktna poruka

Za slanje izravne poruke potrebna je valjanost Adresa. Ako se nazivamo čvorovima po imenu, trebamo način da potražimo Adresa. Srećom, imamo Pogled za to.

Struja Pogled je uvijek dostupan od JChannel:

privatno Neobavezno getAddress (naziv niza) {View view = channel.view (); vratiti prikaz.getMembers (). stream () .filter (adresa -> ime.equals (address.toString ())) .findAny (); } 

Adresa imena su dostupna putem predavanja toString () metodu, pa samo pretražujemo Popis članova klastera za ime koje želimo.

Tako možemo prihvatiti ime s konzole, pronaći povezano odredište i poslati izravnu poruku:

Adresa odredišta = null; System.out.print ("Unesite odredište:"); Niz ime odredišta = in.readLine (). ToLowerCase (); odredište = getAddress (odredišteName) .iliElseThrow (() -> nova iznimka ("Odredište nije pronađeno"); Poruka poruke = nova Poruka (odredište, "Zdravo!"); channel.send (poruka); 

6. Primanje poruka

Možemo slati poruke, ajmo sada dodati, pokušajmo ih odmah primiti.

Zamijenimo ReceiverAdaptor's prazna metoda primanja:

public void receive (Message message) {String line = Poruka primljena od: "+ message.getSrc () +" to: "+ message.getDest () +" -> "+ message.getObject (); System.out.println (crta); } 

Budući da znamo da poruka sadrži a Niz, možemo sigurno proći getObject () do System.out.

7. Državna razmjena

Kad čvor uđe u mrežu, možda će trebati dohvatiti informacije o stanju o klasteru. JGroups za to osigurava mehanizam prijenosa stanja.

Kad se čvor pridruži klasteru, on jednostavno poziva getState (). Klaster obično dohvaća stanje od najstarijeg člana u grupi - koordinatora.

Dodajte našoj aplikaciji broj emitiranih poruka. Dodati ćemo novu varijablu člana i povećavati je unutra primiti ():

private Integer messageCount = 0; public void receive (Message message) {String line = "Poruka primljena od:" + message.getSrc () + "to:" + message.getDest () + "->" + message.getObject (); System.out.println (linija); if (message.getDest () == null) {messageCount ++; System.out.println ("Broj poruka:" + Broj poruke); }} 

Provjeravamo je li null odredište, jer ako izbrojimo izravne poruke, svaki čvor imat će drugačiji broj.

Dalje, nadjačavamo još dvije metode u ReceiverAdaptor:

javna praznina setState (InputStream input) {try {messageCount = Util.objectFromStream (new DataInputStream (input)); } catch (Iznimka e) {System.out.println ("Pogreška pri deserijaliziranju stanja!"); } System.out.println (messageCount + "je trenutni broj poruka."); } javna praznina getState (izlaz OutputStream) baca iznimku {Util.objectToStream (messageCount, new DataOutputStream (output)); } 

Slično porukama, JGroups prenosi stanje kao niz bajtova.

JGroups isporučuje InputStream koordinatoru da napiše stanje i Izlazni tok za čitanje novog čvora. API pruža praktične klase za serializaciju i deserializaciju podataka.

Imajte na umu da u proizvodnom kodu pristup informacijama o stanju mora biti siguran.

Na kraju, dodajemo poziv na getState () do našeg pokretanja, nakon što se povežemo s klasterom:

channel.connect (ime klastera); channel.getState (null, 0); 

getState () prihvaća odredište s kojeg se traži stanje i vremensko ograničenje u milisekundama. A null odredište označava koordinatora, a 0 znači da ne ističe vremensko ograničenje.

Kada pokrenemo ovu aplikaciju s parom čvorova i razmijenimo emitirane poruke, vidimo porast broja poruka.

Tada, ako dodamo trećeg klijenta ili zaustavimo i pokrenemo jednog od njih, vidjet ćemo kako novopovezani čvor ispisuje ispravan broj poruka.

8. Zaključak

U ovom uputstvu koristili smo JGroups za izradu aplikacije za razmjenu poruka. Koristili smo API za praćenje čvorova koji su se povezali i napustili klaster, a također i za prijenos stanja klastera u novi čvor kada se pridružio.

Uzorke koda, kao i uvijek, možete pronaći na GitHubu.