Uvod u KafkaStreams na Javi

1. Pregled

U ovom ćemo članku pogledati KafkaStreams knjižnica.

KafkaStreams dizajnirali su tvorci Apache Kafke. Primarni cilj ovog dijela softvera je omogućiti programerima da stvore učinkovite, streaming programe u stvarnom vremenu koji bi mogli raditi kao mikroservisi.

KafkaStreams omogućuje nam da koristimo teme iz Kafke, analiziramo ili transformiramo podatke i potencijalno ih šaljemo drugoj temi Kafke.

Demonstrirati Kafka Streams, stvorit ćemo jednostavnu aplikaciju koja čita rečenice iz teme, broji pojavljivanja riječi i ispisuje broj po riječi.

Važno je napomenuti da KafkaStreams knjižnica nije reaktivna i nema podršku za asinkronizaciju i rukovanje povratnim tlakom.

2. Ovisnost Mavena

Za početak pisanja logike obrade toka pomoću Kafka Streamovi, moramo dodati ovisnost o kafka-potoci i kafka-klijenti:

 org.apache.kafka kafka-streams 1.0.0 org.apache.kafka kafka-klijenti 1.0.0 

Također trebamo instalirati i pokrenuti Apache Kafka, jer ćemo koristiti temu Kafke. Ova će tema biti izvor podataka za naš posao strujanja.

Kafku i ostale potrebne ovisnosti možemo preuzeti sa službenog web mjesta.

3. Konfiguriranje ulaza KafkaStreams

Prvo što ćemo napraviti je definicija ulazne Kafkine teme.

Možemo koristiti Slivno alat koji smo preuzeli - sadrži Kafka poslužitelj. Sadrži i kafka-proizvođač konzola koje možemo koristiti za objavljivanje poruka Kafki.

Za početak pokrenimo naš Kafka klaster:

./tekući start

Jednom kada Kafka započne, pomoću njega možemo definirati izvor podataka i naziv svoje aplikacije APPLICATION_ID_CONFIG:

String inputTopic = "inputTopic";
Svojstva streamsConfiguration = new Svojstva (); streamsConfiguration.put (StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");

Ključni konfiguracijski parametar je BOOTSTRAP_SERVER_CONFIG. Ovo je URL naše lokalne Kafka instance koju smo upravo započeli:

private String bootstrapServers = "localhost: 9092"; streamsConfiguration.put (StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

Dalje, moramo proslijediti vrstu ključa i vrijednost poruka iz kojih ćemo se konzumirati inputTopic:

streamsConfiguration.put (StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ()); streamsConfiguration.put (StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ());

Obrada streama često je značajna. Kada želimo spremiti srednje rezultate, moramo navesti STATE_DIR_CONFIG parametar.

U našem testu koristimo lokalni datotečni sustav:

streamsConfiguration.put (StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory (). getAbsolutePath ()); 

4. Izgradnja topologije strujanja

Nakon što definiramo svoju ulaznu temu, možemo stvoriti topologiju strujanja - to je definicija načina na koji događaji trebaju biti obrađeni i transformirani.

U našem primjeru željeli bismo implementirati brojač riječi. Za svaku rečenicu poslanu na inputTopic, želimo ga podijeliti na riječi i izračunati pojavu svake riječi.

Možemo upotrijebiti primjerak KStreamsBuilder klase za početak konstrukcije naše topologije:

Graditelj KStreamBuilder = novi KStreamBuilder (); KStream textLines = builder.stream (inputTopic); Uzorak uzorka = Pattern.compile ("\ W +", Pattern.UNICODE_CHARACTER_CLASS); KTable wordCounts = textLines .flatMapValues ​​(value -> Arrays.asList (pattern.split (value.toLowerCase ()))) .groupBy ((key, word) -> word) .count ();

Da bismo implementirali brojanje riječi, prvo moramo podijeliti vrijednosti pomoću regularnog izraza.

Metoda split vraća niz. Koristimo flatMapValues ​​() da se poravna. Inače bismo završili s popisom nizova i bilo bi nezgodno pisati kod koristeći takvu strukturu.

Napokon, agregiramo vrijednosti za svaku riječ i pozivamo računati() koji će izračunati pojave određene riječi.

5. Rukovanje rezultatima

Već smo izračunali broj riječi naših ulaznih poruka. A sada, ispišite rezultate na standardnom izlazu pomoću za svakoga() metoda:

wordCounts .foreach ((w, c) -> System.out.println ("riječ:" + w + "->" + c));

U produkciji, često takav posao strujanja može objaviti izlaz na drugu Kafkinu temu.

To bismo mogli učiniti pomoću to () metoda:

String outputTopic = "outputTopic"; Serde stringSerde = Serdes.String (); Serde longSerde = Serdes.Long (); wordCounts.to (stringSerde, longSerde, outputTopic);

The Serde klasa daje nam unaprijed konfigurirane serializatore za Java tipove koji će se koristiti za serializaciju objekata na niz bajtova. Niz bajtova tada će biti poslan na temu Kafka.

Koristimo Niz kao ključ naše teme i Dugo kao vrijednost za stvarni broj. The do() metoda spremit će rezultirajuće podatke u outputTopic.

6. Pokretanje KafkaStream posla

Do ovog trenutka izgradili smo topologiju koja se može izvršiti. Međutim, posao još nije započeo.

Svoj posao moramo započeti izričito pozivom na početak() metoda na KafkaStreams primjer:

KafkaStreams streamovi = novi KafkaStreams (graditelj, streamsConfiguration); streams.start (); Navoj.spavanje (30000); potoci.close ();

Imajte na umu da čekamo 30 sekundi da se posao završi. U stvarnom bi scenariju taj posao bio neprestano pokrenut, obrađujući događaje iz Kafke čim stignu.

Svoj posao možemo testirati objavljivanjem nekih događaja na našu temu Kafka.

Počnimo s kafka-proizvođač konzola i ručno pošaljite neke događaje na naš inputTopic:

./kafka-console-producer --topic inputTopic --broker-list localhost: 9092> "ovo je poni"> "ovo je konj i poni" 

Ovim smo putem objavili dva događaja za Kafku. Naša će aplikacija potrošiti te događaje i ispisat će sljedeće rezultate:

riječ: -> 1 riječ: ovo -> 1 riječ: je -> 1 riječ: a -> 1 riječ: pony -> 1 riječ: -> 2 riječ: ovo -> 2 riječ: je -> 2 riječ: a - > 2 riječi: konj -> 1 riječ: i -> 1 riječ: poni -> 2

To možemo vidjeti kad je stigla prva poruka, riječ poni dogodilo samo jednom. Ali kad smo poslali drugu poruku, riječ poni dogodilo se po drugi put tiskanje: “riječ: poni -> 2 ″.

6. Zaključak

Ovaj članak govori o tome kako stvoriti primarnu aplikaciju za obradu toka koristeći Apache Kafka kao izvor podataka i KafkaStreams knjižnica kao knjižnica obrade toka.

Svi ovi primjeri i isječci koda mogu se naći u projektu GitHub - ovo je Maven projekt, pa bi ga trebalo lako uvesti i pokrenuti kakav jest.


$config[zx-auto] not found$config[zx-overlay] not found