Uvod u Apache Storm

1. Pregled

Ovaj vodič bit će uvod u Apache Storm, distribuirani računalni sustav u stvarnom vremenu.

Usredotočit ćemo se na:

  • Što je zapravo Apache Storm i koje probleme rješava
  • Njegova arhitektura i
  • Kako ga koristiti u projektu

2. Što je Apache Storm?

Apache Storm besplatan je i distribuiran sustav otvorenog koda za izračunavanja u stvarnom vremenu.

Pruža toleranciju kvarova, skalabilnost i jamči obradu podataka, a posebno je dobar u obradi neomeđenih tokova podataka.

Neki dobri slučajevi upotrebe usluge Storm mogu biti obrada operacija kreditnim karticama za otkrivanje prijevara ili obrada podataka iz pametnih domova za otkrivanje neispravnih senzora.

Storm omogućuje integraciju s raznim bazama podataka i sustavima čekanja dostupnim na tržištu.

3. Ovisnost Mavena

Prije nego što upotrijebimo Apache Storm, u naš projekt moramo uključiti ovisnost o olujnoj jezgri:

 org.apache.storm storm-core 1.2.2 pod uvjetom 

Trebali bismo koristiti samo predviđeni opseg ako namjeravamo pokrenuti našu aplikaciju na klasteru Storm.

Za lokalno pokretanje aplikacije možemo koristiti takozvani lokalni način koji će simulirati klaster Storm u lokalnom procesu, u takvom bismo slučaju trebali ukloniti pod uvjetom.

4. Model podataka

Model podataka Apache Storma sastoji se od dva elementa: korijena i potoka.

4.1. Korijen

A Korijen je poredani popis imenovanih polja s dinamičkim tipovima. To znači da ne trebamo izričito deklarirati vrste polja.

Storm mora znati kako serializirati sve vrijednosti koje se koriste u korijenu. Prema zadanim postavkama već može serializirati primitivne tipove, Žice i bajt nizovi.

A budući da Storm koristi Kryo serializaciju, moramo registrirati serializator pomoću Config za upotrebu prilagođenih vrsta. To možemo učiniti na jedan od dva načina:

Prvo, možemo registrirati klasu za serializaciju koristeći puno ime:

Config config = novi Config (); config.registerSerialization (User.class);

U takvom će slučaju Kryo serializirati klasu pomoću FieldSerializer. Prema zadanim postavkama, ovo će serializirati sva neprolazna polja klase, i privatna i javna.

Ili umjesto toga, možemo pružiti klasu za serializaciju i serializator koji želimo da Storm koristi za tu klasu:

Config config = novi Config (); config.registerSerialization (User.class, UserSerializer.class);

Da bismo stvorili prilagođeni serializator, moramo proširiti generičku klasu Serijalizator koja ima dvije metode pisati i čitati.

4.2. Stream

A Stream je srž apstrakcije u ekosustavu Oluja. The Stream je neograničeni slijed korijena.

Storms omogućuje paralelnu obradu više tokova.

Svaki tok ima id koji se pruža i dodjeljuje tijekom deklaracije.

5. Topologija

Logika aplikacije Storm u stvarnom vremenu spakirana je u topologiju. Topologija se sastoji od izljevi i vijci.

5.1. Izljev

Izvori su potoci izljevi. Oni emitiraju korijene u topologiju.

Tuple se mogu čitati iz različitih vanjskih sustava poput Kafke, Kestrela ili ActiveMQ-a.

Izljevi mogu biti pouzdan ili nepouzdan. Pouzdan znači da izljev može odgovoriti da nabor koji Storm nije uspio obraditi. Nepouzdan znači da izljev ne reagira jer će upotrijebiti mehanizam za vatru i zaborav da emitira korpe.

Da bismo stvorili prilagođeni izljev, moramo implementirati IRichSpout sučelje ili proširiti bilo koju klasu koja već implementira sučelje, na primjer, sažetak BaseRichSpout razred.

Stvorimo nepouzdan izljev:

javna klasa RandomIntSpout proširuje BaseRichSpout {private Random random; private SpoutOutputCollector outputCollector; @Override public void open (Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {random = new Random (); outputCollector = spoutOutputCollector; } @Override public void nextTuple () {Utils.sleep (1000); outputCollector.emit (nove vrijednosti (random.nextInt (), System.currentTimeMillis ())); } @Override javna praznina declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (nova polja ("randomInt", "timestamp")); }}

Naš običaj RandomIntSpout generirat će slučajni cijeli broj i vremensku oznaku svake sekunde.

5.2. Vijak

Vijci obrađuju korijene u potoku. Mogu izvoditi razne operacije poput filtriranja, agregiranja ili prilagođenih funkcija.

Neke operacije zahtijevaju više koraka, pa ćemo stoga u takvim slučajevima trebati koristiti više vijaka.

Da biste stvorili običaj Vijak, moramo provesti IRichBolt ili za jednostavnije operacije IBasicBolt sučelje.

Na raspolaganju je i više pomoćnih klasa za implementaciju Vijak. U ovom ćemo slučaju koristiti BaseBasicBolt:

javna klasa PrintingBolt proširuje BaseBasicBolt {@Override javno void izvršavanje (Tuple tuple, BasicOutputCollector basicOutputCollector) {System.out.println (tuple); } @Override public void proglašiOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {}}

Ovaj običaj PrintingBolt jednostavno će ispisati sve tuple na konzolu.

6. Stvaranje jednostavne topologije

Sastavimo ove ideje u jednostavnu topologiju. Naša topologija imat će jedan izljev i tri vijka.

6.1. RandomNumberSpout

U početku ćemo stvoriti nepouzdan izljev. Generirat će slučajne cijele brojeve iz raspona (0,100) svake sekunde:

javna klasa RandomNumberSpout proširuje BaseRichSpout {private Random random; privatni kolektor SpoutOutputCollector; @Override public void open (Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {random = new Random (); collector = spoutOutputCollector; } @Override public void nextTuple () {Utils.sleep (1000); int operacija = random.nextInt (101); duga vremenska oznaka = System.currentTimeMillis (); Vrijednosti vrijednosti = nove vrijednosti (rad, vremenska oznaka); collector.emit (vrijednosti); } @Override javna praznina proglašavaOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (nova polja ("operacija", "vremenska oznaka")); }}

6.2. FilteringBolt

Dalje, stvorit ćemo zasun koji će filtrirati sve elemente pomoću operacija jednako 0:

javna klasa FilteringBolt proširuje BaseBasicBolt {@Override javno void izvršavanje (Tuple tuple, BasicOutputCollector basicOutputCollector) {int operation = tuple.getIntegerByField ("operation"); if (operacija> 0) {basicOutputCollector.emit (tuple.getValues ​​()); }} @Override javna praznina proglašavaOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (nova polja ("operacija", "vremenska oznaka")); }}

6.3. AgregirajućiBolt

Dalje, napravimo kompliciraniji Vijak koji će prikupiti sve pozitivne operacije svakog dana.

U tu svrhu koristit ćemo određenu klasu stvorenu posebno za implementaciju vijaka koji rade na prozorima umjesto da rade na pojedinačnim korpicama: BaseWindowedBolt.

Windows su bitan koncept u obradi potoka, dijeleći beskonačne tokove u konačne dijelove. Tada možemo primijeniti izračune na svaki komad. Općenito postoje dvije vrste prozora:

Vremenski prozori koriste se za grupiranje elemenata iz određenog vremenskog razdoblja pomoću vremenskih oznaka. Vremenski prozori mogu imati različit broj elemenata.

Prozori za brojanje koriste se za stvaranje prozora definirane veličine. U tom će slučaju svi prozori imati jednaku veličinu, a prozor će imati ne emitiraju se ako ima manje elemenata od definirane veličine.

Naše AgregirajućiBolt generirat će zbroj svih pozitivnih operacija iz a vremenski prozor zajedno s vremenskim oznakama početka i kraja:

javna klasa AggregatingBolt proširuje BaseWindowedBolt {private OutputCollector outputCollector; @Preuzmi javnu void pripremu (Map stormConf, TopologyContext context, OutputCollector collector) {this.outputCollector = collector; } @Override javna praznina proglašavaOutputFields (Izjavljivač OutputFieldsDeclarer) {declarer.declare (nova polja ("sumOfOperations", "beginTimestamp", "endTimestamp")); } @Override javno void izvršavanje (TupleWindow tupleWindow) {Popis tuple = tupleWindow.get (); tuples.sort (Comparator.comparing (this :: getTimestamp)); int sumOfOperations = tuples.stream () .mapToInt (tuple -> tuple.getIntegerByField ("operacija")) .sum (); Duga vremenska oznaka = getTimestamp (tuples.get (0)); Long endTimestamp = getTimestamp (tuples.get (tuples.size () - 1)); Vrijednosti vrijednosti = nove vrijednosti (sumOfOperations, startTimestamp, endTimestamp); outputCollector.emit (vrijednosti); } private Long getTimestamp (Tuple tuple) {return tuple.getLongByField ("timestamp"); }}

Imajte na umu da je u ovom slučaju izravno dobivanje prvog elementa popisa sigurno. To je zato što se svaki prozor izračunava pomoću vremenska oznaka polje Kornica, tako mora biti najmanje jedan element u svakom prozoru.

6.4. FileWritingBolt

Na kraju ćemo stvoriti zasun koji će uzeti sve elemente sumOfOperations veće od 2000, serializirajte ih i zapišite u datoteku:

javna klasa FileWritingBolt proširuje BaseRichBolt {javni statički logger logger = LoggerFactory.getLogger (FileWritingBolt.class); privatni pisac BufferedWriter; private String filePath; private ObjectMapper objectMapper; @Preuzmi javno čišćenje praznina () {try {writer.close (); } catch (IOException e) {logger.error ("Neuspješno zatvaranje pisca!"); }} @Override javna void priprema (karta karte, TopologyContext topologyContext, OutputCollector outputCollector) {objectMapper = novi ObjectMapper (); objectMapper.setVisibility (PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); probajte {Writer = new BufferedWriter (novi FileWriter (filePath)); } catch (IOException e) {logger.error ("Nije uspjelo otvaranje datoteke za pisanje.", e); }} @Override javno void izvršavanje (korijen korpica) {int sumOfOperations = tuple.getIntegerByField ("sumOfOperations"); duga oznaka vremena početka = tuple.getLongByField ("oznaka vremena početka"); long endTimestamp = tuple.getLongByField ("endTimestamp"); if (sumOfOperations> 2000) {AggregatedWindow aggregatedWindow = novi AggregatedWindow (sumOfOperations, beginningTimestamp, endTimestamp); probajte {writer.write (objectMapper.writeValueAsString (aggregatedWindow)); writer.newLine (); Writer.flush (); } catch (IOException e) {logger.error ("Nije uspjelo pisanje podataka u datoteku.", e); }}} // javni konstruktor i druge metode}

Imajte na umu da ne trebamo deklarirati izlaz jer će ovo biti zadnji vijak u našoj topologiji

6.5. Pokretanje topologije

Napokon, možemo sve povezati i pokrenuti našu topologiju:

javna statička void runTopology () {Graditelj TopologyBuilder = novi TopologyBuilder (); Izljev slučajni = novi RandomNumberSpout (); builder.setSpout ("randomNumberSpout"); Vijčano filtriranje = novi FilteringBolt (); builder.setBolt ("filteringBolt", filtriranje) .shuffleGrouping ("randomNumberSpout"); Zbiranje vijaka = novo AggregatingBolt () .withTimestampField ("vremenska oznaka") .withLag (BaseWindowedBolt.Duration.seconds (1)) .withWindow (BaseWindowedBolt.Duration.seconds (5)); builder.setBolt ("aggregatingBolt", agregiranje) .shuffleGrouping ("filteringBolt"); String filePath = "./src/main/resources/data.txt"; Datoteka vijka = novi FileWritingBolt (filePath); builder.setBolt ("fileBolt", datoteka) .shuffleGrouping ("aggregatingBolt"); Config config = novi Config (); config.setDebug (false); Klaster LocalCluster = novi LocalCluster (); cluster.submitTopology ("Test", config, builder.createTopology ()); }

Da bi podaci prolazili kroz svaki komad u topologiji, moramo naznačiti kako ih povezati. shuffleGroup omogućuje nam da navedemo te podatke za filtriranjeBolt dolazit će iz randomNumberSpout.

Za svakoga Vijak, moramo dodati shuffleGroup koji definira izvor elemenata za ovaj vijak. Izvor elemenata može biti a Izljev ili drugi Vijak. A ako isti izvor postavimo za više vijaka, izvor će emitirati sve elemente za svakog od njih.

U ovom će slučaju naša topologija koristiti Lokalni klaster za pokretanje posla lokalno.

7. Zaključak

U ovom uputstvu predstavili smo Apache Storm, distribuirani računalni sustav u stvarnom vremenu. Stvorili smo izljev, nekoliko vijaka i spojili ih u cjelovitu topologiju.

Kao i uvijek, svi uzorci koda mogu se naći na GitHubu.