Uvod u Apache Flink s Javom

1. Pregled

Apache Flink je okvir za obradu velikih podataka koji programerima omogućuje obradu velike količine podataka na vrlo učinkovit i skalabilan način.

U ovom ćemo članku predstaviti neke od osnovni API koncepti i standardne transformacije podataka dostupni u Apache Flink Java API. Tečan stil ovog API-ja olakšava rad s Flinkovim središnjim konstruktom - distribuiranom kolekcijom.

Prvo ćemo pogledati Flinkove Skup podataka API transformacije i koriste ih za provedbu programa za brojanje riječi. Zatim ćemo kratko pogledati Flinkove DataStream API, koji vam omogućuje obradu tokova događaja u stvarnom vremenu.

2. Ovisnost Mavena

Za početak trebamo dodati Maven ovisnosti flink-java i flink-test-utils knjižnice:

 org.apache.flink flink-java 1.2.0 org.apache.flink flink-test-utils_2.10 1.2.0 test 

3. Osnovni koncepti API-ja

Kada radimo s Flinkom, moramo znati nekoliko stvari povezanih s njegovim API-jem:

  • Svaki program Flink izvodi transformacije na distribuiranim zbirkama podataka. Osigurane su razne funkcije za transformiranje podataka, uključujući filtriranje, mapiranje, spajanje, grupiranje i agregiranje
  • A umivaonik operacija u Flinku pokreće izvršavanje toka kako bi se dobio željeni rezultat programa, kao što je spremanje rezultata u datotečni sustav ili ispis na standardni izlaz
  • Flink transformacije su lijene, što znači da se ne izvršavaju do umivaonik poziva se operacija
  • API Apache Flink podržava dva načina rada - batch i real-time. Ako imate posla s ograničenim izvorom podataka koji se može obraditi u batch načinu, upotrebljavat ćete Skup podataka API. Ako želite obrađivati ​​neograničene tokove podataka u stvarnom vremenu, morat ćete upotrijebiti DataStream API

4. Transformacije API-ja DataSet

Ulazna točka u program Flink je instanca IzvršenjeOkolina klasa - ovo definira kontekst u kojem se program izvršava.

Stvorimo IzvršenjeOkolina za početak obrade:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment ();

Imajte na umu da će program pokrenuti na lokalnom računalu, a izvršit će obradu na lokalnom JVM-u. Ako želite započeti obradu na klasteru strojeva, morat ćete instalirati Apache Flink na te strojeve i konfigurirati IzvršenjeOkolina prema tome.

4.1. Izrada skupa podataka

Da bismo započeli s izvođenjem transformacija podataka, moramo našem programu dostaviti podatke.

Stvorimo instancu Skup podataka razred koristeći naš IzvršenjeEnvironement:

Iznosi skupa podataka = env.fromElements (1, 29, 40, 50);

Možete stvoriti Skup podataka iz više izvora, kao što je Apache Kafka, CSV, datoteka ili gotovo bilo koji drugi izvor podataka.

4.2. Filtrirajte i smanjite

Jednom kada stvorite instancu Skup podataka razreda, na njega možete primijeniti transformacije.

Recimo da želite filtrirati brojeve koji su iznad određenog praga i zatim ih sve zbrojiti. Možete koristiti filtar() i smanjiti() transformacije da bi se to postiglo:

int prag = 30; Popis collect = iznosi .filter (a -> a> prag) .reduce ((integer, t1) -> integer + t1) .collect (); assertThat (collect.get (0)). isEqualTo (90); 

Imajte na umu da prikupiti() metoda je a umivaonik operacija koja pokreće stvarne transformacije podataka.

4.3. Karta

Recimo da imate Skup podataka od Osoba objekti:

privatna statička klasa Osoba {private int age; privatni naziv niza; // standardni konstruktori / getteri / postavljači}

Dalje, izradimo a Skup podataka ovih objekata:

Skup podataka dataDataSource = env.fromCollection (Arrays.asList (nova osoba (23, "Tom"), nova osoba (75, "Michael")));

Pretpostavimo da želite izdvojiti samo datoteku dob polje iz svakog predmeta zbirke. Možete koristiti karta() transformacija da bi se dobilo samo određeno polje Osoba razred:

Popis dobnih skupina = personDataSource .map (p -> p.age) .collect (); assertThat (dobi) .hasSize (2); assertThat (dobi) .sadrži (23, 75);

4.4. Pridružiti

Kad imate dva skupa podataka, možda ćete im se željeti pridružiti na nekim iskaznica polje. Za to možete koristiti pridružiti() preobrazba.

Stvorimo zbirke transakcija i adresa korisnika:

Adresa Tuple3 = novi Tuple3 (1, "5th Avenue", "London"); Skup podataka adrese = env.fromElements (adresa); Tuple2 firstTransaction = novi Tuple2 (1, "Transaction_1"); Skup podataka transakcije = env.fromElements (firstTransaction, novi Tuple2 (12, "Transaction_2")); 

Prvo polje u obje korijene je Cijeli broj tip, a ovo je iskaznica polje na kojem želimo spojiti oba skupa podataka.

Da bismo izveli stvarnu logiku spajanja, moramo implementirati a KeySelector sučelje za adresu i transakciju:

privatna statička klasa IdKeySelectorTransaction provodi KeySelector {@Preuzmi javni cijeli broj getKey (vrijednost Tuple2) {return value.f0; }} privatna statička klasa IdKeySelectorAddress implementira KeySelector {@Preuzmi javni cijeli broj getKey (vrijednost Tuple3) {return value.f0; }}

Svaki selektor vraća samo polje na kojem bi trebalo izvršiti spajanje.

Nažalost, ovdje nije moguće koristiti lambda izraze jer Flink trebaju generičke informacije o tipu.

Dalje, implementiramo logiku spajanja koristeći te selektore:

Popis<>> pridružio = transakcije.druži se (adrese) .gdje (novi IdKeySelectorTransaction ()) .equalTo (novi IdKeySelectorAddress ()) .collect (); assertThat (pridruženo) .hasSize (1); assertThat (pridruženo) .contains (novi Tuple2 (firstTransaction, adresa)); 

4.5. Vrsta

Recimo da imate sljedeću kolekciju Korpa2:

Tuple2 secondPerson = novi Tuple2 (4, "Tom"); Tuple2 thirdPerson = novi Tuple2 (5, "Scott"); Tuple2 4thPerson = novi Tuple2 (200, "Michael"); Tuple2 firstPerson = novi Tuple2 (1, "Jack"); Skup podataka transakcije = env.fromElements (4thPerson, secondPerson, thirdPerson, firstPerson); 

Ako ovu zbirku želite sortirati po prvom polju korice, možete upotrijebiti sortPartitions () transformacija:

Popis sortirano = transakcije .sortPartition (novi IdKeySelectorTransaction (), Order.ASCENDING) .collect (); assertThat (sorted) .containEhactly (firstPerson, secondPerson, thirdPerson, 4thPerson);

5. Brojanje riječi

Problem s brojanjem riječi je onaj koji se obično koristi za predstavljanje mogućnosti okvira za obradu velikih podataka. Osnovno rješenje uključuje brojanje pojavljivanja riječi u unosu teksta. Upotrijebimo Flink da implementiramo rješenje ovog problema.

Kao prvi korak u našem rješenju stvaramo a LineSplitter klasa koja razdvaja naš unos u tokene (riječi), prikupljajući za svaki token a Korijen2 parova ključ / vrijednost. U svakoj od ovih korpica ključ je riječ koja se nalazi u tekstu, a vrijednost je cijela jedna (1).

Ova klasa provodi FlatMapFunction sučelje koje traje Niz kao ulaz i proizvodi a Korpa2:

javna klasa LineSplitter implementira FlatMapFunction {@Preuzmi javnu prazninu flatMap (vrijednost niza, kolektor out) {Stream.of (value.toLowerCase (). split ("\ W +")) .filter (t -> t.length ()> 0) .forEach (token -> out.collect (new Tuple2 (token) , 1))); }}

Mi zovemo prikupiti() metoda na Kolektor klasa za guranje podataka naprijed u cjevovodu obrade.

Naš sljedeći i posljednji korak je grupiranje nabora po njihovim prvim elementima (riječima), a zatim izvođenje a iznos agregat na drugim elementima kako bi se dobio broj pojavljivanja riječi:

javni statički skup podataka startWordCount (ExecutionEnvironment env, Popis redaka) baca iznimku {DataSet text = env.fromCollection (linije); vrati tekst.flatMap (novi LineSplitter ()) .groupBy (0) .aggregate (Agregacije.SUM, 1); }

Koristimo tri vrste Flink transformacija: flatMap (), groupBy (), i agregat ().

Napišimo test kako bismo ustvrdili da implementacija broja riječi funkcionira kako se očekivalo:

Redovi popisa = Arrays.asList ("Ovo je prva rečenica", "Ovo je druga rečenica s jednom riječju"); Skup podataka rezultat = WordCount.startWordCount (env, retci); Popis collect = rezultat.collect (); assertThat (collect) .containsExactlyInAnyOrder (novi Tuple2 ("a", 3), novi Tuple2 ("rečenica", 2), novi Tuple2 ("riječ", 1), novi Tuple2 ("je", 2), novi Tuple2 ( "this", 2), new Tuple2 ("second", 1), new Tuple2 ("first", 1), new Tuple2 ("with", 1), new Tuple2 ("one", 1));

6. DataStream API

6.1. Stvaranje DataStreama

Apache Flink također podržava obradu tokova događaja putem svog DataStream API-ja. Ako želimo početi trošiti događaje, prvo moramo koristiti StreamExecutionEnvironment razred:

StreamExecutionEnvironment ExecuEnvironment = StreamExecutionEnvironment.getExecutionEnvironment ();

Dalje, možemo stvoriti tok događaja pomoću izvršenjeOkolina iz raznih izvora. To bi mogao biti neki sabirnik poruka Apache Kafka, ali u ovom ćemo primjeru jednostavno stvoriti izvor od nekoliko elemenata niza:

DataStream dataStream = ExecuEnvironment.fromElements ("Ovo je prva rečenica", "Ovo je druga rečenica s jednom riječju");

Možemo primijeniti transformacije na svaki element DataStream kao u normalu Skup podataka razred:

SingleOutputStreamOperator upperCase = text.map (String :: toUpperCase);

Da bismo pokrenuli izvršenje, moramo prizvati operaciju sudopera kao što je ispis () koji će samo ispisati rezultat transformacije na standardni izlaz, slijedeći s izvršiti() metoda na StreamExecutionEnvironment razred:

upperCase.print (); env.execute ();

Proizvest će sljedeći rezultat:

1> OVO JE PRVA REČENICA 2> OVO JE DRUGA REČENICA S JEDNOM RIJEČJU

6.2. Prozor događaja

Kada obrađujete tok događaja u stvarnom vremenu, ponekad ćete možda trebati grupirati događaje i primijeniti neke izračune na prozoru tih događaja.

Pretpostavimo da imamo tok događaja, gdje je svaki događaj par koji se sastoji od broja događaja i vremenske oznake kada je događaj poslan u naš sustav, te da možemo tolerirati događaje koji nisu u redu, ali samo ako nisu kasni više od dvadeset sekundi.

Za ovaj primjer, hajde da prvo stvorimo tok koji simulira dva događaja u razmaku od nekoliko minuta i definiramo ekstraktor vremenske oznake koji određuje naš prag zakašnjenja:

SingleOutputStreamOperator windowed = env.fromElements (novi Tuple2 (16, ZonedDateTime.now (). plusMinutes (25) .toInstant (). getEpochSecond ()), novi Tuple2 (15, ZonedDateTime.now (). plusMinutes (2) .toInstant () .getEpochSecond ())). assignTimestampsAndWatermarks (novi BoundedOutOfOrdernessTimestampExtractor (Time.seconds (20)) {@Override public long extractTimestamp (Tuple2 element) {return element.f1 * 1000; }});

Dalje, definirajmo operaciju prozora za grupiranje naših događaja u prozore od pet sekundi i primijenimo transformaciju na te događaje:

SingleOutputStreamOperator smanjena = windowed .windowAll (TumblingEventTimeWindows.of (Time.seconds (5))) .maxBy (0, true); smanjen.print ();

Dobit će zadnji element svakog prozora od pet sekundi, pa se ispisuje:

1> (15,1491221519)

Imajte na umu da drugi događaj ne vidimo jer je stigao kasnije od navedenog praga zakašnjenja.

7. Zaključak

U ovom smo članku predstavili okvir Apache Flink i pogledali neke od transformacija isporučenih s njegovim API-jem.

Implementirali smo program za brojanje riječi koristeći Flinkov tečni i funkcionalni DataSet API. Zatim smo pogledali DataStream API i implementirali jednostavnu transformaciju u stvarnom vremenu na toku događaja.

Provedbu svih ovih primjera i isječaka koda možete pronaći na GitHubu - ovo je Maven projekt, pa bi ga trebalo lako uvesti i pokrenuti kakav jest.


$config[zx-auto] not found$config[zx-overlay] not found