Otprema poruka RabbitMQ s proljetnim AMQP-om

1. Uvod

U ovom uputstvu istražit ćemo koncept fanout i razmjene tema s proljetnim AMQP-om i RabbitMQ-om.

Na visokoj razini, razmjene navijača htjeti emitirati istu poruku u sve uvezane redove, dok razmjene tema koristite ključ usmjeravanja za prosljeđivanje poruka određenom vezanom redu ili redovima.

Prethodno čitanje Poruka s proljećem AMQP preporučuje se za ovaj vodič.

2. Postavljanje razmjene navijača

Postavimo jednu razmjenu obožavatelja s dva reda vezana uz nju. Kada pošaljemo poruku ovoj centrali, oba reda primit će je. Naša razmjena obožavatelja ignorira bilo koji ključ usmjeravanja koji je priložen poruci.

Proljetni AMQP omogućuje nam agregiranje svih deklaracija redova, razmjena i veza u a Deklarirajuće objekt:

@Bean public Declarables fanoutBindings () {Queue fanoutQueue1 = new Queue ("fanout.queue1", false); Red čekanja fanoutQueue2 = novi red čekanja ("fanout.queue2", false); FanoutExchange fanoutExchange = novi FanoutExchange ("fanout.exchange"); vratiti nove deklarabilne (fanoutQueue1, fanoutQueue2, fanoutExchange, bind (fanoutQueue1) .to (fanoutExchange), BindingBuilder.bind (fanoutQueue2) .to (fanoutExchange)); }

3. Postavljanje razmjene tema

Sad ćemo postaviti i razmjenu tema s dva reda čekanja, svaki s različitim uzorkom vezanja:

@Bean public Declarables topicBindings () {Red čekanja topicQueue1 = novi red čekanja (topicQueue1Name, false); Red čekanja topicQueue2 = novi red (topicQueue2Name, false); TopicExchange topicExchange = nova TopicExchange (topicExchangeName); vratiti nove deklarirane (topicQueue1, topicQueue2, topicExchange, BindingBuilder .bind (topicQueue1) .to (topicExchange) .with ("*. važno. *"), BindingBuilder .bind (topicQueue2) .to (topicExchange) .with ("#. pogreška ")); }

Razmjena tema omogućuje nam da na nju vežemo redove s različitim ključnim uzorcima. To je vrlo fleksibilno i omogućuje nam povezivanje više redova s ​​istim uzorkom ili čak više uzoraka u isti red.

Kada se ključ usmjeravanja poruke podudara s uzorkom, stavit će se u red čekanja. Ako red ima više veza koje se podudaraju s ključem usmjeravanja poruke, u red se postavlja samo jedna kopija poruke.

Naši obrasci vezanja mogu upotrijebiti zvjezdicu ("*") za podudaranje riječi na određenom položaju ili znak funte ("#") za podudaranje s nula ili više riječi.

Dakle, naša topicQueue1 primit će poruke koje imaju ključeve usmjeravanja s uzorkom od tri riječi, a srednja riječ je "važna" - na primjer: "User.important.error" ili “Blog.važno.notification”.

I, naše topicQueue2 primit će poruke koje imaju ključeve usmjeravanja koji završavaju riječju pogreška; primjeri podudaranja su "Pogreška", "User.important.error" ili “Blog.post.save.error”.

4. Postavljanje proizvođača

Koristit ćemo convertAndSend metoda RabbitTemplate za slanje naših primjeraka poruka:

 String message = "emitira se nosivost"; vrati argumente -> {rabbitTemplate.convertAndSend (FANOUT_EXCHANGE_NAME, "", "fanout" + poruka); rabbitTemplate.convertAndSend (TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN, "upozorenje o važnoj temi" + poruka); rabbitTemplate.convertAndSend (TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR, "greška važna tema" + poruka); };

The RabbitTemplate pruža mnoge preopterećene convertAndSend () metode za različite vrste razmjene.

Kada pošaljemo poruku na razmjenu navijača, ključ usmjeravanja se zanemaruje, a poruka se prosljeđuje svim vezanim redovima.

Kada pošaljemo poruku na razmjenu tema, moramo proslijediti ključ usmjeravanja. Na temelju ovog ključa usmjeravanja poruka će se dostaviti u određene redove.

5. Konfiguriranje potrošača

Na kraju, postavimo četiri potrošača - po jednog za svaki red čekanja - da preuzimaju proizvedene poruke:

 @RabbitListener (queues = {FANOUT_QUEUE_1_NAME}) public void receiveMessageFromFanout1 (String message) {System.out.println ("Received fanout 1 message:" + message); } @RabbitListener (queues = {FANOUT_QUEUE_2_NAME}) public void receiveMessageFromFanout2 (String message) {System.out.println ("Received fanout 2 message:" + message); } @RabbitListener (queues = {TOPIC_QUEUE_1_NAME}) public void receiveMessageFromTopic1 (String message) {System.out.println ("Primljena tema 1 (" + BINDING_PATTERN_IMPORTANT + ") poruka:" + poruka); } @RabbitListener (queues = {TOPIC_QUEUE_2_NAME}) public void receiveMessageFromTopic2 (String message) {System.out.println ("Primljena tema 2 (" + BINDING_PATTERN_ERROR + ") poruka:" + poruka); }

Potrošače konfiguriramo pomoću @RabbitListener bilješka. Jedini argument koji se ovdje prenosi je ime redova. Potrošači ovdje nisu svjesni razmjene ili ključeva usmjeravanja.

6. Pokretanje primjera

Naš uzorak projekta je aplikacija Spring Boot, pa će inicijalizirati aplikaciju zajedno s vezom na RabbitMQ i postaviti sve redove, razmjene i povezivanja.

Prema zadanim postavkama, naša aplikacija očekuje primjerak RabbitMQ koji se izvodi na localhostu na portu 5672. To i druge zadane postavke možemo izmijeniti u primjena.yaml.

Naš projekt izlaže HTTP krajnju točku na URI-u - / emitiranje - koji prihvaća POST-ove s porukom u tijelu zahtjeva.

Kada ovom URI-u pošaljemo zahtjev s tijelom “Test”, trebali bismo vidjeti nešto slično ovome u izlazu:

Primljena poruka ventila 1: emitira se korisni teret ventila Primljena poruka 1 (* .važno. *): Emitira se tema važna upozorenja. Primljena je poruka 2 (# .pogreška): emitira se poruka važne greške teme. Primljena je poruka navijača 2 emitira se Primljena poruka 1 (* .važna. *): emitira se korisna nosivost važne teme

Redoslijed kojim ćemo vidjeti ove poruke, naravno, nije zajamčen.

7. Zaključak

U ovom smo brzom vodiču pokrili razmjenu obožavatelja i tema s Spring AMQP i RabbitMQ.

Kompletni izvorni kôd i svi isječci koda za ovu lekciju dostupni su na spremištu GitHub.