Vodič za Apache Crunch

1. Uvod

U ovom uputstvu demonstrirat ćemo Apache Crunch s primjerom aplikacije za obradu podataka. Ovu ćemo aplikaciju pokrenuti pomoću okvira MapReduce.

Za početak ćemo ukratko pokriti neke koncepte Apache Cruncha. Tada ćemo uskočiti u uzorak aplikacije. U ovoj ćemo aplikaciji izvršiti obradu teksta:

  • Prije svega, čitat ćemo retke iz tekstualne datoteke
  • Kasnije ćemo ih podijeliti u riječi i ukloniti neke uobičajene riječi
  • Zatim ćemo grupirati preostale riječi kako bismo dobili popis jedinstvenih riječi i njihov broj
  • Napokon, popis ćemo zapisati u tekstualnu datoteku

2. Što je drobljenje?

MapReduce je distribuirani paralelni programski okvir za obradu velike količine podataka na klasteru poslužitelja. Softverski okviri kao što su Hadoop i Spark implementiraju MapReduce.

Crunch pruža okvir za pisanje, testiranje i pokretanje MapReduce cjevovoda u Javi. Ovdje ne pišemo izravno MapReduce poslove. Umjesto toga, definiramo cjevovod podataka (tj. Operacije za izvođenje koraka unosa, obrade i izlaza) pomoću API-ja Crunch. Crunch Planner ih preslikava na zadatke MapReduce i izvršava ih po potrebi.

Stoga je svaki Crunch podatkovni cjevovod koordiniran primjerom Cjevovod sučelje. Ovo sučelje također definira metode za čitanje podataka u cjevovod putem Izvor instanci i zapisivanje podataka iz cjevovoda u Cilj instance.

Imamo 3 sučelja za predstavljanje podataka:

  1. PZbirka - nepromjenjiva, distribuirana zbirka elemenata
  2. PTable<>, V> - nepromjenjiva, distribuirana, neuređena više-mapa ključeva i vrijednosti
  3. PGroupedTable<>, V> - distribuirana, razvrstana karta ključeva tipa K na an Iterativ V koji se može ponoviti točno jednom

DoFn je osnovna klasa za sve funkcije obrade podataka. Odgovara Mapper, Reduktor i Kombinator satovi u MapReduceu. Većinu razvojnog vremena trošimo na pisanje i testiranje logičkih proračuna pomoću njega.

Sad kad smo više upoznati s Crunchom, iskoristimo ga za izgradnju primjera aplikacije.

3. Postavljanje Crunch projekta

Prije svega, postavimo Crunch Project s Mavenom. To možemo učiniti na dva načina:

  1. Dodajte potrebne ovisnosti u pom.xml datoteka postojećeg projekta
  2. Koristite arhetip za generiranje početnog projekta

Kratko ćemo pogledati oba pristupa.

3.1. Ovisnosti Mavena

Da bismo dodali Crunch postojećem projektu, dodajmo potrebne ovisnosti u pom.xml datoteka.

Prvo, dodajmo crunch-core knjižnica:

 org.apache.crunch crunch-core 0.15.0 

Dalje, dodajmo hadoop-klijent knjižnica za komunikaciju s Hadoopom. Koristimo verziju koja odgovara Hadoop instalaciji:

 osiguran org.apache.hadoop hadoop-client 2.2.0 

Možemo provjeriti Maven Central za najnovije verzije biblioteka crunch-core i hadoop-client.

3.2. Maven Arhetip

Drugi je pristup brzo generiranje početnog projekta koristeći arhetip Maven koji je pružio Crunch:

mvn arhetip: generiraj -Dfilter = org.apache.crunch: crunch-archetype 

Kada se od vas zatraži gornja naredba, pružamo verziju Crunch i detalje o artefaktu projekta.

4. Postavljanje cjevovoda za drobljenje

Nakon postavljanja projekta, moramo stvoriti Cjevovod objekt. Crunch ima 3 Cjevovod implementacije:

  • MRPipeline - izvršava se u okviru Hadoop MapReduce
  • SparkPipeline - izvodi se kao niz Spark cjevovoda
  • MemPipeline - izvršava u memoriji na klijentu i korisno je za jedinstveno testiranje

Obično razvijamo i testiramo pomoću primjera MemPipeline. Kasnije koristimo primjerak MRPipeline ili SparkPipeline za stvarno izvršenje.

Ako nam je trebao cjevovod u memoriji, mogli bismo koristiti statičku metodu getInstance da biste dobili MemPipeline primjer:

Cjevovod cjevovoda = MemPipeline.getInstance ();

Ali za sada, stvorimo primjerak MRPipeline za izvršavanje aplikacije s Hadoop-om:

Cjevovod cjevovoda = novi MRPipeline (WordCount.class, getConf ());

5. Pročitajte ulazne podatke

Nakon stvaranja objekta cjevovoda, želimo pročitati ulazne podatke. The Cjevovod sučelje pruža praktičnu metodu za čitanje unosa iz tekstualne datoteke, readTextFile (pathName).

Nazovimo ovu metodu za čitanje ulazne tekstualne datoteke:

Linije PCollection = pipeline.readTextFile (inputPath);

Gornji kod čita tekstualnu datoteku kao zbirku Niz.

Kao sljedeći korak, napišite test za čitanje unosa:

@Test javna praznina givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead () {Cjevovod cjevovoda = MemPipeline.getInstance (); Linije PCollection = pipeline.readTextFile (INPUT_FILE_PATH); assertEquals (21, lines.asCollection () .getValue () .size ()); }

U ovom testu provjeravamo dobivamo li očekivani broj redaka prilikom čitanja tekstualne datoteke.

6. Koraci obrade podataka

Nakon čitanja ulaznih podataka, trebamo ih obraditi. Crunch API sadrži brojne podrazrede DoFn za rukovanje uobičajenim scenarijima obrade podataka:

  • FilterFn - filtrira članove zbirke na temelju logičkog stanja
  • MapFn - preslikava svaki ulazni zapis u točno jedan izlazni zapis
  • CombineFn - kombinira brojne vrijednosti u jednu vrijednost
  • JoinFn - izvodi spojeve kao što su unutarnji spoj, lijevi vanjski spoj, desni vanjski spoj i puni vanjski spoj

Primijenimo sljedeću logiku obrade podataka pomoću ovih klasa:

  1. Podijelite svaki redak u ulaznoj datoteci u riječi
  2. Uklonite zaustavne riječi
  3. Prebroj jedinstvene riječi

6.1. Podijelite redak teksta u riječi

Prije svega, kreirajmo Tokenizer razreda podijeliti crtu na riječi.

Produljit ćemo DoFn razred. Ova klasa ima apstraktnu metodu koja se naziva postupak. Ova metoda obrađuje ulazne zapise iz a PZbirka i šalje izlaz na Odašiljač.

U ovu metodu moramo implementirati logiku razdvajanja:

javna klasa Tokenizer proširuje DoFn {privatni statički konačni Splitter SPLITTER = Splitter .onPattern ("\ s +") .omitEmptyStrings (); @Preuzmi javni void postupak (linija niza, emiter emitera) {for (Riječ niza: SPLITTER.split (linija)) {emitter.emit (riječ); }}} 

U gornjoj smo implementaciji koristili Cjepidlaka klase iz biblioteke Guava za izdvajanje riječi iz retka.

Dalje, napišimo jedinstveni test za Tokenizer razred:

@RunWith (MockitoJUnitRunner.class) javna klasa TokenizerUnitTest {@Mock privatni emiter emitera; @Test javna praznina givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEmit () {Tokenizer spliter = novi Tokenizer (); splitter.process ("hello world", emiter); verify (emiter) .emit ("zdravo"); verify (emiter) .emit ("svijet"); verifyNoMoreInteractions (emiter); }}

Gornji test provjerava jesu li vraćene ispravne riječi.

Na kraju, podijelimo retke pročitane iz ulazne tekstualne datoteke pomoću ove klase.

The paralelnoDo metoda PZbirka sučelje primjenjuje dano DoFn svim elementima i vraća novi PZbirka.

Pozovimo ovu metodu na zbirku linija i proslijedimo instancu Tokenizer:

Riječi PCollection = lines.parallelDo (novi Tokenizer (), Writables.strings ()); 

Kao rezultat, dobit ćemo popis riječi u ulaznoj tekstualnoj datoteci. U sljedećem ćemo koraku ukloniti zaustavne riječi.

6.2. Uklonite zaustavne riječi

Slično prethodnom koraku, izradimo a StopWordFilter razred za filtriranje zaustavnih riječi.

Međutim, produžit ćemo FilterFn umjesto DoFn. FilterFn ima apstraktnu metodu koja se naziva prihvatiti. Moramo implementirati logiku filtriranja u ovu metodu:

javna klasa StopWordFilter proširuje FilterFn {// engleske zaustavne riječi posuđene od Lucene. privatni statički konačni skup STOP_WORDS = ImmutableSet .copyOf (novi String [] {"a", "i", "are", "as", "at", "be", "but", "by", "for" , "if", "in", "into", "is", "it", "no", "not", "of", "on", "or", "s", "such", " t "," to "," the "," their "," then "," there "," these "," they "," this "," to "," was "," will "," with " }); @Override public boolean accept (String word) {return! STOP_WORDS.contens (word); }}

Dalje, napišimo jedinični test za StopWordFilter razred:

javna klasa StopWordFilterUnitTest {@Test javna praznina givenFilter_whenStopWordPassed_thenFalseReturned () {FilterFn filter = new StopWordFilter (); assertFalse (filter.accept ("the")); assertFalse (filter.accept ("a")); } @Test javna praznina givenFilter_whenNonStopWordPassed_thenTrueReturned () {FilterFn filter = new StopWordFilter (); assertTrue (filter.accept ("Hello")); assertTrue (filter.accept ("Svijet")); } @Test javna praznina givenWordCollection_whenFiltered_thenStopWordsRemoved () {PCollection words = MemPipeline .collectionOf ("Ovo", "je", "a", "test", "rečenica"); PCollection noStopWords = words.filter (novi StopWordFilter ()); assertEquals (ImmutableList.of ("Ovo", "test", "rečenica"), Lists.newArrayList (noStopWords.materialize ())); }}

Ovaj test provjerava je li logika filtriranja ispravno izvedena.

Napokon, poslužimo se StopWordFilter za filtriranje popisa riječi generiranih u prethodnom koraku. The filtar metoda PZbirka sučelje primjenjuje dano FilterFn svim elementima i vraća novi PZbirka.

Nazovimo ovu metodu na zbirci riječi i proslijedimo instancu StopWordFilter:

PCollection noStopWords = words.filter (novi StopWordFilter ());

Kao rezultat, dobivamo filtriranu zbirku riječi.

6.3. Broji jedinstvene riječi

Nakon dobivanja filtrirane zbirke riječi, želimo izračunati koliko se često svaka riječ pojavljuje. PZbirka sučelje ima niz metoda za izvođenje uobičajenih agregacija:

  • min - vraća minimalni element zbirke
  • maks - vraća maksimalni element zbirke
  • duljina - vraća broj elemenata u zbirci
  • računati - vraća a PTable koji sadrži broj svakog jedinstvenog elementa zbirke

Iskoristimo računati metoda za dobivanje jedinstvenih riječi zajedno s njihovim brojanjem:

// Metoda brojanja primjenjuje niz primitiva Crunch i vraća // kartu jedinstvenih riječi u ulaznom PCollectionu na njihovo brojanje. PTable counts = noStopWords.count ();

7. Navedite izlaz

Kao rezultat prethodnih koraka imamo tablicu riječi i njihov broj. Ovaj rezultat želimo zapisati u tekstualnu datoteku. The Cjevovod sučelje pruža praktične metode za pisanje rezultata:

void write (zbirka PCollection, Target target); void write (zbirka PCollection, Target target, Target.WriteMode writeMode); void writeTextFile (PCollection collection, String pathName);

Stoga, nazovimo writeTextFile metoda:

pipeline.writeTextFile (counts, outputPath); 

8. Upravljanje izvršenjem cjevovoda

Svi dosadašnji koraci upravo su definirali cjevovod podataka. Nijedan unos nije pročitan ili obrađen. Ovo je zbog Crunch koristi lijeni model izvršenja.

Ne pokreće poslove MapReduce dok se na sučelju cjevovoda ne pozove metoda koja kontrolira planiranje i izvršavanje poslova:

  • trčanje - priprema plan izvršenja za stvaranje potrebnih rezultata, a zatim ga izvršava sinkrono
  • gotovo - izvodi sve preostale zadatke potrebne za generiranje rezultata, a zatim čisti sve međusobno stvorene datoteke podataka
  • runAsync - slično metodi izvođenja, ali se izvršava na neblokirajući način

Stoga, nazovimo gotovo metoda za izvršavanje cjevovoda kao zadataka MapReduce:

Rezultat PipelineResult = pipeline.done (); 

Gornja izjava pokreće zadatke MapReduce za čitanje unosa, obradu i zapisivanje rezultata u izlazni direktorij.

9. Sastavljanje cjevovoda

Do sada smo razvili i jedinstveno testirali logiku za čitanje ulaznih podataka, obradu i upisivanje u izlaznu datoteku.

Zatim ih spojimo za izgradnju cjelokupnog cjevovoda podataka:

public int run (String [] args) baca iznimku {String inputPath = args [0]; Niz outputPath = args [1]; // Stvaranje objekta za koordinaciju stvaranja i izvođenja cjevovoda. Cjevovod cjevovoda = novi MRPipeline (WordCount.class, getConf ()); // Referenca na zadanu tekstualnu datoteku kao zbirku žica. Linije PCollection = pipeline.readTextFile (inputPath); // Definirajte funkciju koja razdvaja svaki redak u PCollection of Strings u // PCollection koji se sastoji od pojedinačnih riječi u datoteci. // Drugi argument postavlja format serializacije. Riječi PCollection = lines.parallelDo (novi Tokenizer (), Writables.strings ()); // Uzmi zbirku riječi i ukloni poznate zaustavne riječi. PCollection noStopWords = words.filter (novi StopWordFilter ()); // Metoda brojanja primjenjuje niz primitiva Crunch i vraća // kartu jedinstvenih riječi u ulaznom PCollectionu na njihovo brojanje. PTable counts = noStopWords.count (); // Naložite cjevovodu da zapisuje rezultirajuće brojanje u tekstualnu datoteku. pipeline.writeTextFile (counts, outputPath); // Izvršite cjevovod kao MapReduce. Rezultat PipelineResult = pipeline.done (); vratiti rezultat.uspjelo ()? 0: 1; }

10. Konfiguracija pokretanja Hadoop-a

Stoga je cjevovod podataka spreman.

Međutim, potreban nam je kôd da bismo ga pokrenuli. Stoga, napišimo glavni metoda za pokretanje aplikacije:

javna klasa WordCount proširuje Konfigurirani implementira Alat {public static void main (String [] args) baca izuzetak {ToolRunner.run (new Configuration (), new WordCount (), args); }

ToolRunner.run raščlanjuje Hadoop konfiguraciju iz naredbenog retka i izvršava zadatak MapReduce.

11. Pokrenite aplikaciju

Kompletna prijava je sada spremna. Pokrenimo sljedeću naredbu za njezinu izgradnju:

mvn paket 

Kao rezultat gornje naredbe, u ciljani direktorij dobivamo zapakiranu aplikaciju i posebnu jar.

Upotrijebimo ovu jar za posao za izvršavanje aplikacije na Hadoopu:

hadoop jar target / crunch-1.0-SNAPSHOT-job.jar 

Aplikacija čita ulaznu datoteku i zapisuje rezultat u izlaznu datoteku. Izlazna datoteka sadrži jedinstvene riječi zajedno s brojevima sličnim sljedećim:

[Dodaj, 1] [Dodano, 1] [Divljenje, 1] [Priznavanje, 1] [Doplatak, 1]

Uz Hadoop, aplikaciju možemo pokretati unutar IDE-a, kao samostalnu aplikaciju ili kao jedinične testove.

12. Zaključak

U ovom smo uputstvu izradili aplikaciju za obradu podataka koja se izvodi na MapReduceu. Apache Crunch olakšava pisanje, testiranje i izvršavanje MapReduce cjevovoda u Javi.

Kao i obično, puni izvorni kod možete pronaći na Githubu.