Apache Spark: Razlike između okvira podataka, skupova podataka i RDD-ova

1. Pregled

Apache Spark je brz, distribuirani sustav za obradu podataka. Obavlja obradu podataka u memoriji i koristi predmemoriranje u memoriji i optimizirano izvršavanje što rezultira brzim performansama. Pruža API-je na visokoj razini za popularne programske jezike kao što su Scala, Python, Java i R.

U ovom brzom vodiču proći ćemo kroz tri osnovna pojma Spark: okviri podataka, skupovi podataka i RDD-ovi.

2. DataFrame

Spark SQL je od Sparka 1.3 predstavio tabličnu apstrakciju podataka nazvanu DataFrame. Od tada je to postalo jedno od najvažnijih obilježja u Sparku. Ovaj je API koristan kada želimo rukovati strukturiranim i polustrukturiranim, distribuiranim podacima.

U odjeljku 3 razgovarat ćemo o elastičnim distribuiranim skupovima podataka (RDD). DataFrames podatke pohranjuju na učinkovitiji način od RDD-a, jer oni koriste nepromjenjive, memorijske, elastične, distribuirane i paralelne mogućnosti RDD-ova, ali također primjenjuju shemu na podatke. DataFrames također prevode SQL kôd u optimizirane RDD operacije na niskoj razini.

Okvire podataka možemo stvoriti na tri načina:

  • Pretvaranje postojećih RDD-ova
  • Pokretanje SQL upita
  • Učitavanje vanjskih podataka

Predstavio se Spark tim SparkSession u verziji 2.0 objedinjuje sve različite kontekste osiguravajući da programeri neće trebati brinuti o stvaranju različitih konteksta:

SparkSession session = SparkSession.builder () .appName ("TouristDataFrameExample") .master ("local [*]") .getOrCreate (); DataFrameReader dataFrameReader = session.read ();

Analizirat ćemo Turistička.csv datoteka:

Podaci skupa podataka = dataFrameReader.option ("header", "true") .csv ("data / Tourist.csv");

Otkako je Spark 2.0 DataFrame postao Skup podataka tipa Red, pa možemo koristiti DataFrame kao alias za Skup podataka.

Možemo odabrati određene stupce koji nas zanimaju. Također možemo filtrirati i grupirati prema danom stupcu:

data.select (col ("zemlja"), col ("godina"), col ("vrijednost")). show (); data.filter (col ("zemlja"). jednakTo ("Meksiko")) .show (); data.groupBy (col ("zemlja")) .count () .show ();

3. Skupovi podataka

Skup podataka je skup strukturiranih podataka s jakim tipom. Pružaju poznati objektno orijentirani stil programiranja, plus prednosti sigurnosti tipa, jer skupovi podataka mogu provjeravati sintaksu i hvatati pogreške u vrijeme sastavljanja.

Skup podataka je produžetak DataFramea, stoga DataFrame možemo smatrati netipiziranim prikazom skupa podataka.

Spark tim je objavio Skup podataka API u Sparku 1.6 i kao što su spomenuli: „cilj Spark Datasetsa je pružiti API koji omogućava korisnicima da lako izraze transformacije na objektnim domenama, istovremeno pružajući prednosti izvedbe i robusnosti izvršnog stroja Spark SQL“.

Prvo ćemo morati stvoriti klasu tipa Turistički podaci:

javna klasa TouristData {privatna gudačka regija; privatna gudačka zemlja; privatna gudačka godina; privatna gudačka serija; privatna dvostruka vrijednost; fusnote u privatnom nizu; privatni izvor niza; // ... geteri i postavljači}

Za mapiranje svakog od naših zapisa na navedenu vrstu trebat ćemo upotrijebiti enkoder. Koderi prevode između Java objekata i Sparkovog unutarnjeg binarnog formata:

// SparkSession inicijalizacija i učitavanje podataka Dataset responseWithSelectedColumns = data.select (col ("regija"), col ("država"), col ("godina"), col ("serija"), col ("vrijednost"). ("double"), col ("fusnote"), col ("izvor")); Skup podataka typedDataset = responseWithSelectedColumns .as (Encoders.bean (TouristData.class));

Kao i kod DataFrame-a, možemo filtrirati i grupirati prema određenim stupcima:

typedDataset.filter (((FilterFunction) zapis -> record.getCountry () .equals ("Norveška")). show (); typedDataset.groupBy (typedDataset.col ("zemlja")) .count () .show ();

Možemo raditi i operacije poput filtriranja po stupcu koji odgovara određenom rasponu ili izračunavanju zbroja određenog stupca kako bismo dobili ukupnu vrijednost:

typedDataset.filter (((FilterFunction) zapis -> record.getYear ()! = null && (Long.valueOf (record.getYear ())> 2010 && Long.valueOf (record.getYear ()) record.getValue ()! = null && record.getSeries () .contens ("rashod")) .groupBy ("country") .agg (sum ("value")) .show ();

4. RDD-ovi

Elastični distribuirani skup podataka ili RDD Sparkova je primarna apstrakcija programiranja. Predstavlja kolekciju elemenata koja je: nepromjenjiv, elastičan i distribuiran.

RDD enkapsulira veliki skup podataka, Spark će automatski distribuirati podatke sadržane u RDD-ima po našem klasteru i paralelizirati operacije koje na njima izvodimo.

RDD-ove možemo stvoriti samo operacijama podataka u stabilnoj pohrani ili operacijama na drugim RDD-ima.

Tolerancija grešaka bitna je kada imamo posla s velikim skupom podataka i ako se podaci distribuiraju na klaster strojevima. RDD-ovi su elastični zbog Sparkove ugrađene mehanike oporavka od kvarova. Spark se oslanja na činjenicu da RDD-ovi pamte kako su stvoreni, tako da možemo lako vratiti lozu da vratimo particiju.

Dvije su vrste operacija koje možemo raditi na RDD-ima: Transformacije i akcije.

4.1. Transformacije

Možemo primijeniti transformacije na RDD za manipulaciju njegovim podacima. Nakon što se izvede ova manipulacija, dobit ćemo potpuno novi RDD, jer su RDD nepromjenjivi objekti.

Provjerit ćemo kako implementirati Map and Filter, dvije najčešće transformacije.

Prvo, moramo stvoriti a JavaSparkContext i učitati podatke kao RDD iz Turistička.csv datoteka:

SparkConf conf = new SparkConf (). SetAppName ("uppercaseCountries") .setMaster ("local [*]"); JavaSparkContext sc = novi JavaSparkContext (conf); JavaRDD turisti = sc.textFile ("data / Tourist.csv");

Dalje, primijenimo funkciju karte kako bismo iz svakog zapisa dobili naziv države i pretvorili ime u velika slova. Ovaj novo generirani skup podataka možemo spremiti kao tekstualnu datoteku na disk:

JavaRDD upperCaseCountries = tourist.map (redak -> {Niz [] stupci = linija.split (COMMA_DELIMITER); vratiti stupce [1] .toUpperCase ();}). Razlikovni (); upperCaseCountries.saveAsTextFile ("podaci / izlaz / velika slova.txt");

Ako želimo odabrati samo određenu državu, možemo primijeniti funkciju filtra na izvornom RDD turistu:

JavaRDD turistiInMexico = turisti .filter (linija -> linija.split (COMMA_DELIMITER) [1] .equals ("Meksiko")); touristInMexico.saveAsTextFile ("data / output / touristInMexico.txt");

4.2. Akcije

Akcije će vratiti konačnu vrijednost ili spremiti rezultate na disk, nakon izvršenih određenih izračuna na podacima.

Dvije od ponavljajuće korištenih radnji u Spark-u su Count and Reduce.

Izbrojimo ukupan broj zemalja u našoj CSV datoteci:

// Iskrena kontekstualna inicijalizacija i učitavanje podataka JavaRDD countries = tourist.map (line -> {String [] columns = line.split (COMMA_DELIMITER); return columns [1];}). Long numberOfCountries = countries.count ();

Sada ćemo izračunati ukupne troškove po zemljama. Trebat ćemo filtrirati zapise koji u njihovom opisu sadrže troškove.

Umjesto da koristite JavaRDD, koristit ćemo a JavaPairRDD. Par RDD je vrsta RDD-a koji može pohraniti parove ključ / vrijednost. Provjerimo sljedeće:

JavaRDD turistiExpenditure = turisti .filter (linija -> linija.split (COMMA_DELIMITER) [3] .contain ("rashod")); JavaPairRDD rashodPairRdd = touristExpenditure .mapToPair (line -> {String [] stupci = line.split (COMMA_DELIMITER); vrati novi Tuple2 (stupci [1], Double.valueOf (stupci [6]));}); Popis totalByCountry = rashodPairRdd .reduceByKey ((x, y) -> x + y) .collect ();

5. Zaključak

Da sumiramo, trebali bismo koristiti okvire podataka ili skupove podataka kada su nam potrebni API-ji specifični za domenu, trebaju nam izrazi na visokoj razini poput agregacije, zbroja ili SQL upita. Ili kada želimo sigurnost tipova u vrijeme sastavljanja.

S druge strane, trebali bismo koristiti RDD-ove kada su podaci nestrukturirani i ne trebamo implementirati određenu shemu ili kada trebamo transformacije i radnje na niskoj razini.

Kao i uvijek, svi uzorci koda dostupni su na GitHub-u.