Uvod u Apache Kafku s Proljećem

Vrh postojanosti

Upravo sam najavio novo Uči proljeće tečaj, usredotočen na osnove Spring 5 i Spring Boot 2:

>> PROVJERITE TEČAJ

1. Pregled

Apache Kafka distribuirani je sustav obrade tokova otporan na greške.

U ovom ćemo članku pokriti proljetnu podršku za Kafku i razinu apstrakcija koje pruža preko matičnih API-ja Kafka Java klijenta.

Spring Kafka donosi jednostavan i tipičan model programiranja predloška Spring s a KafkaTemplate i POJO-ovi na temelju poruka putem @KafkaListener bilješka.

2. Instalacija i postavljanje

Da biste preuzeli i instalirali Kafku, pogledajte službeni vodič ovdje.

Također moramo dodati proljeće-kafka ovisnost o našoj pom.xml:

 org.springframework.kafka spring-kafka 2.3.7.OSLOBOĐENJE 

Najnoviju verziju ovog artefakta možete pronaći ovdje.

Naš primjer aplikacije bit će aplikacija Spring Boot.

Ovaj članak pretpostavlja da je poslužitelj pokrenut pomoću zadane konfiguracije i da se nisu promijenili portovi poslužitelja.

3. Konfiguriranje tema

Prije smo koristili alate naredbenog retka za stvaranje tema na Kafki kao što su:

$ bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ - faktor replikacije 1 - particije 1 \ --topic mytopic

Ali s uvođenjem AdminClient u Kafki sada teme možemo stvarati programski.

Moramo dodati KafkaAdmin Proljetni grah, koji će automatski dodati teme za sve vrste graha Nova tema:

@Configuration javna klasa KafkaTopicConfig {@Value (value = "$ {kafka.bootstrapAddress}") private String bootstrapAddress; @Bean public KafkaAdmin kafkaAdmin () {Map configs = new HashMap (); configs.put (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); vratiti novi KafkaAdmin (konfiguracije); } @Bean public NewTopic topic1 () {return new NewTopic ("baeldung", 1, (short) 1); }}

4. Izrada poruka

Da bismo stvorili poruke, prvo moramo konfigurirati a ProizvođačFactory koja postavlja strategiju stvaranja Kafke Proizvođač instance.

Tada nam treba a KafkaTemplate koja obavija a Proizvođač instanci i pruža praktične metode za slanje poruka na Kafka teme.

Proizvođač instance su zaštićene od niti i stoga će korištenje jedne instance kroz kontekst aplikacije dati veće performanse. Slijedom toga, KakfaTemplate instance su također zaštićene nitima i preporučuje se upotreba jedne instance.

4.1. Konfiguracija proizvođača

@Configuration javna klasa KafkaProducerConfig {@Bean public ProducerFactory produceFactory () {Map configProps = new HashMap (); configProps.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); vrati novi DefaultKafkaProducerFactory (configProps); } @Bean public KafkaTemplate kafkaTemplate () {return new KafkaTemplate (produceFactory ()); }}

4.2. Objavljivanje poruka

Poruke možemo slati pomoću KafkaTemplate razred:

@Autowired private KafkaTemplate kafkaTemplate; javna praznina sendMessage (String msg) {kafkaTemplate.send (topicName, msg); }

The poslati API vraća a Slušljivo u budućnosti objekt. Ako želimo blokirati nit slanja i dobiti rezultat o poslanoj poruci, možemo nazvati dobiti API Slušljivo u budućnosti objekt. Nit će pričekati rezultat, ali usporit će proizvođača.

Kafka je brza platforma za obradu streama. Stoga je bolja ideja s rezultatima postupati asinkrono, tako da naredne poruke ne čekaju rezultat prethodne poruke. To možemo učiniti putem povratnog poziva:

javna void sendMessage (niz poruka) {ListenableFuture budućnost = kafkaTemplate.send (imeName, poruka); future.addCallback (novi ListenableFutureCallback() {@Override public void onSuccess (rezultat rezultata SendResult) {System.out.println ("Sent message = [" + message + "] with offset = [" + result.getRecordMetadata (). Offset () + "]")) ; } @Override public void onFailure (Dopušteno ex) {System.out.println ("Nije moguće poslati poruku = [" + poruka + "] zbog:" + ex.getMessage ()); }}); }

5. Konzumiranje poruka

5.1. Konfiguracija potrošača

Za konzumiranje poruka moramo konfigurirati a ConsumerFactory i a KafkaListenerContainerFactory. Jednom kad je ovaj grah dostupan u tvornici proljetnog graha, potrošači na bazi POJO mogu se konfigurirati pomoću @KafkaListener bilješka.

@EnableKafka napomena je potrebna na klasi konfiguracije kako bi se omogućilo otkrivanje @KafkaListener napomena o grahu s proljetnim upravljanjem:

@EnableKafka @Configuration javna klasa KafkaConsumerConfig {@Bean public ConsumerFactory consumerFactory () {Rekviziti mape = novi HashMap (); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put (ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); vrati novi DefaultKafkaConsumerFactory (rekviziti); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (consumerFactory ()); tvornica za povratak; }}

5.2. Konzumiranje poruka

@KafkaListener (topics = "topicName", groupId = "foo") javna praznina listenGroupFoo (String poruka) {System.out.println ("Primljena poruka u grupi foo:" + poruka); }

Za temu se može implementirati više slušatelja, svaki s različitim ID-om grupe. Nadalje, jedan potrošač može slušati poruke iz različitih tema:

@KafkaListener (topics = "topic1, topic2", groupId = "foo")

Spring također podržava preuzimanje jednog ili više zaglavlja poruka pomoću @Zaglavlje napomena u slušatelju:

@KafkaListener (topics = "topicName") javna praznina listenWithHeaders (@Payload String poruka, @Header (KafkaHeaders.RECEIVED_PARTITION_ID) int particija) {System.out.println ("Primljena poruka:" + poruka "+" s particije: "+ particija);}

5.3. Konzumiranje poruka s određene particije

Kao što ste možda primijetili, temu smo kreirali mi baeldung sa samo jednom particijom. Međutim, za temu s više particija, a @KafkaListener mogu se izričito pretplatiti na određenu particiju teme s početnim pomakom:

@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", partitionOffsets = {@PartitionOffset (partition = "0", InitiOffset = "0"), @PartitionOffset (partition = "3", InitiOffset = "0")}) . ;}

Budući da je InitialOffset je poslana na 0 u ovom slušatelju, sve prethodno konzumirane poruke s particija 0 i tri ponovno će se potrošiti svaki put kad se ovaj slušatelj pokrene. Ako podešavanje pomaka nije potrebno, možemo koristiti pregrade vlasništvo @TopicPartition napomena za postavljanje samo particija bez pomaka:

@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", partitions = {"0", "1"}))

5.4. Dodavanje filtra poruka za slušatelje

Slušatelji se mogu konfigurirati za konzumiranje određenih vrsta poruka dodavanjem prilagođenog filtra. To se može učiniti postavljanjem a RecordFilterStrategy prema KafkaListenerContainerFactory:

@Bean public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (consumerFactory ()); factory.setRecordFilterStrategy (zapis -> zapis.value (). sadrži ("Svijet")); tvornica za povratak; }

Slušatelj se tada može konfigurirati da koristi ovu tvornicu spremnika:

@KafkaListener (topics = "topicName", containerFactory = "filterKafkaListenerContainerFactory") javna void listenWithFilter (poruka niza) {System.out.println ("Primljena poruka u filtriranom slušatelju:" + poruka); }

U ovom slušatelju, svi poruke koje odgovaraju filtru bit će odbačene.

6. Prilagođeni pretvarači poruka

Do sada smo samo slanje i primanje stringova pokrivali kao poruke. Međutim, također možemo slati i primati prilagođene Java objekte. To zahtijeva konfiguriranje odgovarajućeg serializatora u ProizvođačFactory i deserializator u ConsumerFactory.

Pogledajmo jednostavnu klasu graha, koje ćemo poslati kao poruke:

javni razred Pozdrav {private String msg; privatni naziv niza; // standardni getteri, postavljači i konstruktor}

6.1. Izrada prilagođenih poruka

U ovom ćemo primjeru koristiti JsonSerializer. Pogledajmo kod za ProizvođačFactory i KafkaTemplate:

@Bean public ProducerFactory pozdravProducerFactory () {// ... configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); vrati novi DefaultKafkaProducerFactory (configProps); } @Bean public KafkaTemplate pozdravKafkaTemplate () {return new KafkaTemplate (pozdravProducerFactory ()); }

Ovo novo KafkaTemplate može se koristiti za slanje Pozdrav poruka:

kafkaTemplate.send (imeName, novi pozdrav ("Zdravo", "Svijet"));

6.2. Konzumiranje prilagođenih poruka

Slično tome, izmijenimo ConsumerFactory i KafkaListenerContainerFactory da biste ispravno deserializirali pozdravnu poruku:

@Bean public ConsumerFactory HelloConsumerFactory () {// ... return new DefaultKafkaConsumerFactory (rekviziti, novi StringDeserializer (), novi JsonDeserializer (Greeting.class)); } @Bean public ConcurrentKafkaListenerContainerFactory pozdravKafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (pozdravConsumerFactory ()); tvornica za povratak; }

Spring-kafka JSON serializator i deserializator koristi Jackson knjižnicu koja je također neobavezna ovisnost mavena za projekt spring-kafka. Pa dodajmo to našem pom.xml:

 com.fasterxml.jackson.core jackson-databind 2.9.7 

Umjesto korištenja najnovije verzije Jacksona, preporučljivo je koristiti verziju koja je dodana u pom.xml proljetne-kafke.

Napokon, trebamo napisati slušatelja koji će konzumirati Pozdrav poruke:

@KafkaListener (topics = "topicName", containerFactory = "pozdravKafkaListenerContainerFactory") javna praznina pozdravListener (pozdravni pozdrav) {// obrada pozdravne poruke}

7. Zaključak

U ovom smo članku pokrili osnove proljetne podrške za Apache Kafku. Kratko smo pogledali razrede koji se koriste za slanje i primanje poruka.

Kompletni izvorni kod za ovaj članak možete pronaći na GitHubu. Prije izvođenja koda, provjerite radi li Kafka poslužitelj i jesu li teme kreirane ručno.

Dno postojanosti

Upravo sam najavio novo Uči proljeće tečaj, usredotočen na osnove Spring 5 i Spring Boot 2:

>> PROVJERITE TEČAJ

$config[zx-auto] not found$config[zx-overlay] not found