Točno jednom obrada u Kafki s Javom

1. Pregled

U ovom ćemo uputstvu pogledati kako Kafka osigurava isporuku tačno jednom između aplikacija proizvođača i potrošača putem novo predstavljenog Transactional API-ja.

Uz to, koristit ćemo ovaj API za implementaciju transakcijskih proizvođača i potrošača kako bismo postigli isporuku od kraja do kraja točno u primjeru WordCount.

2. Dostava poruka u Kafki

Zbog različitih kvarova, sustavi za razmjenu poruka ne mogu jamčiti isporuku poruka između proizvođača i potrošača. Ovisno o tome kako klijentske aplikacije komuniciraju s takvim sustavima, moguće su sljedeće semantike poruka:

  • Ako sustav za razmjenu poruka nikada neće duplicirati poruku, ali možda propusti povremenu poruku, mi to zovemo odjednom
  • Ili, ako nikada neće propustiti poruku, ali možda duplicira povremenu poruku, mi je zovemo barem jednom
  • Ali, ako uvijek isporučuje sve poruke bez dupliciranja, to je točno-jednom

U početku je Kafka podržavao isporuku poruka samo jednom i barem jednom.

Međutim, uvođenje Transakcija između Kafkinih brokera i klijentskih aplikacija osigurava isporuku u Kafki točno jednom. Da bismo ga bolje razumjeli, pregledajmo brzo API klijenta za transakcije.

3. Ovisnosti Mavena

Za rad s API-jem za transakcije trebat će nam Kafkin Java klijent u našem pom:

 org.apache.kafka kafka-klijenti 2.0.0 

4. Transakcijski trošiti-transformirati-proizvoditi Petlja

Za naš ćemo primjer potrošiti poruke iz ulazne teme, rečenice.

Tada ćemo za svaku rečenicu prebrojati svaku riječ i poslati pojedinačni broj riječi na izlaznu temu, broji.

U primjeru ćemo pretpostaviti da podaci o transakcijama već postoje u rečenice tema.

4.1. Proizvođač svjestan transakcija

Dakle, dodajmo prvo tipičnog proizvođača Kafke.

Svojstva produceProps = nova svojstva (); produceProps.put ("bootstrap.servers", "localhost: 9092");

Ipak, doduše, moramo navesti i transakcijski.id i omogućiti idempotencija:

produceProps.put ("enable.idempotence", "true"); produceProps.put ("transactional.id", "prod-1"); KafkaProducer proizvođač = novi KafkaProducer (proizvođačProps);

Budući da smo omogućili idempotenciju, Kafka će koristiti ovaj ID transakcije kao dio svog algoritma za deduplicirati bilo koju poruku ovog proizvođačašalje, osiguravajući idempotentnost.

Jednostavno rečeno, ako proizvođač Kafki slučajno više puta pošalje istu poruku, ove postavke omogućuju joj da je primijeti.

Sve što trebamo učiniti je pobrinite se da je ID transakcije različit za svakog proizvođača, iako dosljedan tijekom ponovnih pokretanja.

4.2. Omogućavanje proizvođača za transakcije

Jednom kad smo spremni, tada trebamo i nazvati initTransaction za pripremu proizvođača da koristi transakcije:

producent.initTransactions ();

Ovo registrira proizvođača kod brokera kao onoga koji može koristiti transakcije, identificirajući ga po svom transakcijski.id i redni broj ili epoha. Zauzvrat, posrednik će ih koristiti za prepisivanje bilo kakvih radnji u dnevnik transakcija.

I posljedično, posrednik će ukloniti sve radnje iz tog dnevnika koje pripadaju proizvođaču s istim ID-om transakcije i starijimepoha, pretpostavljajući da su iz nepostojećih transakcija.

4.3. Potrošač svjestan transakcija

Kada konzumiramo, možemo čitati sve poruke na particiji teme redom. Iako, možemo označiti sa izolacija.razina da bismo trebali pričekati da čitamo transakcijske poruke dok se ne izvrši pridružena transakcija:

Svojstva consumerProps = nova svojstva (); consumerProps.put ("bootstrap.servers", "localhost: 9092"); consumerProps.put ("group.id", "my-group-id"); consumerProps.put ("enable.auto.commit", "false"); consumerProps.put ("isolation.level", "read_committed"); KafkaConsumer potrošač = novi KafkaConsumer (consumerProps); consumer.subscribe (singleton ("rečenice"));

Koristeći vrijednost read_committed osigurava da ne čitamo nijednu transakcijsku poruku prije završetka transakcije.

Zadana vrijednost izolacija.razina je read_uncommitted.

4.4. Potrošnja i transformacija transakcijom

Sad kad smo proizvođač i potrošač konfigurirani za transakcijsko pisanje i čitanje, možemo konzumirati zapise iz naše ulazne teme i brojati svaku riječ u svakom zapisu:

Zapisi ConsumerRecords = consumer.poll (ofSeconds (60)); Mapiraj wordCountMap = records.records (novi TopicPartition ("input", 0)) .stream () .flatMap (record -> Stream.of (record.value (). Split (""))) .map (word -> Tuple.of (riječ, 1)) .collect (Collectors.toMap (tuple -> tuple.getKey (), t1 -> t1.getValue (), (v1, v2) -> v1 + v2));

Imajte na umu da u gore navedenom kodu nema ničega transakcijskog. Ali, otkad smo koristili read_committed, to znači da potrošač neće čitati poruke koje su napisane na ulaznu temu u istoj transakciji dok sve ne budu napisane.

Sada izračunati broj riječi možemo poslati na izlaznu temu.

Pogledajmo kako možemo postići svoje rezultate, također transakcijski.

4.5. Pošalji API

Da bismo svoje brojeve poslali kao nove poruke, ali u istoj transakciji, zovemo startTransaction:

producent.beginTransaction ();

Zatim svaku možemo napisati u našu temu "broji", a ključ je riječ, a count vrijednost:

wordCountMap.forEach ((ključ, vrijednost) -> proizvođač.send (novi ProducerRecord ("broji", ključ, vrijednost.toString ())));

Imajte na umu da, jer proizvođač može podatke podijeliti po ključu, to znači da transakcijske poruke mogu se protezati na više particija, a svaku čitaju zasebni potrošači. Stoga će Kafka broker pohraniti popis svih ažuriranih particija za transakciju.

Također imajte na umu da, unutar transakcije, proizvođač može upotrijebiti više niti za paralelno slanje zapisa.

4.6. Počinjanje offseta

I na kraju, moramo izvršiti svoje nadoknade koje smo upravo potrošili. Kod transakcija pomake vraćamo natrag u ulaznu temu iz koje smo ih pročitali, kao i obično. Također, mi poslati ih proizvođačevoj transakciji.

Sve to možemo u jednom pozivu, ali prvo moramo izračunati pomake za svaku particiju teme:

Map offsetsToCommit = novi HashMap (); za (particija TopicPartition: records.partitions ()) {Popis partitionedRecords = Records.records (particija); long offset = partitionedRecords.get (partitionedRecords.size () - 1) .offset (); offsetsToCommit.put (particija, novi OffsetAndMetadata (offset + 1)); }

Imajte na umu da je ono što obvezujemo na transakciju nadolazeći pomak, što znači da moramo dodati 1.

Tada možemo poslati izračunate odstupanja od transakcije:

producent.sendOffsetsToTransaction (offsetsToCommit, "moja-grupa-id");

4.7. Počinjanje ili poništavanje transakcije

I, konačno, možemo izvršiti transakciju, koja će atomski zapisati pomake u potrošački_kompani temu, kao i na samu transakciju:

proizvođač.commitTransaction ();

Ovo uklanja svaku međuspremničku poruku na odgovarajuće particije. Uz to, Kafka broker čini sve poruke u toj transakciji dostupnima potrošačima.

Naravno, ako nešto pođe po krivu dok obrađujemo, na primjer, ako uhvatimo iznimku, možemo nazvati abortTransaction:

probajte {// ... čitati iz ulazne teme // ... transformirati // ... pisati u izlaznu temu proizvođač.commitTransaction (); } ulov (izuzetak e) {proizvođač.abortTransaction (); }

I ispustite sve me uspremljene poruke i uklonite transakciju od posrednika.

Ako se niti ne obvežemo niti prekinemo prije konfiguriranja brokera max.transaction.timeout.ms, broker Kafka će prekinuti samu transakciju. Zadana vrijednost za ovo svojstvo je 900 000 milisekundi ili 15 minuta.

5. Ostalo trošiti-transformirati-proizvoditi Petlje

Ovo što smo upravo vidjeli je osnovno trošiti-transformirati-proizvoditi petlja koja čita i upisuje u isti Kafka klaster.

Obrnuto, aplikacije koje moraju čitati i pisati u različite Kafka klastere moraju koristiti starije commitSync i commitAsync API. Tipično, aplikacije pohranjuju potrošačke pomake u svoju vanjsku pohranu stanja kako bi održale transakcijsku sposobnost.

6. Zaključak

Za kritične aplikacije podataka, obrada od kraja do kraja točno jednom je često imperativ.

U ovom vodiču, vidjeli smo kako koristimo Kafku kako bismo učinili upravo to, koristeći transakcijei implementirali smo primjer brojanja riječi zasnovan na transakcijama da bismo ilustrirali princip.

Slobodno provjerite sve uzorke koda na GitHubu.


$config[zx-auto] not found$config[zx-overlay] not found