Početak rada s obradom streama s Spring Cloud protokom podataka

1. Uvod

Proljetni protok podataka u oblaku je izvorni program za programiranje i rad u oblaku za mikropostave podataka koji se mogu sastaviti.

S Proljetni protok podataka u oblaku, programeri mogu stvoriti i orkestrirati cjevovode podataka za uobičajene slučajeve poput unosa podataka, analitike u stvarnom vremenu i uvoza / izvoza podataka.

Ovi cjevovodi podataka dolaze u dva okusa, cjevovodi streaminga i batch podataka.

U prvom slučaju, neograničena količina podataka troši se ili proizvodi putem posredničkog softvera za razmjenu poruka. Dok u drugom slučaju kratkotrajni zadatak obrađuje konačan skup podataka, a zatim završava.

Ovaj će se članak usredotočiti na streaming obradu.

2. Arhitektonski pregled

Ključne komponente ove vrste arhitekture su Prijave, Poslužitelj protoka podatakai ciljano vrijeme izvođenja.

Uz ove ključne komponente, obično imamo i Ljuska protoka podataka i a posrednik poruka unutar arhitekture.

Pogledajmo sve ove komponente detaljnije.

2.1. Prijave

Tipično cjevovod streaming podataka uključuje konzumiranje događaja iz vanjskih sustava, obradu podataka i postojanost poliglota. Te se faze obično nazivaju Izvor, Procesor, i Umivaonik u Proljetni oblak terminologija:

  • Izvor: je aplikacija koja troši događaje
  • Procesor: troši podatke iz Izvor, izvrši neku obradu na njemu i emitira obrađene podatke u sljedeću aplikaciju u cjevovodu
  • Umivaonik: bilo troši iz a Izvor ili Procesor i zapisuje podatke u željeni sloj postojanosti

Te se aplikacije mogu pakirati na dva načina:

  • Spring Boot uber-jar koji je hostiran u maven spremištu, datoteci, http ili bilo kojoj drugoj implementaciji Spring izvora (ova metoda će se koristiti u ovom članku)
  • Lučki radnik

Mnogi su izvori, procesori i sudoperi za uobičajene slučajeve upotrebe (npr. Jdbc, hdfs, http, usmjerivač) već ponuđeni i spremni za upotrebu od strane Proljetni protok podataka u oblaku tim.

2.2. Vrijeme izvođenja

Također, potrebno je vrijeme izvođenja da bi se te aplikacije mogle izvršiti. Podržana vremena izvođenja su:

  • Ljevaonica oblaka
  • Apache pređa
  • Kubernetes
  • Apache Mesos
  • Lokalni poslužitelj za razvoj (koji će se koristiti u ovom članku)

2.3. Poslužitelj protoka podataka

Komponenta koja je odgovorna za postavljanje aplikacija u vrijeme izvođenja je Poslužitelj protoka podataka. Tamo je Poslužitelj protoka podataka izvršna tegla predviđena za svako ciljano vrijeme izvođenja.

The Poslužitelj protoka podataka odgovoran je za tumačenje:

  • Stream DSL koji opisuje logički protok podataka kroz više aplikacija.
  • Manifest implementacije koji opisuje preslikavanje aplikacija na vrijeme izvođenja.

2.4. Ljuska protoka podataka

Ljuska protoka podataka klijent je poslužitelja protoka podataka. Ljuska nam omogućuje izvođenje DSL naredbe potrebne za interakciju s poslužiteljem.

Kao primjer, DSL za opis protoka podataka od http izvora do jdbc sudopera napisao bi se kao „http | jdbc ”. Ta su imena u DSL-u registrirana kod Poslužitelj protoka podataka i preslikati na artefakte aplikacija koji se mogu hostirati u spremištima Maven ili Docker.

Proljeće također nudi grafičko sučelje, nazvano Flo, za stvaranje i praćenje cjevovoda streaming podataka. Međutim, njegova uporaba izvan je rasprave u ovom članku.

2.5. Broker poruka

Kao što smo vidjeli u primjeru prethodnog odjeljka, upotrijebili smo simbol cijevi u definiciji protoka podataka. Simbol cijevi predstavlja komunikaciju između dviju aplikacija putem posredničkog softvera za razmjenu poruka.

To znači da nam je potreban posrednik poruka i pokrenut u ciljanom okruženju.

Dva posrednika posrednika za razmjenu poruka koji su podržani su:

  • Apache Kafka
  • RabbitMQ

I tako, sada kada imamo pregled arhitektonskih komponenata - vrijeme je za izgradnju našeg prvog cjevovoda za obradu toka.

3. Instalirajte Broker poruka

Kao što smo vidjeli, aplikacije u pripremi trebaju međusobni softver za razmjenu poruka da bi komunicirali. U svrhu ovog članka nastavit ćemo s RabbitMQ.

Za sve detalje instalacije možete slijediti upute na službenim stranicama.

4. Lokalni poslužitelj protoka podataka

Da bismo ubrzali proces generiranja naših aplikacija, koristit ćemo Spring Initializr; uz njegovu pomoć možemo dobiti svoje Proljetni čizme aplikacije za nekoliko minuta.

Nakon što dođete na web mjesto, jednostavno odaberite a Skupina i an Artefakt Ime.

Kada je to gotovo, kliknite gumb Generiraj projekt za početak preuzimanja artefakta Maven.

Nakon završetka preuzimanja, raspakirajte projekt i uvezite ga kao Maven projekt u svoj IDE po izboru.

Dodajte projektu ovisnost Mavena. Kako trebamo Lokalni poslužitelj protoka podataka knjižnice, dodajmo proljetni-oblak-starter-protok podataka-poslužitelj-lokalna ovisnost:

 org.springframework.cloud proljeće-oblak-starter-protok podataka-poslužitelj-lokalno 

Sada moramo označiti Proljetni čizme glavni razred sa @EnableDataFlowServer napomena:

@EnableDataFlowServer @SpringBootApplication javna klasa SpringDataFlowServerApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowServerApplication.class, args); }} 

To je sve. Naše Lokalni poslužitelj protoka podataka je spreman za izvršenje:

mvn spring-boot: trčanje

Program će se pokrenuti na priključku 9393.

5. Ljuska protoka podataka

Opet idite na Spring Initializr i odaberite a Skupina i Artefakt Ime.

Nakon što smo preuzeli i uvezli projekt, dodajmo ovisnost opruge spring-cloud-dataflow-shell:

 org.springframework.cloud proljeće-oblak-protok podataka-ljuska 

Sada moramo dodati @EnableDataFlowShell bilješka na Proljetni čizme glavna klasa:

@EnableDataFlowShell @SpringBootApplication javna klasa SpringDataFlowShellApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowShellApplication.class, args); }} 

Sada možemo pokrenuti ljusku:

mvn spring-boot: trčanje

Nakon što se ljuska pokrene, možemo upisati Pomozite naredbu u retku da biste vidjeli cjelovit popis naredbi koje možemo izvršiti.

6. Izvorna aplikacija

Slično tome, na Initializr ćemo sada stvoriti jednostavnu aplikaciju i dodati a Stream Zec ovisnost nazvana proljeće-oblak-starter-tok-zec:

 org.springframework.cloud proljeće-oblak-starter-potok-zec 

Zatim ćemo dodati @EnableBinding (Source.class) bilješka na Proljetni čizme glavna klasa:

@EnableBinding (Source.class) @SpringBootApplication javna klasa SpringDataFlowTimeSourceApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowTimeSourceApplication.class, args); }}

Sada moramo definirati izvor podataka koji se moraju obraditi. Ovaj izvor može biti bilo koje potencijalno beskrajno radno opterećenje (podaci senzora interneta, obrada događaja 24/7, unos podataka o mrežnim transakcijama).

U našoj aplikaciji uzorka proizvodimo jedan događaj (radi jednostavnosti novu vremensku oznaku) svakih 10 sekundi s a Poller.

The @InboundChannelAdapter napomena šalje poruku izlaznom izlaznom kanalu, koristeći povratnu vrijednost kao korisni teret poruke:

@Bean @InboundChannelAdapter (value = Source.OUTPUT, poller = @Poller (fixedDelay = "10000", maxMessagesPerPoll = "1")) public MessageSource timeMessageSource () {return () -> MessageBuilder.withPayload (novi datum (). GetTime ()).izgraditi(); } 

Naš je izvor podataka spreman.

7. Procesorska aplikacija

Dalje - stvorit ćemo aplikaciju i dodati a Stream Zec ovisnost.

Zatim ćemo dodati @EnableBinding (Processor.class) bilješka na Proljetni čizme glavna klasa:

@EnableBinding (Processor.class) @SpringBootApplication javna klasa SpringDataFlowTimeProcessorApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowTimeProcessorApplication.class, args); }}

Dalje, moramo definirati metodu za obradu podataka koji dolaze iz izvorne aplikacije.

Da bismo definirali transformator, ovu metodu moramo označiti s @Transformator napomena:

@Transformer (inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) javna transformacija objekta (duga vremenska oznaka) {DateFormat dateFormat = new SimpleDateFormat ("yyyy / MM / dd hh: mm: yy"); Niz niza = dateFormat.format (vremenska oznaka); Datum povratka; }

Pretvara vremensku oznaku iz "ulaznog" kanala u formatirani datum koji će biti poslan na "izlazni" kanal.

8. Primjena umivaonika

Posljednja aplikacija koju je izradio je aplikacija Sink.

Opet idite na Spring Initializr i odaberite a Skupina, an Artefakt Ime. Nakon preuzimanja projekta dodajte a Stream Zec ovisnost.

Zatim dodajte @EnableBinding (Sink.class) bilješka na Proljetni čizme glavna klasa:

@EnableBinding (Sink.class) @SpringBootApplication javna klasa SpringDataFlowLoggingSinkApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowLoggingSinkApplication.class, args); }}

Sada nam je potrebna metoda za presretanje poruka koje dolaze iz aplikacije procesora.

Da bismo to učinili, moramo dodati @StreamListener (Sink.INPUT) napomena za našu metodu:

@StreamListener (Sink.INPUT) public void loggerSink (Datum niza) {logger.info ("Primljeno:" + datum); }

Metoda jednostavno ispisuje vremensku oznaku transformiranu u formatiranom datumu u datoteku dnevnika.

9. Registrirajte aplikaciju Stream

Proljetna školjka protoka podataka Cloud Cloud omogućuje nam registraciju aplikacije Stream u registru aplikacija pomoću registar aplikacija naredba.

Moramo navesti jedinstveni naziv, vrstu aplikacije i URI koji se mogu razriješiti u artefaktu aplikacije. Za vrstu navedite “izvor“, “procesor", ili "umivaonik“.

Kada pružate URI sa shemom maven, format bi trebao odgovarati sljedećem:

maven: //: [: [:]]:

Za registraciju Izvor, Procesor i Umivaonik prethodno kreirane aplikacije, idite na Proljetna školjka protoka podataka u oblaku i iz naredbe izdajte sljedeće naredbe:

registar aplikacija --ime izvor vremena - vrsta izvor --uri maven: //com.baeldung.spring.cloud: spring-data-flow-time-source: jar: 0.0.1-SNAPSHOT registar aplikacija --ime vrijeme -procesor --tip procesor --uri maven: //com.baeldung.spring.cloud: spring-data-flow-time-processor: jar: 0.0.1-Registar aplikacija SNAPSHOT --ime logging-sink - type sink --uri maven: //com.baeldung.spring.cloud: spring-data-flow-logging-sink: jar: 0.0.1-SNAPSHOT 

10. Stvorite i postavite tok

Da biste stvorili novu definiciju toka, idite na Proljetna školjka protoka podataka u oblaku i izvršite sljedeću naredbu ljuske:

stream create --name vrijeme-do-dnevnika --definicija 'vrijeme-izvor | procesor vremena | sudoper

Ovo definira tok s imenom vrijeme za evidentiranje na temelju izraza DSL 'Izvor vremena | procesor vremena | sudoper.

Zatim za postavljanje toka izvršite sljedeću naredbu ljuske:

stream implementacija --name vrijeme za bilježenje

The Poslužitelj protoka podataka rješava izvor vremena, procesor vremena, i sudoper za sječu drva maven koordinate i koristi ih za pokretanje izvor vremena, procesor vremena i sudoper za sječu drva aplikacije toka.

Ako je stream pravilno postavljen, vidjet ćete u Poslužitelj protoka podataka zapisnici da su moduli pokrenuti i povezani:

2016-08-24 12: 29: 10.516 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer: postavljanje aplikacije time-to-log.logging-sink instance 0 Dnevnici će biti u PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034549734 / time-to-log.logging-sink 2016-08-24 12: 29: 17.600 INFO 8096 --- [io-9393-exec-10] oscd spi.local.LocalAppDeployer: postavljanje aplikacije time-to-log.time-processor instance 0 Dnevnici će biti u PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034556862 / time-to-log.time-processor 2016-08-24 12: 29: 23.280 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer: postavljanje aplikacije time-to-log.time-source instance 0 Dnevnici će biti u PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034562861 / time-to-log.time-source

11. Pregled rezultata

U ovom primjeru izvor svake sekunde jednostavno šalje trenutnu vremensku oznaku kao poruku, procesor je formatira i sinkronizacija dnevnika daje formatiranu vremensku oznaku pomoću okvira za bilježenje.

Datoteke dnevnika nalaze se u direktoriju prikazanom u Poslužitelj protoka podatakaIzlaz iz dnevnika, kao što je prikazano gore. Da bismo vidjeli rezultat, možemo zabilježiti zapisnik:

tail -f PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034549734 / time-to-log.logging-sink / stdout_0.log 2016-08-24 12: 40: 42.029 INFO 9488 --- [ r.time-to-log-1] scSpringDataFlowLoggingSinkApplication: Primljeno: 2016/08/24 11:40:01 2016-08-24 12: 40: 52.035 INFO 9488 --- [r.time-to-log-1 ] scSpringDataFlowLoggingSinkApplication: Primljeno: 2016/08/24 11:40:11 2016-08-24 12: 41: 02.030 INFO 9488 --- [r.time-to-log-1] scSpringDataFlowLoggingSinkApplication: Primljeno: 2016/08 / 24 11:40:21

12. Zaključak

U ovom smo članku vidjeli kako izraditi cjevovod podataka za obradu tokova pomoću Proljetni protok podataka u oblaku.

Također, vidjeli smo ulogu Izvor, Procesor i Umivaonik aplikacije unutar toka i kako priključiti i povezati ovaj modul u a Poslužitelj protoka podataka kroz upotrebu Ljuska protoka podataka.

Primjer koda može se naći u projektu GitHub.