Uvod u Hazelcast Jet

1. Uvod

U ovom uputstvu naučit ćemo o Hazelcast Jetu. To je distribuirana obrada podataka koju pruža Hazelcast, Inc., a izgrađena je na vrhu Hazelcast IMDG-a.

Ako želite saznati više o Hazelcast IMDG-u, evo članka za početak.

2. Što je Hazelcast Jet?

Hazelcast Jet je distribuirani mehanizam za obradu podataka koji podatke tretira kao tokove. Može obrađivati ​​podatke koji su pohranjeni u bazi podataka ili datotekama, kao i podatke koje struji Kafka poslužitelj.

Štoviše, može izvršavati agregatne funkcije nad beskonačnim tokovima podataka dijeleći tokove u podskupine i primjenjujući agregiranje na svaki podskup. Ovaj je koncept u terminologiji Jet poznat kao prozor.

Jet možemo rasporediti u skup strojeva, a zatim mu predati svoje poslove obrade podataka. Jet će učiniti da svi članovi klastera automatski obrađuju podatke. Svaki član klastera troši dio podataka, što olakšava skaliranje do bilo koje razine protoka.

Evo tipičnih slučajeva upotrebe za Hazelcast Jet:

  • Obrada streama u stvarnom vremenu
  • Brza serijska obrada
  • Obrada Java 8 Streamova na distribuirani način
  • Obrada podataka u mikroservisima

3. Postavljanje

Da bismo postavili Hazelcast Jet u naše okruženje, samo trebamo dodati jednu ovisnost o Mavenu pom.xml.

Evo kako to radimo:

 com.hazelcast.jet lješnjak-mlaz 4.2 

Uključujući ovu ovisnost, preuzet će se datoteka od 10 MB jar koja nam pruža svu infrastrukturu koja nam je potrebna za izgradnju distribuiranog cjevovoda za obradu podataka.

Najnoviju verziju za Hazelcast Jet možete pronaći ovdje.

4. Primjer uzorka

Kako bismo saznali više o Hazelcast Jetu, stvorit ćemo uzorak aplikacije koja uzima unos rečenica i riječ koja se u njima nalazi i vraća broj određene riječi u tim rečenicama.

4.1. Cjevovod

Cjevovod čini osnovnu konstrukciju Jet aplikacije. Obrada unutar cjevovoda slijedi ove korake:

  • čita podatke iz izvora
  • transformirati podatke
  • zapisati podatke u sudoper

Za našu aplikaciju, cjevovod će čitati iz distribuiranog Popis, primijenite transformaciju grupiranja i agregiranja i na kraju zapišite na distribuirani Karta.

Evo kako pišemo naš cjevovod:

privatni cjevovod createPipeLine () {Cjevovod p = Cjevovod.create (); p.readFrom (Sources.list (LIST_NAME)) .flatMap (word -> traverseArray (word.toLowerCase (). split ("\ W +"))) .filter (word ->! word.isEmpty ()) .groupingKey (wholeItem ()) .aggregate (counting ()) .writeTo (Sinks.map (MAP_NAME)); povratak p; }

Nakon čitanja iz izvora, prelazimo podatke i dijelimo ih po prostoru pomoću regularnog izraza. Nakon toga filtriramo praznine.

Na kraju grupiramo riječi, agregiramo ih i zapisujemo rezultate u a Karta.

4.2. Posao

Sada kada je definiran naš cjevovod, stvaramo posao za izvršavanje cjevovoda.

Evo kako pišemo a countWord funkcija koja prihvaća parametre i vraća broj:

javni Long countWord (Popis rečenica, niz riječi) {long count = 0; JetInstance jet = Jet.newJetInstance (); isprobajte {List textList = jet.getList (LIST_NAME); textList.addAll (rečenice); Cjevovod p = createPipeLine (); jet.newJob (p) .join (); Broj brojeva karte = jet.getMap (MAP_NAME); count = counts.get (riječ); } napokon {Jet.shutdownAll (); } count count; }

Prvo kreiramo Jet instancu kako bismo stvorili svoj posao i koristili cjevovod. Zatim kopiramo ulaz Popis na distribuirani popis tako da bude dostupan u svim instancama.

Zatim predajemo posao pomoću cjevovoda koji smo izgradili gore. Metoda novi posao() vraća izvršni posao koji Jet započinje asinkrono. The pridružiti metoda čeka završetak posla i baca iznimka ako je posao dovršen s pogreškom.

Kada se posao dovrši, rezultati se dohvaćaju u distribuciji Karta, kako smo definirali u našem cjevovodu. Dakle, dobili smo Karta iz instance Jet i dohvatite brojanje riječi protiv njega.

Na kraju smo ugasili instancu Jet. Važno je ugasiti ga nakon završetka izvršenja, kao Jet instanca pokreće vlastite niti. Inače, naš Java proces i dalje će biti živ čak i nakon što naša metoda izađe.

Evo jedinstvenog testa kojim se testira kôd koji smo napisali za Jet:

@Test public void whenGivenSentencesAndWord_ThenReturnCountOfWord () {Popis rečenica = novi ArrayList (); rečenice.add ("Prva je sekunda bila u redu, ali druga je bila teška."); WordCounter wordCounter = novi WordCounter (); long countSecond = wordCounter.countWord (rečenice, "drugo"); assertEquals (3, countSecond); }

5. Zaključak

U ovom smo članku saznali o Hazelcast Jetu. Da biste saznali više o njemu i njegovim značajkama, pogledajte priručnik.

Kao i obično, kod za primjere korištene u ovom članku možete pronaći na Githubu.