Uvod u Spring Cloud Stream

1. Pregled

Spring Cloud Stream je okvir izgrađen na vrhu Spring Boot i Spring Integration koji pomaže u stvaranju mikroservisa vođenih događajima ili porukama.

U ovom ćemo članku predstaviti koncepte i konstrukcije Spring Cloud Streama s nekoliko jednostavnih primjera.

2. Ovisnosti Mavena

Da bismo započeli, trebamo dodati Spring Cloud Starter Stream s ovisnošću brokera RabbitMQ Maven kao sredinu za razmjenu poruka u naš pom.xml:

 org.springframework.cloud proljeće-oblak-starter-tok-zec 1.3.0.OBLAŽENJE 

A mi ćemo dodati ovisnost o modulu iz Maven Central kako bismo omogućili i podršku za JUnit:

 org.springframework.cloud spring-cloud-stream-stream-test-support 1.3.0.Opusti test 

3. Glavni pojmovi

Arhitektura mikrousluga slijedi princip "pametnih krajnjih točaka i glupih cijevi". Komunikaciju između krajnjih točaka pokreću stranke za razmjenu poruka i posrednički softver poput RabbitMQ ili Apache Kafka. Usluge komuniciraju objavljivanjem događaja domene putem ovih krajnjih točaka ili kanala.

Prođimo kroz koncepte koji čine okvir Spring Cloud Streama, zajedno s osnovnim paradigmama kojih moramo biti svjesni kako bismo izgradili usluge vođene porukama.

3.1. Konstrukti

Pogledajmo jednostavnu uslugu u Spring Cloud Streamu koja sluša ulazni obvezujuća i šalje odgovor na izlaz obvezujući:

@SpringBootApplication @EnableBinding (Processor.class) javna klasa MyLoggerServiceApplication {public static void main (String [] args) {SpringApplication.run (MyLoggerServiceApplication.class, args); } @StreamListener (Processor.INPUT) @SendTo (Processor.OUTPUT) public LogMessage enrichLogMessage (LogMessage log) {return new LogMessage (String.format ("[1]:% s", log.getMessage ())); }}

Bilješka @EnableBinding konfigurira aplikaciju za povezivanje kanala ULAZNI i IZLAZ definirano unutar sučelja Procesor. Oba su kanala veze koje se mogu konfigurirati za upotrebu konkretnog posredničkog softvera za razmjenu poruka ili veziva.

Pogledajmo definiciju svih ovih pojmova:

  • Vezovi - zbirka sučelja koja deklarativno identificiraju ulazne i izlazne kanale
  • Vezivo - primjena posredničkog softvera za razmjenu poruka poput Kafke ili RabbitMQ-a
  • Kanal - predstavlja komunikacijsku cijev između razmjene poruka-posredničkog softvera i aplikacije
  • StreamListeners - metode rukovanja porukama u grahu koje će se automatski pozvati na poruku s kanala nakon Pretvarač poruka radi li serializaciju / deserializaciju između događaja specifičnih za middleware i vrsta objekata domene / POJO
  • Meskadulja Sheme - koriste se za serializaciju i deserializaciju poruka, ove se sheme mogu statički čitati s lokacije ili dinamički učitavati, podržavajući evoluciju vrsta objektnih domena

3.2. Obrasci komunikacije

Poruke označene na odredišta dostavlja Objavi-pretplati se uzorak poruka. Izdavači kategoriziraju poruke u teme, a svaka se identificira imenom. Pretplatnici pokazuju interes za jednu ili više tema. Međuprodukt filtrira poruke, dostavljajući pretplatnicima one zanimljive teme.

Sada bi se pretplatnici mogli grupirati. A potrošačka skupina je skup pretplatnika ili potrošača, prepoznatljiv pomoću a id grupe, unutar kojeg se poruke iz teme ili particije teme dostavljaju na uravnotežen način.

4. Model programiranja

Ovaj odjeljak opisuje osnove izrade aplikacija Spring Cloud Stream.

4.1. Ispitivanje funkcionalnosti

Podrška za test je implementacija veziva koja omogućuje interakciju s kanalima i pregled poruka.

Pošaljite poruku gore navedenim enrichLogMessage usluga i provjerite sadrži li odgovor tekst “[1]: “ na početku poruke:

@RunWith (SpringJUnit4ClassRunner.class) @ContextConfiguration (classes = MyLoggerServiceApplication.class) @DirtiesContext javna klasa MyLoggerApplicationTests {@Autowired private Processor pipe; @Autowired private MessageCollector messageCollector; @Test public void whenSendMessage_thenResponseShouldUpdateText () {pipe.input () .send (MessageBuilder.withPayload (new LogMessage ("Ovo je moja poruka")) .build ()); Korisno opterećenje objekta = messageCollector.forChannel (pipe.output ()) .poll () .getPayload (); assertEquals ("[1]: Ovo je moja poruka", payload.toString ()); }}

4.2. Prilagođeni kanali

U gornjem primjeru koristili smo Procesor sučelje koje pruža Spring Cloud, koji ima samo jedan ulazni i jedan izlazni kanal.

Ako trebamo nešto drugačije, poput jednog ulaznog i dva izlazna kanala, možemo stvoriti prilagođeni procesor:

javno sučelje MyProcessor {String INPUT = "myInput"; @Input SubscribableChannel myInput (); @Output ("myOutput") MessageChannel anOutput (); @Output MessageChannel anotherOutput (); }

Proljeće će nam pružiti pravilnu implementaciju ovog sučelja. Imena kanala mogu se postaviti pomoću napomena kao u @Output ("myOutput").

Inače će Spring koristiti nazive metoda kao nazive kanala. Stoga smo pozvali tri kanala myInput, myOutput, i drugiIzlaz.

Sada, zamislimo da želimo preusmjeriti poruke na jedan izlaz ako je vrijednost manja od 10, a u drugi izlaz ako je vrijednost veća ili jednaka 10:

@Autowired privatni procesor MyProcessor; @StreamListener (MyProcessor.INPUT) javna void routeValues ​​(Integer val) {if (val <10) {processor.anOutput (). Send (message (val)); } else {procesor.anotherOutput (). send (message (val)); }} privatna statička konačna poruka poruke (T val) {return MessageBuilder.withPayload (val) .build (); }

4.3. Uvjetno otpremanje

Koristiti @StreamListener napomena, također možemo filtrirati poruke koje očekujemo od potrošača koristeći bilo koji uvjet koji definiramo SpEL izrazima.

Kao primjer, mogli bismo koristiti uvjetnu otpremu kao drugi pristup za usmjeravanje poruka na različite izlaze:

@Autowired privatni procesor MyProcessor; @StreamListener (target = MyProcessor.INPUT, condition = "payload = 10") javna praznina routeValuesToAnotherOutput (Integer val) {processor.anotherOutput (). Send (message (val)); }

Jedini ograničenje ovog pristupa je da ove metode ne smiju vraćati vrijednost.

5. Postavljanje

Postavimo aplikaciju koja će obrađivati ​​poruku posrednika RabbitMQ.

5.1. Konfiguracija veziva

Možemo konfigurirati našu aplikaciju da koristi zadanu implementaciju veziva putem META-INF / opruga.veziva:

zec: \ org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

Ili možemo dodati knjižnicu veziva za RabbitMQ na put predavanja tako što ćemo uključiti ovu ovisnost:

 org.springframework.cloud proljeće-oblak-tok-vezivo-zec 1.3.0.OBLAŽENJE 

Ako nije osigurana implementacija veziva, Spring će koristiti izravnu komunikaciju porukom između kanala.

5.2. RabbitMQ konfiguracija

Da bismo konfigurirali primjer u odjeljku 3.1 za upotrebu RabbitMQ veziva, moramo ažurirati primjena.iml smješteno na src / glavni / resursi:

spring: cloud: stream: bindings: input: destination: queue.log.messages binder: local_rabbit output: destination: queue.pretty.log.messages binder: local_rabbit binders: local_rabbit: type: rabbit environment: spring: rabbitmq: host: port : 5672 korisničko ime: lozinka: virtual-host: /

The ulazni obvezujuća će koristiti razmjenu tzv red.log.poruke, i izlaz obvezujući će koristiti razmjenu queue.pretty.log.messages. Oba će se veza koristiti nazvanim vezivom lokalni_zajec.

Imajte na umu da ne moramo unaprijed kreirati RabbitMQ razmjene ili redove čekanja. Prilikom pokretanja aplikacije, obje se razmjene automatski kreiraju.

Da bismo testirali aplikaciju, možemo koristiti web mjesto za upravljanje RabbitMQ za objavljivanje poruke. U Objavi poruku panel razmjene red.log.poruke, moramo unijeti zahtjev u JSON formatu.

5.3. Prilagođavanje pretvorbe poruka

Spring Cloud Stream omogućuje nam primjenu pretvorbe poruka za određene vrste sadržaja. U gornjem primjeru, umjesto da koristimo JSON format, želimo pružiti običan tekst.

Da bismo to učinili, mi ćemo primijeniti prilagođenu transformaciju na LogMessage pomoću a Pretvarač poruka:

@SpringBootApplication @EnableBinding (Processor.class) javna klasa MyLoggerServiceApplication {// ... @Bean public MessageConverter providesTextPlainMessageConverter () {return new TextPlainMessageConverter (); } // ...}
javna klasa TextPlainMessageConverter proširuje AbstractMessageConverter {public TextPlainMessageConverter () {super (novi MimeType ("tekst", "običan")); } @Override zaštićene logičke potpore (Class clazz) {return (LogMessage.class == clazz); } @Override zaštićeni Object convertFromInternal (poruka poruke, Class targetClass, Object conversionHint) {Object loadload = message.getPayload (); Tekst niza = instanca korisnog tereta niza? (String) korisni teret: novi String ((byte []) nosivost); vrati novu LogMessage (tekst); }}

Nakon primjene ovih promjena, vraćanje na Objavi poruku ploči, ako postavimo zaglavlje "vrste sadržaja"Do"tekst / običan"I korisni teret do"Pozdrav svijete“, Trebalo bi raditi kao i prije.

5.4. Potrošačke skupine

Kada pokrećete više instanci naše aplikacije, svaki put kad se na ulaznom kanalu pojavi nova poruka, svi pretplatnici bit će obaviješteni.

Većinu vremena trebamo da se poruka obradi samo jednom. Spring Cloud Stream implementira ovo ponašanje putem grupa potrošača.

Da bi se omogućilo ovo ponašanje, svaki potrošački obvezujući može koristiti proljeće. oblak.tok.vezivi..grupa svojstvo za specificiranje naziva grupe:

proljeće: oblak: tok: vezovi: ulaz: odredište: red čekanja.log.poruke vezivo: lokalna_zajčeva grupa: logMessageConsumers ...

6. Mikroservisi vođeni porukama

U ovom odjeljku uvodimo sve potrebne značajke za pokretanje naših aplikacija Spring Cloud Stream u kontekstu mikro usluga.

6.1. Povećavanje

Kada je pokrenuto više aplikacija, važno je osigurati da podaci budu pravilno podijeljeni među potrošačima. Da bi to učinio, Spring Cloud Stream nudi dva svojstva:

  • proljeće. oblak.tok.instanceCount - broj pokrenutih aplikacija
  • proljeće. oblak.tok.instanceIndex - indeks trenutne prijave

Na primjer, ako smo primijenili dva primjerka gore navedenog MyLoggerServiceApplication prijava, svojstvo proljeće. oblak.tok.instanceCount treba biti 2 za obje aplikacije i za svojstvo proljeće. oblak.tok.instanceIndex treba biti 0 odnosno 1.

Ta se svojstva automatski postavljaju ako aplikacije Spring Cloud Stream implementiramo pomoću Spring Data Flow kako je opisano u ovom članku.

6.2. Pregrađivanje

Događaji domene mogu biti Pregrađeno poruke. Ovo pomaže kad jesmo povećavanje pohrane i poboljšanje performansi aplikacije.

Događaj domene obično ima particijski ključ tako da završava na istoj particiji s povezanim porukama.

Recimo da želimo da se zapisničke poruke dijele prema prvom slovu u poruci, što bi bio particijski ključ, i grupiraju u dvije particije.

Postojala bi jedna particija za poruke dnevnika koje počinju s A-M i drugu particiju za N-Z. To se može konfigurirati pomoću dva svojstva:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression - izraz za razdvajanje korisnih tereta
  • spring.cloud.stream.bindings.output.producer.partitionCount - broj grupa

Ponekad je izraz za particiju previše složen da bi ga napisao u samo jednom retku. U tim slučajevima možemo napisati našu prilagođenu strategiju particije koristeći svojstvo spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass.

6.3. Pokazatelj zdravlja

U kontekstu mikro usluga također trebamo prepoznati kada usluga prestaje raditi ili počinje propadati. Spring Cloud Stream pruža imanje upravljanje.zdravstvo.veza.omogućeno kako bi se omogućili zdravstveni pokazatelji veziva.

Prilikom pokretanja aplikacije možemo postaviti upit o zdravstvenom stanju na //:/zdravlje.

7. Zaključak

U ovom uputstvu predstavili smo glavne koncepte Spring Cloud Streama i pokazali kako ga koristiti kroz nekoliko jednostavnih primjera preko RabbitMQ-a. Više informacija o Spring Cloud Stream-u možete pronaći ovdje.

Izvorni kod za ovaj članak može se naći na GitHubu.