Apache RocketMQ s Spring Boot-om

1. Uvod

U ovom uputstvu stvorit ćemo proizvođača i potrošača poruka koristeći Spring Boot i Apache RocketMQ, platformu za distribuciju poruka i streaming podataka otvorenog koda.

2. Ovisnosti

Za Maven projekte moramo dodati ovisnost RocketMQ Spring Boot Starter:

 org.apache.rocketmq rocketmq-spring-boot-starter 2.0.4 

3. Izrada poruka

Za naš primjer stvorit ćemo osnovni proizvođač poruka koji će slati događaje kad god korisnik doda ili ukloni stavku iz košarice.

Prvo, postavimo mjesto poslužitelja i naziv grupe u našem primjena.svojstva:

rocketmq.name-server = 127.0.0.1: 9876 rocketmq.producer.group = košarica-proizvođač-grupa

Imajte na umu da ako bismo imali više od jednog poslužitelja imena, mogli bismo ih navesti kao host: port; host: port.

Sada, da bude jednostavnije, stvorit ćemo CommandLineRunner aplikacija i generirajte nekoliko događaja tijekom pokretanja aplikacije:

@SpringBootApplication javna klasa CartEventProducer implementira CommandLineRunner {@Autowired private RocketMQTemplate rocketMQTemplate; javna statička void glavna (String [] args) {SpringApplication.run (CartEventProducer.class, args); } javno void vođenje (String ... args) baca iznimku {rocketMQTemplate.convertAndSend ("cart-item-add-topic", new CartItemEvent ("bike", 1)); rocketMQTemplate.convertAndSend ("cart-item-add-topic", novi CartItemEvent ("računalo", 2)); rocketMQTemplate.convertAndSend ("stavka košarice-uklonjena-tema", novi CartItemEvent ("bicikl", 1)); }}

The CartItemEvent sastoji se od samo dva svojstva - ID predmeta i količine:

klasa CartItemEvent {private String itemId; privatna int količina; // konstruktor, getteri i postavljači}

U gornjem primjeru koristimo convertAndSend () metoda, generička metoda definirana AbstractMessageSendingTemplate sažetak klase, za slanje događaja s košarice. Potrebna su dva parametra: odredište, koje je u našem slučaju naziv teme, i korisni teret poruke.

4. Potrošač poruka

Konzumiranje RocketMQ poruka jednostavno je kao i stvaranje komponente Spring s naznakom @RocketMQMessageListener i provedba RocketMQListener sučelje:

@SpringBootApplication javna klasa CartEventConsumer {public static void main (String [] args) {SpringApplication.run (CartEventConsumer.class, args); } @Service @RocketMQMessageListener (topic = "cart-item-add-topic", consumerGroup = "cart-consumer_cart-item-add-topic") javna klasa CardItemAddConsumer implementira RocketMQListener {public void onMessage (CartItemEvent addItemEvent) {log.info ( "Dodavanje stavke: {}", addItemEvent); // dodatna logika}} @Service @RocketMQMessageListener (topic = "cart-item-removed-topic", consumerGroup = "cart-consumer_cart-item-removed-topic") javna klasa CardItemRemoveConsumer implementira RocketMQListener {public void onMessage (CartItemEvent removeItemEvent) {log.info ("Uklanjanje stavke: {}", removeItemEvent); // dodatna logika}}}

Moramo stvoriti zasebnu komponentu za svaku temu poruke koju slušamo. U svakom od ovih slušatelja definiramo naziv teme i naziv grupe potrošača putem @RocketMQMessageListener bilješka.

5. Sinkroni i asinkroni prijenos

U prethodnim primjerima koristili smo convertAndSend način slanja naših poruka. Ipak imamo neke druge mogućnosti.

Mogli bismo, na primjer, nazvati syncSend koja se razlikuje od convertAndSend jer se vraća SendResult objekt.

Njime se, na primjer, može provjeriti je li naša poruka uspješno poslana ili dobiti svoj ID:

javno void vođenje (String ... args) baca izuzetak {SendResult addBikeResult = rocketMQTemplate.syncSend ("cart-item-add-topic", new CartItemEvent ("bike", 1)); SendResult addComputerResult = rocketMQTemplate.syncSend ("cart-item-add-topic", new CartItemEvent ("računalo", 2)); SendResult removeBikeResult = rocketMQTemplate.syncSend ("stavka košarice-uklonjena-tema", novi CartItemEvent ("bicikl", 1)); }

Kao convertAndSend, ova se metoda vraća tek kad se postupak slanja dovrši.

Sinkroni prijenos trebali bismo koristiti u slučajevima koji zahtijevaju visoku pouzdanost, kao što su važne poruke obavijesti ili SMS obavijesti.

S druge strane, možda bismo umjesto toga željeli poslati poruku asinkrono i biti obaviješteni kad slanje završi.

To možemo učiniti s asyncSend, koji traje a SendCallback kao parametar i odmah vraća:

rocketMQTemplate.asyncSend ("cart-item-add-topic", new CartItemEvent ("bike", 1), new SendCallback () {@Override public void onSuccess (SendResult sendResult) {log.error ("Uspješno poslan predmet košarice") ;} @Override public void onException (bacanje u bacanje) {log.error ("Iznimka tijekom slanja stavke košarice", bacanje);}});

Koristimo asinkroni prijenos u slučajevima koji zahtijevaju veliku propusnost.

I na kraju, za scenarije u kojima imamo vrlo visoke zahtjeve za propusnošću možemo koristiti sendOneWay umjesto asyncSend. sendOneWay se razlikuje od asyncSend jer ne garantira da će poruka biti poslana.

Jednosmjerni prijenos može se koristiti i za uobičajene slučajeve pouzdanosti, poput skupljanja trupaca.

6. Slanje poruka u transakciji

RocketMQ nam pruža mogućnost slanja poruka unutar transakcije. To možemo učiniti pomoću sendInTransaction () metoda:

MessageBuilder.withPayload (novi CartItemEvent ("bicikl", 1)). Build (); rocketMQTemplate.sendMessageInTransaction ("test-transakcija", "ime-teme", msg, null);

Također, moramo implementirati a RocketMQLocalTransactionListener sučelje:

@RocketMQTransactionListener (txProducerGroup = "test-transakcija") klasa TransactionListenerImpl implementira RocketMQLocalTransactionListener {@Override public RocketMQLocalTransactionState executeLocalTransaction (Message msg, Object arG), COMMN UNRUNKRAKMN, COMMN, UNC, UNB, UNC } @Override public RocketMQLocalTransactionState checkLocalTransaction (Message message) {// ... provjeri status transakcije i vrati ROLLBACK, COMMIT ili UNKNOWN return RocketMQLocalTransactionState.COMMIT; }}

U sendMessageInTransaction (), prvi parametar je naziv transakcije. To mora biti isto kao i @RocketMQTransactionListenerPolje člana txProducerGroup.

7. Konfiguracija proizvođača poruka

Također možemo konfigurirati aspekte samog proizvođača poruke:

  • rocketmq.producer.send-message-timeout: Vremensko ograničenje slanja poruke u milisekundama - zadana vrijednost je 3000
  • rocketmq.producer.compress-message-body-prag: Prag iznad kojeg će RocketMQ komprimirati poruke - zadana vrijednost je 1024.
  • rocketmq.producer.max-message-size: Maksimalna veličina poruke u bajtovima - zadana vrijednost je 4096.
  • rocketmq.producer.retry-times-when-send-async-failed: Maksimalan broj ponovnih pokušaja za interno izvođenje u asinkronom načinu prije slanja neuspjeha - zadana vrijednost je 2.
  • rocketmq.producer.retry-next-server: Označava treba li pokušati s drugim posrednikom pri internom slanju kvara - zadana vrijednost je lažno.
  • rocketmq.producer.retry-times-when-send-failed: Maksimalan broj ponovnih pokušaja za interno izvođenje u asinkronom načinu prije slanja neuspjeha - zadana vrijednost je 2.

8. Zaključak

U ovom smo članku naučili kako slati i trošiti poruke pomoću Apache RocketMQ i Spring Boot. Kao i uvijek sav izvorni kod dostupan je na GitHubu.