Izgradnja podatkovnog cjevovoda s Kafkom, Spark Streamingom i Cassandrom

1. Pregled

Apache Kafka skalabilna je platforma visokih performansi s malim kašnjenjem omogućuje čitanje i pisanje tokova podataka poput sustava za razmjenu poruka. Možemo prilično lako započeti s Kafkom na Javi.

Spark Streaming dio je Apache Spark platforme koja omogućuje skalabilnu, visoku propusnost, obradu tokova podataka otpornu na kvarove. Iako je napisan na Scali, Spark nudi Java API-je za rad.

Apache Cassandra je distribuirana i širokokolona NoSQL pohrana podataka. Više detalja o Cassandri dostupno je u našem prethodnom članku.

U ovom uputstvu kombinirat ćemo ih kako bismo stvorili visoko skalabilan i otporan na kvarove podataka za tok podataka u stvarnom vremenu.

2. Instalacije

Za početak trebat će nam lokalno instalirani Kafka, Spark i Cassandra na našem stroju za pokretanje aplikacije. Vidjet ćemo kako razvijati cjevovod podataka pomoću ovih platformi kako budemo išli dalje.

Međutim, ostavit ćemo sve zadane konfiguracije, uključujući priključke za sve instalacije, što će vam pomoći da vodič nesmetano radi.

2.1. Kafka

Instaliranje Kafke na naš lokalni stroj prilično je jednostavno i može se naći kao dio službene dokumentacije. Koristit ćemo 2.1.0 izdanje Kafke.

U Dodatku, Kafka zahtijeva da čuvar zoološkog vrta Apache trči ali u svrhu ovog vodiča iskoristit ćemo instancu Zookeeper jednog čvora pakiranu s Kafkom.

Nakon što uspijemo pokrenuti Zookeeper i Kafka lokalno slijedeći službeni vodič, možemo nastaviti s izradom naše teme, nazvane „poruke“:

 $ KAFKA_HOME $ \ bin \ windows \ kafka-topics.bat --create \ --zookeeper localhost: 2181 \ - faktor replikacije 1 - particije 1 \ - teme poruke

Imajte na umu da je gornja skripta za Windows platformu, ali postoje slične skripte dostupne i za platforme slične Unixu.

2.2. Iskra

Spark koristi Hadoopove knjižnice klijenata za HDFS i YARN. Slijedom toga, može biti vrlo nezgodno sastaviti kompatibilne verzije svih ovih. Međutim, službeno preuzimanje Sparka dolazi u pretpakiranju s popularnim verzijama Hadoopa. Za ovaj ćemo vodič koristiti inačicu 2.3.0 „unaprijed izrađenu za Apache Hadoop 2.7 i novije verzije“.

Nakon što se otpakuje odgovarajući paket Sparka, dostupne skripte mogu se koristiti za podnošenje prijava. To ćemo vidjeti kasnije kada razvijemo našu aplikaciju u Spring Boot-u.

2.3. Cassandra

DataStax čini izdanje Cassandre u zajednici dostupnim za različite platforme, uključujući Windows. Ovo možemo vrlo lako preuzeti i instalirati na naš lokalni stroj slijedeći službenu dokumentaciju. Koristit ćemo verziju 3.9.0.

Nakon što uspijemo instalirati i pokrenuti Cassandru na našem lokalnom računalu, možemo nastaviti s izradom prostora ključeva i tablice. To se može učiniti pomoću CQL školjke koja se isporučuje s našom instalacijom:

STVARI rječnik KEYSPACE S REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}; UPOTREBITI rječnik; IZRADI TABELU riječi (tekst riječi PRIMARNI KLJUČ, računaj int);

Imajte na umu da smo stvorili prostor imena pod nazivom vokabular i tamo nazvani stol riječi s dva stupca, riječ, i računati.

3. Ovisnosti

Ovisnosti Kafke i Spark možemo integrirati u našu aplikaciju putem Mavena. Povući ćemo ove ovisnosti iz Maven Central:

  • Jezgra Iskra
  • SQL iskra
  • Streaming Spark
  • Streaming Kafka Spark
  • Cassandra Spark
  • Cassandra Java Spark

U skladu s tim možemo ih dodati u svoj pom:

 org.apache.spark spark-core_2.11 2.3.0 pod uvjetom org.apache.spark spark-sql_2.11 2.3.0 pod uvjetom da org.apache.spark spark-streaming_2.11 2.3.0 pod uvjetom da org.apache.spark spark-streaming -kafka-0-10_2.11 2.3.0 com.datastax.spark spark-cassandra-connector_2.11 2.3.0 com.datastax.spark spark-cassandra-priključak-java_2.11 1.5.2 

Imajte na umu da su neke od ovih ovisnosti označene kao pod uvjetom u opsegu. To je zato što će ih staviti na raspolaganje instalacija Spark gdje ćemo podnijeti zahtjev za izvršenje pomoću iskrenja.

4. Iskreno strujanje - Kafka integracijske strategije

U ovom trenutku vrijedi kratko razgovarati o integracijskim strategijama za Spark i Kafku.

Kafka je predstavio novi potrošački API između verzija 0.8 i 0.10. Stoga su odgovarajući paketi Spark Streaming dostupni za obje verzije brokera. Važno je odabrati pravi paket, ovisno o dostupnom brokeru i željenim značajkama.

4.1. Spark Streaming Kafka 0.8

Verzija 0.8 je API stabilne integracije s opcijama upotrebe prijemnika ili izravnog pristupa. Nećemo ulaziti u detalje ovih pristupa koje možemo pronaći u službenoj dokumentaciji. Ovdje je važno napomenuti da je ovaj paket kompatibilan s programom Kafka Broker verzije 0.8.2.1 ili novijom.

4.2. Spark Streaming Kafka 0.10

Ovo je trenutno u eksperimentalnom stanju i kompatibilno je samo s inačicom Kafka Broker 0.10.0 ili novijom. Ovaj paket nudi samo izravni pristup, sada koristeći novi potrošački API Kafka. Više detalja o tome možemo pronaći u službenoj dokumentaciji. Važno je da je nije kompatibilno sa starijim verzijama Kafka Broker.

Napominjemo da ćemo za ovaj vodič koristiti paket 0.10. Ovisnost spomenuta u prethodnom odjeljku odnosi se samo na ovo.

5. Razvoj podatkovnog cjevovoda

Stvorit ćemo jednostavnu aplikaciju na Javi koristeći Spark koja će se integrirati s temom Kafka koju smo ranije kreirali. Aplikacija će čitati poruke kao objavljene i brojati učestalost riječi u svakoj poruci. Zatim će se ažurirati u tablici Cassandra koju smo ranije stvorili.

Zamislimo brzo kako će podaci teći:

5.1. Dobivanje JavaStreamingContext

Prvo ćemo započeti inicijalizacijom JavaStreamingContext što je ulazna točka za sve programe Spark Streaming:

SparkConf sparkConf = novi SparkConf (); sparkConf.setAppName ("WordCountingApp"); sparkConf.set ("spark.cassandra.connection.host", "127.0.0.1"); JavaStreamingContext streamingContext = novi JavaStreamingContext (sparkConf, Durations.seconds (1));

5.2. Dobivanje DStream iz Kafke

Sada se možemo povezati s Kafkinom temom iz JavaStreamingContext:

Karta kafkaParams = novi HashMap (); kafkaParams.put ("bootstrap.servers", "localhost: 9092"); kafkaParams.put ("key.deserializer", StringDeserializer.class); kafkaParams.put ("value.deserializer", StringDeserializer.class); kafkaParams.put ("group.id", "use_a_separate_group_id_for_each_stream"); kafkaParams.put ("auto.offset.reset", "najnovije"); kafkaParams.put ("enable.auto.commit", false); Teme zbirke = Arrays.asList ("poruke"); JavaInputDStream messages = KafkaUtils.createDirectStream (streamingContext, LocationStrategies.PreferConsistent (), ConsumerStrategies. Pretplatite se (teme, kafkaParams));

Napominjemo da ovdje moramo pružiti sredstva za uklanjanje ključa i vrijednosti. Za uobičajene vrste podataka poput Niz, deserijalizator je dostupan prema zadanim postavkama. Međutim, ako želimo dohvatiti prilagođene vrste podataka, morat ćemo pružiti prilagođene deserializatore.

Evo, dobili smo JavaInputDStream što je provedba Diskretiziranih potoka ili DStreams, osnovna apstrakcija koju pruža Spark Streaming. Interno DStreams nije ništa drugo nego kontinuirani niz RDD-ova.

5.3. Obrada dobivena DStream

Sada ćemo izvršiti niz operacija na JavaInputDStream za dobivanje frekvencija riječi u porukama:

Rezultati JavaPairDStream = messages .mapToPair (record -> new Tuple2 (record.key (), record.value ())); JavaDStream linije = rezultati .map (tuple2 -> tuple2._2 ()); Riječi JavaDStream = linije .flatMap (x -> Arrays.asList (x.split ("\ s +"). Iterator ()); JavaPairDStream wordCounts = riječi .mapToPair (s -> novi Tuple2 (s, 1)) .reduceByKey ((i1, i2) -> i1 + i2);

5.4. Uporno obrađeno DStream u Cassandru

Napokon, možemo se iterirati preko obrađenog JavaPairDStream da ih umetnete u naš stol Cassandra:

wordCounts.foreachRDD (javaRdd -> {Mapa wordCountMap = javaRdd.collectAsMap (); for (Ključ niza: wordCountMap.keySet ()) {Popis wordList = Arrays.asList (nova Word (ključ, wordCountMap.get (ključ))); JavaRDD rdd = streamingContext.sparkContext (). Paralelizirati (wordList); javaFunctions (rdd) .writerBuilder ("vokabular", "riječi", mapToRow (Word.class)). SaveToCassandra ();}});

5.5. Pokretanje aplikacije

Budući da je ovo aplikacija za obradu streama, željeli bismo nastaviti:

streamingContext.start (); streamingContext.awaitTermination ();

6. Korištenje kontrolnih točaka

U aplikaciji za obradu streama, često je korisno zadržati stanje između serija podataka koji se obrađuju.

Na primjer, u našem prethodnom pokušaju, možemo pohraniti samo trenutnu učestalost riječi. Što ako umjesto toga želimo pohraniti kumulativnu frekvenciju? Spark Streaming omogućuje koncept nazvan kontrolne točke.

Sada ćemo izmijeniti cjevovod koji smo stvorili ranije kako bismo iskoristili kontrolne točke:

Napominjemo da ćemo kontrolne točke upotrebljavati samo za sesiju obrade podataka. To ne osigurava toleranciju kvarova. Međutim, kontrolna točka se može koristiti i za toleranciju kvarova.

Moramo izvršiti nekoliko promjena u našoj aplikaciji kako bismo iskoristili kontrolne točke. To uključuje pružanje JavaStreamingContext s mjestom kontrolne točke:

streamingContext.checkpoint ("./. kontrolna točka");

Ovdje koristimo lokalni datotečni sustav za pohranu kontrolnih točaka. Međutim, radi robusnosti, ovo treba pohraniti na mjesto poput HDFS-a, S3 ili Kafke. Više o tome dostupno je u službenoj dokumentaciji.

Dalje, morat ćemo dohvatiti kontrolnu točku i stvoriti kumulativni broj riječi tijekom obrade svake particije pomoću funkcije mapiranja:

JavaMapWithStateDStream kumulativniWordCounts = wordCounts .mapWithState (StateSpec.function ((word, one, state) -> {int sum = one.orElse (0) + (state.exists ()? state.get (): 0); Izlaz Tuple2 = novo Tuple2 (riječ, zbroj); state.update (zbroj); povratni izlaz;}));

Jednom kada dobijemo kumulativni broj riječi, možemo nastaviti s ponavljanjem i spremiti ih u Cassandru kao i prije.

Imajte na umu da dok provjera podataka korisna je za obradu sa stanjem, a dolazi s troškovima kašnjenja. Stoga je to potrebno pametno koristiti zajedno s optimalnim intervalom provjere.

7. Razumijevanje offseta

Ako se prisjetimo nekih Kafkinih parametara koje smo postavili ranije:

kafkaParams.put ("auto.offset.reset", "najnovije"); kafkaParams.put ("enable.auto.commit", false);

To u osnovi to i znače ne želimo se automatski obvezati za pomak i željeli bismo odabrati najnoviji pomak svaki put kada se pokrene potrošačka grupa. Zbog toga će naša aplikacija moći trošiti poruke objavljene tijekom razdoblja u kojem je pokrenuta.

Ako želimo potrošiti sve objavljene poruke neovisno o tome radi li se aplikacija ili ne, a također želimo pratiti već objavljene poruke, morat ćemo konfigurirati pomak na odgovarajući način, zajedno sa spremanjem stanja pomaka, iako je ovo malo izvan opsega za ovaj vodič.

Ovo je također način na koji Spark Streaming nudi određenu razinu jamstva poput "točno jednom". To u osnovi znači da će Spark Streaming svaku poruku objavljenu na Kafkinoj temi obraditi samo jednom točno.

8. Postavljanje aplikacije

Možemo implementirajte našu aplikaciju pomoću skripte Spark-submit koji dolazi predpakiran s instalacijom Spark:

$ SPARK_HOME $ \ bin \ spark-submit \ --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \ --master local [2] \ target \ spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies .jar

Imajte na umu da staklenka koju kreiramo pomoću Mavena treba sadržavati ovisnosti koje nisu označene kao pod uvjetom u opsegu.

Jednom kad pošaljemo ovu prijavu i objavimo neke poruke u temi Kafka koju smo ranije kreirali, trebali bismo vidjeti kumulativni broj riječi objavljen u tablici Cassandra koju smo ranije kreirali.

9. Zaključak

Da rezimiramo, u ovom uputstvu naučili smo kako stvoriti jednostavan cjevovod podataka koristeći Kafku, Spark Streaming i Cassandru. Također smo naučili kako iskoristiti kontrolne točke u Spark Streamingu kako bi se održalo stanje između serija.

Kao i uvijek, kod za primjere dostupan je na GitHub-u.