Uvod u Kafka priključke

1. Pregled

Apache Kafka® distribuirana je streaming platforma. U prethodnom uputstvu razgovarali smo o tome kako implementirati potrošače i proizvođače Kafke koristeći Spring.

U ovom uputstvu naučit ćemo kako koristiti Kafka priključke.

Pogledati ćemo:

  • Različite vrste Kafka konektora
  • Značajke i načini rada Kafka Connect
  • Konfiguracija konektora pomoću datoteka svojstava, kao i REST API

2. Osnove Kafka Connect i Kafka Connectors

Kafka Connect je okvir za povezivanje Kafke s vanjskim sustavima kao što su baze podataka, pohrane ključeva i vrijednosti, indeksi pretraživanja i datotečni sustavi, koristeći tzv Konektori.

Kafka konektori su komponente spremne za upotrebu, koje nam mogu pomoći za uvoz podataka iz vanjskih sustava u Kafkine teme i izvoz podataka iz Kafkinih tema u vanjske sustave. Postojeće implementacije konektora možemo koristiti za uobičajene izvore podataka i sudopere ili implementirati vlastite konektore.

A konektor izvora prikuplja podatke iz sustava. Izvorni sustavi mogu biti čitave baze podataka, tablice tokova ili posrednici poruka. Izvorni konektor također bi mogao prikupiti mjerne podatke s aplikacijskih poslužitelja u Kafka teme, čineći podatke dostupnim za obradu toka s malom kašnjenjem.

A priključak za umivaonik isporučuje podatke iz Kafkinih tema u druge sustave, što bi mogli biti indeksi poput Elasticsearch, batch sustavi poput Hadoop ili bilo koja vrsta baze podataka.

Neke konektore održava zajednica, dok druge podržava Confluent ili njegovi partneri. Zaista, možemo pronaći konektore za najpopularnije sustave, poput S3, JDBC i Cassandre, samo da nabrojimo neke.

3. Značajke

Kafka Connect značajke uključuju:

  • Okvir za povezivanje vanjskih sustava s Kafkom - pojednostavljuje razvoj, postavljanje i upravljanje konektorima
  • Distribuirani i samostalni modusi - pomaže nam u raspoređivanju velikih klastera iskorištavanjem distribuirane prirode Kafke, kao i postavki za razvoj, testiranje i male proizvodne implementacije
  • REST sučelje - konektorima možemo upravljati pomoću REST API-ja
  • Automatsko upravljanje pomakom - Kafka Connect nam pomaže u rješavanju problema postupak kompenziranja, što nam štedi probleme ručne implementacije ovog dijela razvoja konektora sklonog pogreškama
  • Distribuirano i skalabilno prema zadanim postavkama - Kafka Connect koristi postojeći protokol za upravljanje grupama; možemo dodati više radnika za povećanje klastera Kafka Connect
  • Streaming i batch integracija - Kafka Connect idealno je rješenje za povezivanje sustava za streaming i batch podatke u vezi s postojećim Kafkinim mogućnostima
  • Transformacije - one nam omogućuju jednostavne i lagane modifikacije pojedinih poruka

4. Postavljanje

Umjesto da koristimo običnu Kafkinu distribuciju, preuzet ćemo Confluent Platform, Kafkinu distribuciju koju pruža Confluent, Inc., tvrtka koja stoji iza Kafke. Confluent Platform dolazi s nekim dodatnim alatima i klijentima, u usporedbi s običnim Kafkom, kao i nekim dodatnim unaprijed izgrađenim konektorima.

Za naš je slučaj dovoljno izdanje otvorenog koda koje se može naći na web mjestu Confluent.

5. Brzi početak Kafka Connect

Za početak ćemo razgovarati o principu Kafka Connect, koristeći svoje najosnovnije konektore, koji su datoteka izvor konektor i datoteka umivaonik konektor.

Povoljno, Confluent Platform dolazi s oba ova konektora, kao i referentne konfiguracije.

5.1. Konfiguracija izvornog konektora

Za izvorni konektor referentna konfiguracija dostupna je na $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties:

name = local-file-source connector.class = FileStreamSource zadaci.max = 1 topic = connect-test file = test.txt

Ova konfiguracija ima neka svojstva koja su zajednička za sve izvorne konektore:

  • Ime je korisničko ime koje je navedeno za instancu konektora
  • konektor.razred određuje izvedbenu klasu, u osnovi vrstu konektora
  • zadaci.max određuje koliko instanci našeg izvornog konektora treba raditi paralelno, i
  • tema definira temu na koju bi konektor trebao poslati izlaz

U ovom slučaju imamo i atribut specifičan za konektor:

  • datoteka definira datoteku iz koje konektor treba čitati ulaz

Da bi to onda uspjelo, stvorimo osnovnu datoteku s nekim sadržajem:

echo -e "foo \ nbar \ n"> $ CONFLUENT_HOME / test.txt

Imajte na umu da je radni direktorij $ CONFLUENT_HOME.

5.2. Konfiguracija priključka za umivaonik

Za naš priključak za umivaonik upotrijebit ćemo referentnu konfiguraciju na $ CONFLUENT_HOME / etc / kafka / connect-file-sink.properties:

name = local-file-sink konektor.class = FileStreamSink zadaci.max = 1 datoteka = test.sink.txt topics = connect-test

Logično, sadrži potpuno iste parametre, doduše ovaj put konektor.razred određuje implementaciju konektora umivaonika i datoteka je mjesto na kojem bi konektor trebao upisati sadržaj.

5.3. Radnička konfiguracija

Konačno, moramo konfigurirati radnika Connect, koji će integrirati naša dva konektora i obaviti posao čitanja s izvornog konektora i zapisivanja na konektor sudopera.

Za to možemo koristiti $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties:

bootstrap.servers = localhost: 9092 key.converter = org.apache.kafka.connect.json.JsonConverter value.converter = org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable = false value.converter. schemas.enable = false offset.storage.file.filename = / tmp / connect.offsets offset.flush.interval.ms = 10000 plugin.path = / share / java

Imajte na umu da dodatak.put može sadržavati popis staza, gdje su dostupne implementacije konektora

Kako ćemo koristiti konektore u paketu s Kafkom, možemo postaviti dodatak.put do $ CONFLUENT_HOME / share / java. Radeći sa sustavom Windows, ovdje bi moglo biti potrebno pružiti apsolutni put.

Za ostale parametre možemo ostaviti zadane vrijednosti:

  • bootstrap. poslužitelji sadrži adrese brokerskih kuća Kafka
  • ključ. pretvarač i vrijednost. pretvarač definirati klase pretvarača, koje serializiraju i deserializiraju podatke dok teku iz izvora u Kafku, a zatim iz Kafke u sudoper
  • key.converter.schems.enable i value.converter.schems.enable su postavke specifične za pretvarač
  • offset.storage.file.filename je najvažnija postavka prilikom pokretanja Connect-a u samostalnom načinu: definira gdje Connect treba pohraniti svoje offset podatke
  • offset.flush.interval.ms definira interval u kojem radnik pokušava izvršiti odstupanja za zadatke

A popis parametara prilično je zreo, pa potražite kompletnu listu u službenoj dokumentaciji.

5.4. Kafka Connect u samostalnom načinu rada

I time možemo započeti naše prvo postavljanje konektora:

$ CONFLUENT_HOME / bin / connect-standalone \ $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-sink. Svojstva

Prvo, možemo provjeriti sadržaj teme pomoću naredbenog retka:

$ CONFLUENT_HOME / bin / kafka-console-consumer --bootstrap-server localhost: 9092 --top connect-test --od početka

Kao što vidimo, izvorni konektor uzeo je podatke s test.txt datoteku, transformirao je u JSON i poslao je Kafki:

{"schema": {"type": "string", "optional": false}, "payload": "foo"} {"schema": {"type": "string", "optional": false}, "payload": "bar"}

Ako pogledamo mapu $ CONFLUENT_HOME, možemo vidjeti da je datoteka test.sink.txt stvoreno je ovdje:

mačka $ CONFLUENT_HOME / test.sink.txt foo bar

Kako konektor umivaonika izvlači vrijednost iz korisni teret atribut i zapisuje ga u odredišnu datoteku, podatke u test.sink.txt ima sadržaj izvornika test.txt datoteka.

Sad dodajmo još redaka u test.txt.

Kad to učinimo, vidimo da konektor izvora automatski otkriva ove promjene.

Moramo osigurati da na kraju umetnemo novu liniju, u suprotnom, izvorni konektor neće uzeti u obzir posljednji redak.

U ovom trenutku, zaustavimo postupak povezivanja jer ćemo pokrenuti Povezivanje distribuirani način u nekoliko redaka.

6. Connectov REST API

Do sada smo sve konfiguracije radili prosljeđivanjem datoteka svojstava putem naredbenog retka. Međutim, kako je Connect dizajniran za rad kao usluga, na raspolaganju je i REST API.

Prema zadanim postavkama dostupan je na // localhost: 8083. Nekoliko krajnjih točaka su:

  • GET / konektori - vraća popis sa svim konektorima u upotrebi
  • GET / konektori / {name} - vraća detalje o određenom konektoru
  • POST / konektori - stvara novi konektor; tijelo zahtjeva treba biti JSON objekt koji sadrži polje imena niza i polje konfiguracije objekta s konfiguracijskim parametrima konektora
  • GET / konektori / {ime} / status - vraća trenutni status konektora - uključujući da li je pokrenut, nije uspio ili pauziran - kojem radniku je dodijeljen, informacije o pogrešci ako nije uspjelo i stanje svih njegovih zadataka
  • IZBRIŠI / konektori / {name} - briše konektor, elegantno zaustavljajući sve zadatke i brišući njegovu konfiguraciju
  • GET / plug-plug - vraća popis dodataka konektora instaliranih u klasteru Kafka Connect

Službena dokumentacija sadrži popis svih krajnjih točaka.

API REST koristit ćemo za stvaranje novih konektora u sljedećem odjeljku.

7. Kafka Connect u distribuiranom načinu

Samostalni način rada savršeno radi za razvoj i testiranje, kao i za manje postavke. Međutim, ako želimo u potpunosti iskoristiti distribuiranu prirodu Kafke, moramo pokrenuti Connect u distribuiranom načinu.

Na taj se način postavke konektora i metapodaci pohranjuju u Kafka teme umjesto u datotečni sustav. Kao rezultat toga, radnički čvorovi stvarno nemaju apatrid.

7.1. Pokretanje programa Connect

Referentnu konfiguraciju za distribuirani način rada možete pronaći na $ CONFLUENT_HOME/etc/kafka/connect-distributed.properties.

Parametri su uglavnom isti kao i za samostalni način rada. Postoji samo nekoliko razlika:

  • grupa.id definira ime grupe klastera Connect. Vrijednost se mora razlikovati od bilo kojeg ID grupe potrošača
  • pomak.skladištenje.tema, config.storage.topic i status.skladištenje.tema definirajte teme za ove postavke. Za svaku temu možemo definirati i faktor replikacije

Opet, službena dokumentacija sadrži popis sa svim parametrima.

Povezivanje možemo započeti u distribuiranom načinu kako slijedi:

$ CONFLUENT_HOME / bin / povezivanje-distribucija $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties

7.2. Dodavanje konektora pomoću REST API-ja

Sada, u usporedbi sa samostalnom naredbom za pokretanje, nismo proslijedili nijednu konfiguraciju konektora kao argumente. Umjesto toga, moramo stvoriti konektore koristeći REST API.

Da bismo postavili naš primjer od prije, moramo poslati dva POST zahtjeva // localhost: 8083 / konektori koji sadrži sljedeće JSON strukture.

Prvo, moramo stvoriti tijelo za izvorni konektor POST kao JSON datoteku. Evo, nazvat ćemo to connect-file-source.json:

{"name": "local-file-source", "config": {"connector.class": "FileStreamSource", "tasks.max": 1, "file": "test-distribu.txt", "topic ":" povezivanje-distribucija "}}

Imajte na umu kako ovo izgleda prilično slično referentnoj konfiguracijskoj datoteci koju smo koristili prvi put.

A onda SMO OBJAVILI:

curl -d @ "$ CONFLUENT_HOME / connect-file-source.json" \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors

Zatim ćemo učiniti isto za priključak umivaonika, pozivajući datoteku connect-file-sink.json:

{"name": "local-file-sink", "config": {"connector.class": "FileStreamSink", "tasks.max": 1, "file": "test-distribu.sink.txt", "topics": "povezivanje-distribucija"}}

I izvedite POST kao prije:

curl -d @ $ CONFLUENT_HOME / connect-file-sink.json \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors

Ako je potrebno, možemo provjeriti funkcionira li ispravno:

$ CONFLUENT_HOME / bin / kafka-console-consumer --bootstrap-server localhost: 9092 --top povezivanje-distribucija --od početka {"schema": {"type": "string", "optional": false}, "payload": "foo"} {"schema": {"type": "string", "optional": false}, "payload": "bar"}

Ako pogledamo mapu $ CONFLUENT_HOME, možemo vidjeti da je datoteka test-distribuirano.sink.txt stvoreno je ovdje:

mačka $ CONFLUENT_HOME / test-distribuiran.sink.txt foo bar

Nakon što smo testirali distribuiranu postavku, počistimo uklanjanjem dva konektora:

curl -X DELETE // localhost: 8083 / konektori / local-file-source curl -X DELETE // localhost: 8083 / connectors / local-file-sink

8. Transformacija podataka

8.1. Podržane transformacije

Transformacije nam omogućuju jednostavne i lagane modifikacije pojedinih poruka.

Kafka Connect podržava sljedeće ugrađene transformacije:

  • InsertField - Dodajte polje koristeći statičke podatke ili zabilježite metapodatke
  • Zamijeni Polje - Filtrirajte ili preimenujte polja
  • MaskField - Zamijenite polje važećom null vrijednošću za tip (na primjer, nula ili prazan niz)
  • HoistField - Zamotajte cijeli događaj kao jedno polje unutar strukture ili mape
  • ExtractField - Izdvojite određeno polje iz struct i map i uključite samo ovo polje u rezultate
  • SetSchemaMetadata - Izmijenite naziv ili verziju sheme
  • TimestampRouter - Izmijenite temu zapisa na temelju izvorne teme i vremenske oznake
  • RegexRouter - Izmijenite temu zapisa na temelju izvorne teme, zamjenskog niza i regularnog izraza

Transformacija se konfigurira pomoću sljedećih parametara:

  • preobražava - Popis zamjenskih imena za transformacije odvojenih zarezom
  • pretvara. $ alias.type - Naziv klase za transformaciju
  • pretvara. $ alias. $ transformSpecificConfig - Konfiguracija za odgovarajuću transformaciju

8.2. Primjena transformatora

Da bismo testirali neke značajke transformacije, postavimo sljedeće dvije transformacije:

  • Prvo, umotajmo cijelu poruku u JSON strukturu
  • Nakon toga, dodamo polje toj strukturi

Prije primjene naših transformacija, moramo konfigurirati Connect da koristi JSON bez sheme, modificirajući spojiti-distribuirati.svojstva:

key.converter.schemas.enable = lažna vrijednost.converter.schemas.enable = false

Nakon toga moramo ponovno pokrenuti Connect, opet u distribuiranom načinu:

$ CONFLUENT_HOME / bin / povezivanje-distribucija $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties

Opet, moramo stvoriti tijelo za izvorni konektor POST kao JSON datoteku. Evo, nazvat ćemo to connect-file-source-transform.json.

Uz već poznate parametre, dodajemo nekoliko redaka za dvije potrebne transformacije:

{"name": "local-file-source", "config": {"connector.class": "FileStreamSource", "tasks.max": 1, "file": "test-transform.txt", "topic ":" connect-transform "," transformacije ":" MakeMap, InsertSource "," transforms.MakeMap.type ":" org.apache.kafka.connect.transforms.HoistField $ Value "," transforms.MakeMap.field ": "line", "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField $ Value", "transforms.InsertSource.static.field": "data_source", "transforms.InsertSource.static.value ":" test-file-source "}}

Nakon toga, izvedimo POST:

curl -d @ $ CONFLUENT_HOME / connect-file-source-transform.json \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors

Napišimo nekoliko redaka našem test-transformacija.txt:

Foo Bar

Ako sada pregledamo povezivanje-preobrazba teme, trebali bismo dobiti sljedeće redove:

{"line": "Foo", "data_source": "test-file-source"} {"line": "Bar", "data_source": "test-file-source"}

9. Korištenje spremnih konektora

Nakon upotrebe ovih jednostavnih konektora, pogledajmo naprednije konektore koji su spremni za upotrebu i kako ih instalirati.

9.1. Gdje pronaći konektore

Unaprijed izrađeni konektori dostupni su iz različitih izvora:

  • Nekoliko je konektora u paketu s običnim Apache Kafkom (izvor i sudoper za datoteke i konzolu)
  • Još su neki konektori u paketu s Confluent Platform (ElasticSearch, HDFS, JDBC i AWS S3)
  • Također pogledajte Confluent Hub, svojevrsnu trgovinu aplikacija za Kafka konektore. Broj ponuđenih konektora kontinuirano raste:
    • Spojni konektori (razvijeni, testirani, dokumentirani i u potpunosti podržani od strane Confluent)
    • Certificirani konektori (implementirao treća strana i certificirao Confluent)
    • Konektori razvijeni i podržani u zajednici
  • Osim toga, Confluent također nudi stranicu konektora, s nekim konektorima koji su također dostupni u Confluent Hub, ali i s još nekim konektorima zajednice.
  • I na kraju, postoje i dobavljači koji osiguravaju konektore kao dio svog proizvoda. Na primjer, Landoop nudi biblioteku za strujanje nazvanu Objektivi, koja također sadrži skup od ~ 25 konektora otvorenog koda (mnogi od njih također su popisani na drugim mjestima)

9.2. Instaliranje konektora iz koncentriranog čvorišta

Poduzeća verzija Confluent pruža skriptu za instalaciju konektora i drugih komponenata iz Confluent Hub-a (skripta nije uključena u verziju otvorenog koda). Ako koristimo verziju poduzeća, konektor možemo instalirati pomoću sljedeće naredbe:

$ CONFLUENT_HOME / bin / confluent-hub instaliranje confluentinc / kafka-connect-mqtt: 1.0.0-pregled

9.3. Ručna instalacija konektora

Ako trebamo konektor, koji nije dostupan na Confluent Hub ili ako imamo otvorenu verziju Confluent, tražene konektore možemo instalirati ručno. Za to moramo preuzeti i raspakirati konektor, kao i premjestiti uključene jezičke u mapu navedenu kao dodatak.put.

Za svaki priključak arhiva bi trebala sadržavati dvije nama zanimljive mape:

  • The lib mapa sadrži jar konektora, na primjer, kafka-connect-mqtt-1.0.0-preview.jar, kao i još neke staklenke potrebne konektoru
  • The itd mapa sadrži jednu ili više referentnih konfiguracijskih datoteka

Moramo premjestiti lib mapa u $ CONFLUENT_HOME / share / java, ili koji god put da smo odredili kao dodatak.put u connect-standalone.properties i spojiti-distribuirati.svojstva. Pritom bi možda imalo smisla i mapu preimenovati u nešto smisleno.

Konfiguracijske datoteke možemo koristiti iz itd bilo upućivanjem na njih tijekom pokretanja u samostalnom načinu, ili možemo jednostavno zgrabiti svojstva i iz njih stvoriti JSON datoteku.

10. Zaključak

U ovom smo tutorijalu pogledali kako instalirati i koristiti Kafka Connect.

Razmotrili smo vrste konektora, kako izvora tako i sudopera. Također smo pogledali neke značajke i načine u kojima Connect može raditi. Zatim smo pregledali transformatore. I na kraju, naučili smo gdje nabaviti i kako instalirati prilagođene konektore.

Kao i uvijek, konfiguracijske datoteke mogu se naći na GitHubu.