Testiranje Kafke i Spring Boota

1. Pregled

Apache Kafka je moćan, distribuiran sustav obrade tokova otporan na greške. U prethodnom uputstvu naučili smo kako raditi s Springom i Kafkom.

U ovom vodiču, nadogradit ćemo prethodni i naučiti kako pisati pouzdane, samostalne testove integracije koji se ne oslanjaju na vanjski Kafka poslužitelj.

Prvo ćemo započeti, ali ćemo razmotriti kako koristiti i konfigurirati ugrađenu instancu Kafke. Tada ćemo vidjeti kako možemo iskoristiti popularni okvir Testcontainers iz naših testova.

2. Ovisnosti

Naravno, morat ćemo dodati standard proljeće-kafka ovisnost o našoj pom.xml:

 org.springframework.kafka spring-kafka 2.6.3.OBUSTAŽENJE 

Tada će nam trebati još dvije ovisnosti posebno za naše testove. Prvo ćemo dodati opruga-kafka-test artefakt:

 org.springframework.kafka opruga-kafka-test 2.6.3.Opusti test 

I na kraju, dodati ćemo ovisnost Testcontainers Kafka, koja je također dostupna na Maven Central:

 org.testcontainers kafka 1.15.0 test 

Sada kada smo konfigurirali sve potrebne ovisnosti, možemo napisati jednostavnu aplikaciju Spring Boot pomoću Kafke.

3. Jednostavna aplikacija proizvođača i potrošača tvrtke Kafka

Tijekom ovog vodiča fokus naših testova bit će jednostavna aplikacija Spring Boot Kafka između proizvođača i potrošača.

Počnimo s definiranjem točke ulaza u prijavu:

@SpringBootApplication @EnableAutoConfiguration javna klasa KafkaProducerConsumerApplication {javna statička void glavna (String [] args) {SpringApplication.run (KafkaProducerConsumerApplication.class, args); }}

Kao što vidimo, ovo je standardna aplikacija Spring Boot. Gdje je moguće, želimo koristiti zadane vrijednosti konfiguracije. Imajući ovo na umu, koristimo se @EnableAutoConfiguration napomena za automatsko konfiguriranje naše aplikacije.

3.1. Postavljanje proizvođača

Dalje, razmotrimo grah proizvođača koji ćemo koristiti za slanje poruka na zadanu Kafkinu temu:

@Component javna klasa KafkaProducer {private static final Logger LOGGER = LoggerFactory.getLogger (KafkaProducer.class); @Autowired private KafkaTemplate kafkaTemplate; javno prazno slanje (tema niza, nosivost niza) {LOGGER.info ("slanje korisnog tereta =" {} "na topic =" {} "", korisni teret, tema); kafkaTemplate.send (tema, korisni teret); }}

Naše KafkaProducer Grah koji je gore definiran samo je omot oko KafkaTemplate razred. Ova klasa pruža operacije na visokoj razini, poput slanja podataka na navedenu temu, što je upravo ono što mi radimo u našoj poslati metoda.

3.2. Potrošačko postavljanje

Isto tako, sada ćemo definirati jednostavan potrošački grah koji će slušati Kafkinu temu i primati poruke:

@Component javna klasa KafkaConsumer {private static final Logger LOGGER = LoggerFactory.getLogger (KafkaConsumer.class); privatni zasun CountDownLatch = novi CountDownLatch (1); privatni niz korisnih podataka = null; @KafkaListener (topics = "$ {test.topic}") javna praznina primanje (ConsumerRecord consumerRecord) {LOGGER.info ("primljeno korisno opterećenje =" {} "", consumerRecord.toString ()); setPayload (consumerRecord.toString ()); zasun.countDown (); } javni CountDownLatch getLatch () {povratni zasun; } javni String getPayload () {return payload; }}

Naš jednostavni potrošač koristi @KafkaListener bilješka na primiti metoda za preslušavanje poruka na zadanu temu. Kasnije ćemo vidjeti kako konfiguriramo test.tema iz naših testova.

Nadalje, metoda primanja pohranjuje sadržaj poruke u naš grah i smanjuje broj zasun varijabilna. Ova je varijabla jednostavno polje brojača bez niti koje ćemo koristiti kasnije iz testova kako bismo bili sigurni da smo uspješno primili poruku.

Sad kad smo implementirali našu jednostavnu Kafka aplikaciju koja koristi Spring Boot, pogledajmo kako možemo napisati integracijske testove.

4. Riječ o testiranju

Općenito, prilikom pisanja testova čiste integracije, ne bismo trebali ovisiti o vanjskim uslugama koje možda nećemo moći kontrolirati ili bi mogle iznenada prestati raditi. To bi moglo imati negativne učinke na naše rezultate ispitivanja.

Slično tome, ako ovisimo o vanjskoj usluzi, u ovom slučaju, pokrenutom Kafkinom posredniku, vjerojatno je nećemo moći postaviti, kontrolirati i srušiti na način na koji želimo od naših testova.

4.1. Svojstva aplikacije

Koristit ćemo vrlo lagani skup svojstava konfiguracije aplikacije iz naših testova. Ova svojstva definirat ćemo u našem src / test / resources / application.yml datoteka:

opruga: kafka: potrošač: auto-offset-reset: najraniji id grupe: baeldung test: tema: embedded-test-topic

Ovo je minimalni skup svojstava koji su nam potrebni za rad s ugrađenom instancom Kafke ili lokalnog brokera.

Većina njih su samorazumljive, ali ono što bismo trebali istaknuti od posebne važnosti je potrošačko vlasništvo auto-offset-reset: najranije. Ovo svojstvo osigurava da naša grupa potrošača prima poruke koje šaljemo jer bi spremnik mogao započeti nakon završetka slanja.

Uz to, svojstvo teme konfiguriramo s vrijednošću tema ugrađenog testa, što je tema koju ćemo koristiti u našim testovima.

5. Testiranje pomoću ugrađenog Kafke

U ovom ćemo odjeljku pogledati kako upotrijebiti instancu Kafka u memoriji za pokretanje naših testova. Ovo je također poznato kao ugrađeni Kafka.

Ovisnost opruga-kafka-test koji smo prethodno dodali sadrži neke korisne uslužne programe koji pomažu u testiranju naše aplikacije. Ono što je najvažnije, sadrži EmbeddedKafkaBroker razred.

Imajući to na umu, krenimo i napišite svoj prvi test integracije:

@SpringBootTest @DirtiesContext @EmbeddedKafka (particije = 1, brokerProperties = {"listeners = PLAINTEXT: // localhost: 9092", "port = 9092"}) klasa EmbeddedKafkaIntegrationTest {@Autowired private KafkaConsumer consumer; @Autowired privatni proizvođač KafkaProducer; @Value ("$ {test.topic}") privatna tema niza; @Test public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived () baca iznimku {produce.send (tema, "Slanje s vlastitim jednostavnim KafkaProducer"); consumer.getLatch (). await (10000, TimeUnit.MILLISECONDS); assertThat (consumer.getLatch (). getCount (), jednakTo (0L)); assertThat (consumer.getPayload (), containsString ("embedded-test-topic")); }}

Krenimo kroz ključne dijelove našeg testa. Prvo započinjemo ukrašavanjem naše testne klase s dvije prilično standardne proljetne bilješke:

  • The @SpringBootTest napomena će osigurati da naš test pokrene kontekst proljetne aplikacije
  • Također koristimo @DirtiesContext napomena koja će osigurati da se ovaj kontekst očisti i resetira između različitih testova

Tu dolazi ključni dio, koristimo @EmbeddedKafka napomena za ubrizgavanje instance EmbeddedKafkaBroker u naše testove. Štoviše, postoji nekoliko dostupnih svojstava koja možemo koristiti za konfiguriranje ugrađenog Kafka čvora:

  • pregrade - ovo je broj particija korištenih po temi. Da bi stvari bile jednostavne i jednostavne, iz naših testova želimo koristiti samo jednu
  • brokerProperties - dodatna svojstva za brokera Kafka. Opet pojednostavljujemo stvari i određujemo preslušavač običnog teksta i broj porta

Dalje, automatski povezujemo naše potrošač i proizvođač klase i konfigurirajte temu tako da koristi vrijednost iz našeg primjena.svojstva.

Za posljednji dio ubodne pile, jednostavno pošaljemo poruku našoj testnoj temi i provjerimo je li poruka primljena i sadrži naziv naše testne teme.

Kada pokrenemo test, među detaljnim proljetnim rezultatima vidjet ćemo:

... 12: 45: 35.099 [glavna] INFO cbkafka.embedded.KafkaProducer - slanje korisnog tereta = "Slanje vlastitim jednostavnim KafkaProducer" na topic = "embedded-test-topic" ... 12: 45: 35.103 [org .springframework.kafka.KafkaListenerEndpointContainer # 0-0-C-1] INFO cbkafka.embedded.KafkaConsumer - primljeno opterećenje = 'ConsumerRecord (topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1605267935099, veličina serijskog ključa = -1, veličina serijske vrijednosti = 41, zaglavlja = RecordHeaders (zaglavlja = [], isReadOnly = false),  key = null, value = Slanje s našim jednostavnim KafkaProducer) ' 

To potvrđuje da naš test ispravno radi. Super! Sada imamo način da napišemo samostalne, neovisne testove integracije koristeći Kafkin posrednik u memoriji.

6. Testiranje Kafke testnim spremnicima

Ponekad bismo mogli vidjeti male razlike između stvarne vanjske usluge u odnosu na ugrađenu instancu usluge u memoriji koja je posebno pružena u svrhu testiranja. Iako je malo vjerojatno, moglo bi biti i da je port korišten u našem testu zauzet, što uzrokuje kvar.

Imajući to na umu, u ovom ćemo odjeljku vidjeti varijaciju našeg prethodnog pristupa testiranju pomoću okvira Testcontainers. Vidjet ćemo kako iz našeg integracijskog testa instancirati i upravljati vanjskim Apache Kafka posrednikom hostiranim unutar Docker spremnika.

Definirajmo još jedan test integracije koji će biti prilično sličan onome koji smo vidjeli u prethodnom odjeljku:

@RunWith (SpringRunner.class) @import (com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class) @SpringBootTest (= klase KafkaProducerConsumerApplication.class) @DirtiesContext public class KafkaTestContainersLiveTest {@ClassRule javni statički KafkaContainer kafka = new KafkaContainer (DockerImageName .parse ("confluentinc / cp-kafka: 5.4.3")); @Automobilski privatni potrošač KafkaConsumer; @Autowired privatni proizvođač KafkaProducer; @Value ("$ {test.topic}") privatna tema niza; @Test javna praznina givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived () baca izuzetak {produce.send (tema, "Slanje vlastitim kontrolerom"); consumer.getLatch (). await (10000, TimeUnit.MILLISECONDS); assertThat (consumer.getLatch (). getCount (), jednakTo (0L)); assertThat (consumer.getPayload (), containsString ("embedded-test-topic")); }}

Pogledajmo razlike ovog puta. Proglašavamo kafka polje, što je standardni JUnit @ClassRule. Ovo je polje instanca KafkaContainer klasa koja će pripremiti i upravljati životnim ciklusom našeg kontejnera koji pokreće Kafku.

Da bi izbjegao sukobe priključaka, Testcontainers dinamički dodjeljuje broj priključka kada se pokreće naš spremnik dockera. Iz tog razloga pružamo prilagođenu tvorničku konfiguraciju potrošača i proizvođača koristeći klasu KafkaTestContainersConfiguration:

@Bean public Map consumerConfigs () {Rekviziti karte = novi HashMap (); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers ()); props.put (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "najranije"); props.put (ConsumerConfig.GROUP_ID_CONFIG, "baeldung"); // više rekvizita za povrat standardne konfiguracije; } @Bean public ProducerFactory produceFactory () {Map configProps = new HashMap (); configProps.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers ()); // više standardne konfiguracije vraća novu DefaultKafkaProducerFactory (configProps); }

Zatim upućujemo na ovu konfiguraciju putem @Uvoz napomena na početku našeg testa.

Razlog tome je što nam je potreban način za ubrizgavanje adrese poslužitelja u našu aplikaciju koja se, kao što je prethodno spomenuto, generira dinamički. To postižemo pozivom na getBootstrapServers () metoda, koja će vratiti mjesto poslužitelja bootstrap:

bootstrap.servers = [PLAINTEXT: // localhost: 32789]

Sada kada pokrenemo test, trebali bismo vidjeti da Testcontainers radi nekoliko stvari:

  • Provjerava postavke lokalnog Dockera.
  • Povlači konfluentin / cp-kafka: 5.4.3 slika dockera ako je potrebno
  • Pokreće novi spremnik i čeka da bude spreman
  • Napokon se isključuje i briše spremnik nakon završetka našeg ispitivanja

To se opet potvrđuje pregledom rezultata ispitivanja:

13: 33: 10.396 [glavna] INFO 🐳 [confluentinc / cp-kafka: 5.4.3] - Izrada spremnika za sliku: confluentinc / cp-kafka: 5.4.3 13: 33: 10.454 [glavna] INFO 🐳 [confluentinc / cp -kafka: 5.4.3] - Počevši spremnik ID: 13: 33 b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3: 10,785 [glavni] OPIS 🐳 [confluentinc / cp-kafka: 5.4.3] - spremnik confluentinc / cp-kafka: 5.4.3 je početni: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

Presto! Radni test integracije pomoću spremnika Kafka dockera.

7. Zaključak

U ovom smo članku naučili nekoliko pristupa za testiranje Kafka aplikacija s Spring Boot. U prvom pristupu vidjeli smo kako konfigurirati i koristiti lokalnog Kafka brokera u memoriji.

Tada smo vidjeli kako pomoću Testcontainers postaviti vanjski Kafka broker koji radi u spremniku dockera iz naših testova.

Kao i uvijek, puni izvorni kôd članka dostupan je na GitHub-u.


$config[zx-auto] not found$config[zx-overlay] not found