ETL s Spring Cloud protokom podataka

1. Pregled

Spring Cloud Data Flow je izvorni alat za oblak za izgradnju cjevovoda podataka i batch procesa u stvarnom vremenu. Spring Cloud Data Flow spreman je za upotrebu za niz slučajeva korištenja obrade podataka, poput jednostavnog uvoza / izvoza, ETL obrade, strujanja događaja i prediktivne analitike.

U ovom uputstvu naučit ćemo primjer pretvorbe i učitavanja ekstrakta i učitavanja (ETL) u stvarnom vremenu pomoću protočnog cjevovoda koji izvlači podatke iz JDBC baze podataka, transformira ih u jednostavne POJO-ove i učitava u MongoDB.

2. Obrada ETL-a i prijenosa događaja

ETL - izdvajanje, transformacija i učitavanje - obično se nazivao postupkom koji skupno učitava podatke iz nekoliko baza podataka i sustava u zajedničko skladište podataka. U ovom skladištu podataka moguće je izvršiti tešku obradu analize podataka bez ugrožavanja ukupnih performansi sustava.

Međutim, novi trendovi mijenjaju način na koji se to radi. ETL još uvijek ima ulogu u prijenosu podataka u skladišta podataka i na jezera podataka.

Danas se to može učiniti s struji u arhitekturi događaja događaja uz pomoć Spring Cloud Data Flow.

3. Proljetni protok podataka u oblaku

Uz Spring Cloud Data Flow (SCDF), programeri mogu stvoriti cjevovode podataka u dva okusa:

  • Dugovječne streamove aplikacije u stvarnom vremenu koje koriste Spring Cloud Stream
  • Kratkotrajne skupne aplikacije zadataka koje koriste Spring Cloud Task

U ovom ćemo članku pokriti prvu, dugovječnu streaming aplikaciju koja se temelji na Spring Cloud Streamu.

3.1. Proljetni Cloud Stream programi

Cjevovodi SCDF Stream sastoje se od koraka, gdjesvaki je korak aplikacija izgrađena u stilu Spring Boot koristeći mikrookvir Spring Spring Stream. Te su aplikacije integrirane međuopskrbnim porukama poput Apache Kafka ili RabbitMQ.

Te su aplikacije klasificirane u izvore, procesore i sudopere. U usporedbi s ETL postupkom, mogli bismo reći da je izvor "ekstrakt", procesor "transformator", a umivaonik dio "opterećenja".

U nekim slučajevima možemo koristiti pokretač aplikacije u jednom ili više koraka cjevovoda. To znači da za korak ne bismo trebali implementirati novi program, već umjesto toga konfigurirati postojeći pokretač programa koji je već implementiran.

Popis početnika aplikacija možete pronaći ovdje.

3.2. Spring Cloud protok podataka poslužitelj

Posljednji dio arhitekture je Spring Cloud Data Flow Server. SCDF poslužitelj vrši postavljanje aplikacija i protoka cjevovoda pomoću specifikacije Spring Cloud Deployer. Ova specifikacija podržava SCDF-ov izvorni okus u oblaku postavljanjem na niz modernih vremena izvođenja, poput Kubernetesa, Apache Mesosa, pređe i Ljevaonice oblaka.

Također, stream možemo pokrenuti i kao lokalnu implementaciju.

Više informacija o arhitekturi SCDF možete pronaći ovdje.

4. Postavljanje okoliša

Prije nego što započnemo, moramo odaberite dijelove ove složene implementacije. Prvi dio koji treba definirati je SCDF poslužitelj.

Za testiranje, koristit ćemo SCDF Server Local za lokalni razvoj. Za produkcijsku implementaciju kasnije možemo odabrati izvorno vrijeme izvođenja u oblaku, poput SCDF poslužitelja Kubernetes. Popis vremena rada poslužitelja možemo pronaći ovdje.

Sada, provjerimo sistemske zahtjeve za pokretanje ovog poslužitelja.

4.1. Zahtjevi sustava

Da bismo pokrenuli SCDF poslužitelj, morat ćemo definirati i postaviti dvije ovisnosti:

  • posrednički softver za razmjenu poruka i
  • RDBMS.

Za posrednički softver za razmjenu poruka, radit ćemo s RabbitMQ-om i odabiremo PostgreSQL kao RDBMS za pohranu naših definicija struje cjevovoda.

Za pokretanje RabbitMQ ovdje preuzmite najnoviju verziju i pokrenite RabbitMQ instancu pomoću zadane konfiguracije ili pokrenite sljedeću naredbu Docker:

izvršavanje dockera --ime protok podataka-zec -p 15672: 15672 -p 5672: 5672 -d rabbitmq: 3-upravljanje

Kao posljednji korak postavljanja, instalirajte i pokrenite PostgreSQL RDBMS na zadanom portu 5432. Nakon toga stvorite bazu podataka u koju SCDF može pohraniti svoje definicije toka pomoću sljedeće skripte:

STVARI protok podataka;

4.2. Lokalni poslužitelj protoka podataka Spring Cloud

Za pokretanje lokalnog SCDF poslužitelja možemo odabrati pokretanje poslužitelja pomoću docker-compose, ili ga možemo pokrenuti kao Java program.

Ovdje ćemo pokrenuti SCDF Server Local kao Java aplikaciju. Za konfiguriranje aplikacije moramo definirati konfiguraciju kao parametre Java aplikacije. Trebat će nam Java 8 na putu sustava.

Da bismo ugostili staklenke i ovisnosti, moramo stvoriti matičnu mapu za naš SCDF poslužitelj i u nju preuzeti lokalnu distribuciju SCDF poslužitelja. Najnoviju distribuciju lokalnog SCDF poslužitelja možete preuzeti ovdje.

Također, moramo stvoriti mapu lib i tamo staviti JDBC upravljački program. Najnovija verzija upravljačkog programa PostgreSQL dostupna je ovdje.

Konačno, pokrenimo lokalni poslužitelj SCDF:

$ java -Dloader.path = lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \ --spring.datasource.url = jdbc: postgresql: //127.0.0.1: 5432 / dataflow \ --spring.datasource.username = postgres_username \ --spring.datasource.password = postgres_password \ --spring.datasource.driver-class-name = org.postgresql.Driver \ --spring.rabbitmq.host = 127.0.0.1 \ --spring.rabbitmq.port = 5672 \ --spring.rabbitmq.username = gost \ --spring.rabbitmq.password = gost

Možemo li provjeriti radi li gledajući ovaj URL:

// localhost: 9393 / nadzorna ploča

4.3. Proljetna školjka protoka podataka u oblaku

SCDF ljuska je alat naredbenog retka koji olakšava sastavljanje i postavljanje naših aplikacija i cjevovoda. Ove naredbe ljuske prelaze preko REST API-ja poslužitelja Spring Cloud Data Flow.

Preuzmite najnoviju verziju staklenke u svoju kućnu mapu SCDF koja je dostupna ovdje. Po završetku pokrenite sljedeću naredbu (ažurirajte verziju po potrebi):

$ java -jar spring-cloud-cloud-flow-shell-1.6.3.RELEASE.jar ____ ____ _ __ / ___ | _ __ _ __ (_) _ __ __ _ / ___ | | ___ _ _ __ | | \ ___ \ | '_ \ | '__ | | '_ \ / _` | | | | | / _ \ | | | | / _` | ___) | | _) | | | | | | | (_ | | | | ___ | | (_) | | _ | | (_ | | | ____ / | .__ / | _ | | _ | _ | | _ | \ __, | \ ____ | _ | \ ___ / \ __, _ | \ __, _ | ____ | _ | _ __ | ___ / __________ | _ \ __ _ | | _ __ _ | ___ | | _____ __ \ \ \ \ \ \ | | | | / _` | __ / _` | | | _ | | / _ \ \ / \ / / \ \ \ \ \ \ | | _ | | (_ | | || (_ | | | _ | | | (_) \ VV / / / / / / / | ____ / \ __, _ | \ __ \ __, _ | | _ | | _ | \ ___ / \ _ / \ _ / / _ / _ / _ / _ / _ / Dobrodošli u ljuska Spring Cloud Data Flow. Za pomoć pritisnite TAB ili upišite "help". protok podataka:>

Ako umjesto „protok podataka:> " dobivate "nepoznat poslužitelj:> ” u posljednjem retku ne koristite SCDF poslužitelj na localhostu. U tom slučaju pokrenite sljedeću naredbu za povezivanje s drugim hostom:

nepoznat poslužitelj:> dataflow config server // {host}

Sada je Shell povezan sa SCDF poslužiteljem i možemo pokretati svoje naredbe.

Prvo što moramo učiniti u Shell-u je uvoziti početne programe. Ovdje pronađite najnoviju verziju za RabbitMQ + Maven u Spring Boot 2.0.x i pokrenite sljedeću naredbu (ponovno ažurirajte verziju ovdje “Darwin-SR1", po potrebi):

$ dataflow:> import app --uri //bit.ly/Darwin-SR1-stream-applications-rabbit-maven

Za provjeru instaliranih aplikacija pokrenite sljedeću naredbu Shell:

$ dataflow:> popis aplikacija

Kao rezultat, trebali bismo vidjeti tablicu koja sadrži sve instalirane aplikacije.

Također, SCDF nudi grafičko sučelje, nazvano Flo, kojima možemo pristupiti putem ove adrese: // localhost: 9393 / nadzorna ploča. Međutim, njegova uporaba nije u dosegu ovog članka.

5. Sastavljanje ETL cjevovoda

Izradimo sada naš protočni tok. Za to ćemo upotrijebiti pokretač aplikacije JDBC Source za izdvajanje podataka iz naše relacijske baze podataka.

Također ćemo stvoriti prilagođeni procesor za transformiranje informacijske strukture i prilagođeni sudoper za učitavanje naših podataka u MongoDB.

5.1. Ekstrakt - Priprema relacijske baze podataka za ekstrakciju

Stvorimo bazu podataka s imenom crm i tablica s imenom kupac:

STVARI BAZU PODATAKA crm;
STVORI TABELU kupac (id bigint NIJE NULL, uvezena logička vrijednost DEFAULT false, znakovi_naznaka_kupac variraju (50), PRIMARNI KLJUČ (id))

Imajte na umu da koristimo zastavicu uvozni, koji će pohraniti koji je zapis već uvezen. Te bismo podatke također mogli pohraniti u drugu tablicu, ako je potrebno.

Sad, ubacimo neke podatke:

INSERT INTO customer (id, customer_name, import) VRIJEDNOSTI (1, 'John Doe', false);

5.2. Transformacija - mapiranje JDBC Polja do MongoDB Struktura polja

Za korak transformacije napravit ćemo jednostavan prijevod polja Ime kupca iz izvorne tablice u novo polje Ime. Ovdje bi se mogle izvesti i druge transformacije, ali neka primjer bude kratak.

Da bismo to učinili, stvorit ćemo novi projekt s imenom kupac-transformirati. Najlakši način za to je korištenje web stranice Spring Initializr za izradu projekta. Nakon dolaska na web mjesto odaberite grupu i naziv artefakta. Koristit ćemo com.kupac i preobrazba kupca, odnosno.

Kada je to gotovo, kliknite gumb "Generiraj projekt" da biste preuzeli projekt. Zatim otpakirajte projekt i uvezite ga u svoj omiljeni IDE te dodajte sljedeću ovisnost u pom.xml:

 org.springframework.cloud proljeće-oblak-tok-vezivo-zec 

Sada smo spremni započeti s kodiranjem pretvorbe imena polja. Da bismo to učinili, stvorit ćemo Kupac klase da djeluje kao adapter. Ovaj će razred dobiti Ime kupca putem setName () metodu i prikazat će njezinu vrijednost putem getName metoda.

The @JsonProperty napomene će izvršiti transformaciju tijekom deserializacije iz JSON-a u Javu:

kupac javne klase {private Long id; privatni naziv niza; @JsonProperty ("customer_name") javna void setName (Ime niza) {this.name = name; } @JsonProperty ("name") javni niz getName () {return name; } // Dobavljači i postavljači}

Procesor treba primiti podatke s ulaza, izvršiti transformaciju i povezati ishod s izlaznim kanalom. Stvorimo klasu koja će to učiniti:

uvoz org.springframework.cloud.stream.annotation.EnableBinding; uvoz org.springframework.cloud.stream.messaging.Processor; uvoz org.springframework.integration.annotation.Transformer; @EnableBinding (Processor.class) javna klasa CustomerProcessorConfiguration {@Transformer (inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) javni kupac convertToPojo (korisnička nosivost) {return payload; }}

U gornjem kodu možemo primijetiti da se transformacija događa automatski. Ulaz prima podatke kao JSON i Jackson ih deserializira u Kupac objekt pomoću postavljen metode.

Suprotno je za izlaz, podaci su serializirani u JSON pomoću dobiti metode.

5.3. Tovar - Sudoper u MongoDB-u

Slično koraku transformacije, stvorit ćemo još jedan maven projekt, sada s imenom kupac-mongodb-umivaonik. Ponovo pristupite Proljetnom Initializru, za odabir Grupe com.kupac, a za Artefakt odaberite kupac-mongodb-sudoper. Zatim upišite MongoDB u okvir za pretraživanje ovisnosti i preuzmite projekt.

Zatim otpakirajte i uvezite u svoj omiljeni IDE.

Zatim dodajte istu dodatnu ovisnost kao u kupac-transformirati projekt.

Sad ćemo stvoriti još jedan Kupac klase, za primanje unosa u ovom koraku:

uvoz org.springframework.data.mongodb.core.mapping.Document; @Document (collection = "kupac") javna klasa Kupac {private Long id; privatni naziv niza; // Dobavljači i postavljači}

Za potapanje Kupac, stvorit ćemo klasu slušatelja koja će spasiti entitet kupca pomoću Repozitorij kupaca:

@EnableBinding (Sink.class) javna klasa CustomerListener {@Autowired privatno spremište CustomerRepository; @StreamListener (Sink.INPUT) javno prazno spremanje (kupac kupac) {repository.save (kupac); }}

I Repozitorij kupaca, u ovom slučaju je a MongoRepository iz proljetnih podataka:

uvoz org.springframework.data.mongodb.repository.MongoRepository; uvoz org.springframework.stereotype.Repository; Javno sučelje @Repository CustomerRepository proširuje MongoRepository {} 

5.4. Definicija streama

Sada, obje prilagođene aplikacije spremne su za registraciju na SCDF poslužitelju. Da biste to postigli, kompajlirajte oba projekta pomoću naredbe Maven mvn instalirati.

Zatim ih registriramo pomoću opruge Spring Cloud Flow Flow:

registar aplikacija --ime customer-transform --type processor --uri maven: //com.customer: customer-transform: 0.0.1-SNAPSHOT
registar aplikacija --ime kupac-mongodb-sudoper --tip sudopera --uri maven: //com.customer: kupac-mongodb-sudoper: jar: 0.0.1-SNAPSHOT

Na kraju, provjerimo jesu li aplikacije pohranjene na SCDF-u, pokrenimo naredbu popisa aplikacija u ljusci:

popis aplikacija

Kao rezultat, trebali bismo vidjeti obje aplikacije u rezultirajućoj tablici.

5.4.1. Jezik specifičan za domenu strujnog cjevovoda - DSL

DSL definira konfiguraciju i protok podataka između aplikacija. SCDF DSL je jednostavan. U prvoj riječi definiramo naziv aplikacije, a zatim konfiguracije.

Također, sintaksa je sintaksa Pipeline inspirirana Unixom, koja koristi okomite trake, poznate i kao "cijevi", za povezivanje više aplikacija:

http --port = 8181 | zapisnik

Ovo stvara HTTP aplikaciju koja se poslužuje na portu 8181 koja u zapisnik šalje svaki primljeni nosivost tijela.

Sada, pogledajmo kako stvoriti definiciju DSL toka JDBC izvora.

5.4.2. Definicija izvora JDBC izvora

Ključne konfiguracije za JDBC Izvor su upit i ažuriranje.upit odabrat će nepročitane zapise dok ažuriranje promijenit će zastavicu kako bi spriječio ponovno čitanje trenutnih zapisa.

Također, definirat ćemo izvor JDBC za anketiranje u fiksnom kašnjenju od 30 sekundi i anketiranje maksimalno 1000 redaka. Na kraju ćemo definirati konfiguracije veze, poput upravljačkog programa, korisničkog imena, lozinke i URL veze:

jdbc --query = 'ODABERI id, ime_korisnika IZ public.customer WHERE import = false' --update = 'AŽURIRANJE public.customer SET import = true WHERE id in (: id)' --max-redovi-po-anketi = 1000 --fixed-delay = 30 --time-unit = SECONDS --driver-class-name = org.postgresql.Driver --url = jdbc: postgresql: // localhost: 5432 / crm --username = postgres - lozinka = postgres

Više svojstava JDBC izvorne konfiguracije možete pronaći ovdje.

5.4.3. Definicija toka sudopera kupca MongoDB

Kako nismo definirali konfiguracije veze u primjena.svojstva od kupac-mongodb-sudoper, konfigurirat ćemo putem DSL parametara.

Naša se aplikacija u potpunosti temelji na MongoDataAutoConfiguration. Ostale moguće konfiguracije možete pogledati ovdje. U osnovi, definirat ćemo proljeće.data.mongodb.uri:

customer-mongodb-sink --spring.data.mongodb.uri = mongodb: // localhost / main

5.4.4. Stvorite i postavite tok

Prvo, da biste stvorili konačnu definiciju toka, vratite se u Shell i izvršite sljedeću naredbu (bez prijeloma reda, upravo su umetnuti radi čitljivosti):

stream create --name jdbc-to-mongodb --definition "jdbc --query = 'SELECT id, customer_name FROM public.customer WHERE import = false' --fixed-delay = 30 --max-redovi-po-anketi = 1000 --update = 'AŽURIRANJE KOMPONENTA kupca uvezeno = točno GDJE id u (: id)' --time-unit = SECONDS --password = postgres --driver-class-name = org.postgresql.Driver --username = postgres --url = jdbc: postgresql: // localhost: 5432 / crm | customer-transform | customer-mongodb-sink --spring.data.mongodb.uri = mongodb: // localhost / main " 

Ovaj stream DSL definira tok pod nazivom jdbc-do-mongodb. Sljedeći, rasporedit ćemo stream pod njegovim imenom:

stream implementacija --name jdbc-to-mongodb 

Napokon, trebali bismo vidjeti mjesta svih dostupnih dnevnika u izlazu dnevnika:

Zapisnici će biti u {PATH_TO_LOG} /spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink Dnevnici će biti u {PATH_TO_LOG} / spring-cloud-deployer / jdbc-to-mongodb /jdbc-to-mongodb.customer-transform Dnevnici će biti u {PATH_TO_LOG} /spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc

6. Zaključak

U ovom smo članku vidjeli cjelovit primjer ETL podatkovnog cjevovoda koji koristi Spring Cloud Data Flow.

Ono što je najvažnije, vidjeli smo konfiguracije pokretača aplikacija, stvorili ETL cjevovod toka pomoću Spring Cloud Shell protoka podataka i implementirali prilagođene aplikacije za naše čitanje, transformiranje i pisanje podataka.

Kao i uvijek, primjer koda može se naći u projektu GitHub.


$config[zx-auto] not found$config[zx-overlay] not found