Uvod u Apache Flink s Javom
1. Pregled
Apache Flink je okvir za obradu velikih podataka koji programerima omogućuje obradu velike količine podataka na vrlo učinkovit i skalabilan način.
U ovom ćemo članku predstaviti neke od osnovni API koncepti i standardne transformacije podataka dostupni u Apache Flink Java API. Tečan stil ovog API-ja olakšava rad s Flinkovim središnjim konstruktom - distribuiranom kolekcijom.
Prvo ćemo pogledati Flinkove Skup podataka API transformacije i koriste ih za provedbu programa za brojanje riječi. Zatim ćemo kratko pogledati Flinkove DataStream API, koji vam omogućuje obradu tokova događaja u stvarnom vremenu.
2. Ovisnost Mavena
Za početak trebamo dodati Maven ovisnosti flink-java i flink-test-utils knjižnice:
org.apache.flink flink-java 1.2.0 org.apache.flink flink-test-utils_2.10 1.2.0 test
3. Osnovni koncepti API-ja
Kada radimo s Flinkom, moramo znati nekoliko stvari povezanih s njegovim API-jem:
- Svaki program Flink izvodi transformacije na distribuiranim zbirkama podataka. Osigurane su razne funkcije za transformiranje podataka, uključujući filtriranje, mapiranje, spajanje, grupiranje i agregiranje
- A umivaonik operacija u Flinku pokreće izvršavanje toka kako bi se dobio željeni rezultat programa, kao što je spremanje rezultata u datotečni sustav ili ispis na standardni izlaz
- Flink transformacije su lijene, što znači da se ne izvršavaju do umivaonik poziva se operacija
- API Apache Flink podržava dva načina rada - batch i real-time. Ako imate posla s ograničenim izvorom podataka koji se može obraditi u batch načinu, upotrebljavat ćete Skup podataka API. Ako želite obrađivati neograničene tokove podataka u stvarnom vremenu, morat ćete upotrijebiti DataStream API
4. Transformacije API-ja DataSet
Ulazna točka u program Flink je instanca IzvršenjeOkolina klasa - ovo definira kontekst u kojem se program izvršava.
Stvorimo IzvršenjeOkolina za početak obrade:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment ();
Imajte na umu da će program pokrenuti na lokalnom računalu, a izvršit će obradu na lokalnom JVM-u. Ako želite započeti obradu na klasteru strojeva, morat ćete instalirati Apache Flink na te strojeve i konfigurirati IzvršenjeOkolina prema tome.
4.1. Izrada skupa podataka
Da bismo započeli s izvođenjem transformacija podataka, moramo našem programu dostaviti podatke.
Stvorimo instancu Skup podataka razred koristeći naš IzvršenjeEnvironement:
Iznosi skupa podataka = env.fromElements (1, 29, 40, 50);
Možete stvoriti Skup podataka iz više izvora, kao što je Apache Kafka, CSV, datoteka ili gotovo bilo koji drugi izvor podataka.
4.2. Filtrirajte i smanjite
Jednom kada stvorite instancu Skup podataka razreda, na njega možete primijeniti transformacije.
Recimo da želite filtrirati brojeve koji su iznad određenog praga i zatim ih sve zbrojiti. Možete koristiti filtar() i smanjiti() transformacije da bi se to postiglo:
int prag = 30; Popis collect = iznosi .filter (a -> a> prag) .reduce ((integer, t1) -> integer + t1) .collect (); assertThat (collect.get (0)). isEqualTo (90);
Imajte na umu da prikupiti() metoda je a umivaonik operacija koja pokreće stvarne transformacije podataka.
4.3. Karta
Recimo da imate Skup podataka od Osoba objekti:
privatna statička klasa Osoba {private int age; privatni naziv niza; // standardni konstruktori / getteri / postavljači}
Dalje, izradimo a Skup podataka ovih objekata:
Skup podataka dataDataSource = env.fromCollection (Arrays.asList (nova osoba (23, "Tom"), nova osoba (75, "Michael")));
Pretpostavimo da želite izdvojiti samo datoteku dob polje iz svakog predmeta zbirke. Možete koristiti karta() transformacija da bi se dobilo samo određeno polje Osoba razred:
Popis dobnih skupina = personDataSource .map (p -> p.age) .collect (); assertThat (dobi) .hasSize (2); assertThat (dobi) .sadrži (23, 75);
4.4. Pridružiti
Kad imate dva skupa podataka, možda ćete im se željeti pridružiti na nekim iskaznica polje. Za to možete koristiti pridružiti() preobrazba.
Stvorimo zbirke transakcija i adresa korisnika:
Adresa Tuple3 = novi Tuple3 (1, "5th Avenue", "London"); Skup podataka adrese = env.fromElements (adresa); Tuple2 firstTransaction = novi Tuple2 (1, "Transaction_1"); Skup podataka transakcije = env.fromElements (firstTransaction, novi Tuple2 (12, "Transaction_2"));
Prvo polje u obje korijene je Cijeli broj tip, a ovo je iskaznica polje na kojem želimo spojiti oba skupa podataka.
Da bismo izveli stvarnu logiku spajanja, moramo implementirati a KeySelector sučelje za adresu i transakciju:
privatna statička klasa IdKeySelectorTransaction provodi KeySelector {@Preuzmi javni cijeli broj getKey (vrijednost Tuple2) {return value.f0; }} privatna statička klasa IdKeySelectorAddress implementira KeySelector {@Preuzmi javni cijeli broj getKey (vrijednost Tuple3) {return value.f0; }}
Svaki selektor vraća samo polje na kojem bi trebalo izvršiti spajanje.
Nažalost, ovdje nije moguće koristiti lambda izraze jer Flink trebaju generičke informacije o tipu.
Dalje, implementiramo logiku spajanja koristeći te selektore:
Popis<>> pridružio = transakcije.druži se (adrese) .gdje (novi IdKeySelectorTransaction ()) .equalTo (novi IdKeySelectorAddress ()) .collect (); assertThat (pridruženo) .hasSize (1); assertThat (pridruženo) .contains (novi Tuple2 (firstTransaction, adresa));
4.5. Vrsta
Recimo da imate sljedeću kolekciju Korpa2:
Tuple2 secondPerson = novi Tuple2 (4, "Tom"); Tuple2 thirdPerson = novi Tuple2 (5, "Scott"); Tuple2 4thPerson = novi Tuple2 (200, "Michael"); Tuple2 firstPerson = novi Tuple2 (1, "Jack"); Skup podataka transakcije = env.fromElements (4thPerson, secondPerson, thirdPerson, firstPerson);
Ako ovu zbirku želite sortirati po prvom polju korice, možete upotrijebiti sortPartitions () transformacija:
Popis sortirano = transakcije .sortPartition (novi IdKeySelectorTransaction (), Order.ASCENDING) .collect (); assertThat (sorted) .containEhactly (firstPerson, secondPerson, thirdPerson, 4thPerson);
5. Brojanje riječi
Problem s brojanjem riječi je onaj koji se obično koristi za predstavljanje mogućnosti okvira za obradu velikih podataka. Osnovno rješenje uključuje brojanje pojavljivanja riječi u unosu teksta. Upotrijebimo Flink da implementiramo rješenje ovog problema.
Kao prvi korak u našem rješenju stvaramo a LineSplitter klasa koja razdvaja naš unos u tokene (riječi), prikupljajući za svaki token a Korijen2 parova ključ / vrijednost. U svakoj od ovih korpica ključ je riječ koja se nalazi u tekstu, a vrijednost je cijela jedna (1).
Ova klasa provodi FlatMapFunction sučelje koje traje Niz kao ulaz i proizvodi a Korpa2:
javna klasa LineSplitter implementira FlatMapFunction {@Preuzmi javnu prazninu flatMap (vrijednost niza, kolektor out) {Stream.of (value.toLowerCase (). split ("\ W +")) .filter (t -> t.length ()> 0) .forEach (token -> out.collect (new Tuple2 (token) , 1))); }}
Mi zovemo prikupiti() metoda na Kolektor klasa za guranje podataka naprijed u cjevovodu obrade.
Naš sljedeći i posljednji korak je grupiranje nabora po njihovim prvim elementima (riječima), a zatim izvođenje a iznos agregat na drugim elementima kako bi se dobio broj pojavljivanja riječi:
javni statički skup podataka startWordCount (ExecutionEnvironment env, Popis redaka) baca iznimku {DataSet text = env.fromCollection (linije); vrati tekst.flatMap (novi LineSplitter ()) .groupBy (0) .aggregate (Agregacije.SUM, 1); }
Koristimo tri vrste Flink transformacija: flatMap (), groupBy (), i agregat ().
Napišimo test kako bismo ustvrdili da implementacija broja riječi funkcionira kako se očekivalo:
Redovi popisa = Arrays.asList ("Ovo je prva rečenica", "Ovo je druga rečenica s jednom riječju"); Skup podataka rezultat = WordCount.startWordCount (env, retci); Popis collect = rezultat.collect (); assertThat (collect) .containsExactlyInAnyOrder (novi Tuple2 ("a", 3), novi Tuple2 ("rečenica", 2), novi Tuple2 ("riječ", 1), novi Tuple2 ("je", 2), novi Tuple2 ( "this", 2), new Tuple2 ("second", 1), new Tuple2 ("first", 1), new Tuple2 ("with", 1), new Tuple2 ("one", 1));
6. DataStream API
6.1. Stvaranje DataStreama
Apache Flink također podržava obradu tokova događaja putem svog DataStream API-ja. Ako želimo početi trošiti događaje, prvo moramo koristiti StreamExecutionEnvironment razred:
StreamExecutionEnvironment ExecuEnvironment = StreamExecutionEnvironment.getExecutionEnvironment ();
Dalje, možemo stvoriti tok događaja pomoću izvršenjeOkolina iz raznih izvora. To bi mogao biti neki sabirnik poruka Apache Kafka, ali u ovom ćemo primjeru jednostavno stvoriti izvor od nekoliko elemenata niza:
DataStream dataStream = ExecuEnvironment.fromElements ("Ovo je prva rečenica", "Ovo je druga rečenica s jednom riječju");
Možemo primijeniti transformacije na svaki element DataStream kao u normalu Skup podataka razred:
SingleOutputStreamOperator upperCase = text.map (String :: toUpperCase);
Da bismo pokrenuli izvršenje, moramo prizvati operaciju sudopera kao što je ispis () koji će samo ispisati rezultat transformacije na standardni izlaz, slijedeći s izvršiti() metoda na StreamExecutionEnvironment razred:
upperCase.print (); env.execute ();
Proizvest će sljedeći rezultat:
1> OVO JE PRVA REČENICA 2> OVO JE DRUGA REČENICA S JEDNOM RIJEČJU
6.2. Prozor događaja
Kada obrađujete tok događaja u stvarnom vremenu, ponekad ćete možda trebati grupirati događaje i primijeniti neke izračune na prozoru tih događaja.
Pretpostavimo da imamo tok događaja, gdje je svaki događaj par koji se sastoji od broja događaja i vremenske oznake kada je događaj poslan u naš sustav, te da možemo tolerirati događaje koji nisu u redu, ali samo ako nisu kasni više od dvadeset sekundi.
Za ovaj primjer, hajde da prvo stvorimo tok koji simulira dva događaja u razmaku od nekoliko minuta i definiramo ekstraktor vremenske oznake koji određuje naš prag zakašnjenja:
SingleOutputStreamOperator windowed = env.fromElements (novi Tuple2 (16, ZonedDateTime.now (). plusMinutes (25) .toInstant (). getEpochSecond ()), novi Tuple2 (15, ZonedDateTime.now (). plusMinutes (2) .toInstant () .getEpochSecond ())). assignTimestampsAndWatermarks (novi BoundedOutOfOrdernessTimestampExtractor (Time.seconds (20)) {@Override public long extractTimestamp (Tuple2 element) {return element.f1 * 1000; }});
Dalje, definirajmo operaciju prozora za grupiranje naših događaja u prozore od pet sekundi i primijenimo transformaciju na te događaje:
SingleOutputStreamOperator smanjena = windowed .windowAll (TumblingEventTimeWindows.of (Time.seconds (5))) .maxBy (0, true); smanjen.print ();
Dobit će zadnji element svakog prozora od pet sekundi, pa se ispisuje:
1> (15,1491221519)
Imajte na umu da drugi događaj ne vidimo jer je stigao kasnije od navedenog praga zakašnjenja.
7. Zaključak
U ovom smo članku predstavili okvir Apache Flink i pogledali neke od transformacija isporučenih s njegovim API-jem.
Implementirali smo program za brojanje riječi koristeći Flinkov tečni i funkcionalni DataSet API. Zatim smo pogledali DataStream API i implementirali jednostavnu transformaciju u stvarnom vremenu na toku događaja.
Provedbu svih ovih primjera i isječaka koda možete pronaći na GitHubu - ovo je Maven projekt, pa bi ga trebalo lako uvesti i pokrenuti kakav jest.