MQTT klijent u Javi

1. Pregled

U ovom uputstvu vidjet ćemo kako možemo dodati MQTT poruke u Java projekt koristeći biblioteke koje pruža projekt Eclipse Paho.

2. MQTT Primer

MQTT (MQ Telemetry Transport) protokol je za razmjenu poruka koja je stvorena da odgovori na potrebu za jednostavnom i laganom metodom prijenosa podataka na / s uređaja s malim napajanjem, poput onih koji se koriste u industrijskim aplikacijama.

S povećanom popularnošću IoT (Internet of Things) uređaja, MQTT bilježi povećanu uporabu, što dovodi do njegove standardizacije od strane OASIS i ISO.

Protokol podržava jedan obrazac za razmjenu poruka, naime obrazac Objavi i pretplati: svaka poruka koju pošalje klijent sadrži povezanu „temu“ koju posrednik koristi za usmjeravanje na pretplaćene klijente. Nazivi tema mogu biti jednostavni nizovi poput "uljna temp"Ili niz poput staze"motor / 1 / o / min“.

Da bi primao poruke, klijent se pretplaćuje na jednu ili više tema koristeći točan naziv ili niz koji sadrži jedan od podržanih zamjenskih znakova ("#" za teme na više razina i "+" za jednorazinu ").

3. Postavljanje projekta

Da bismo uključili Paho knjižnicu u Maven projekt, moramo dodati sljedeću ovisnost:

 org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.0 

Najnoviju verziju modula biblioteke Eclipse Paho Java možete preuzeti s Maven Central.

4. Postavljanje klijenta

Kada koristimo Paho knjižnicu, prvo što moramo učiniti da bismo mogli slati i / ili primati poruke od MQTT brokera jest dobiti provedbu IMqttClient sučelje. Ovo sučelje sadrži sve metode potrebne aplikaciji za uspostavljanje veze s poslužiteljem, slanje i primanje poruka.

Paho izlazi iz kutije s dvije izvedbe ovog sučelja, asinkronom (MqttAsyncClient) i sinkroni (MqttClient).U našem ćemo se slučaju usredotočiti na sinkronu verziju koja ima jednostavniju semantiku.

Sama instalacija postupak je u dva koraka: prvo kreiramo instancu MqttClient klase, a zatim ga povezujemo s našim poslužiteljem. Sljedeći pododjeljak detaljno opisuje te korake.

4.1. Stvaranje novog IMqttClient Primjer

Sljedeći isječak koda pokazuje kako stvoriti novi IMqttClient sinkrona instanca:

Niz publisherId = UUID.randomUUID (). ToString (); Izdavač IMqttClient = novi MqttClient ("tcp: //iot.eclipse.org: 1883", publisherId);

U ovom slučaju, koristimo najjednostavniji dostupni konstruktor koji uzima adresu krajnje točke našeg MQTT brokera i identifikator klijenta, koji jedinstveno identificira našeg klijenta.

U našem smo slučaju koristili slučajni UUID, pa će se pri svakom izvršavanju generirati novi identifikator klijenta.

Paho također nudi dodatne konstruktore koje možemo koristiti za prilagodbu mehanizma trajanja koji se koristi za pohranu neprihvaćenih poruka i / ili ScheduledExecutorService koristi se za pokretanje pozadinskih zadataka zahtijevanih implementacijom mehanizma protokola.

Krajnja točka poslužitelja koji koristimo javni je MQTT posrednik kojeg hostira Paho projekt, koji omogućuje svima koji imaju internetsku vezu da testiraju klijente bez potrebe za bilo kakvom provjerom autentičnosti.

4.2. Povezivanje s poslužiteljem

Naša novostvorena MqttClient instanca nije povezana s poslužiteljem. To činimo tako da nazovemo njegov Spojiti() metoda, po želji dodavanje a MqttConnectOptions instancu koja nam omogućuje prilagodbu nekih aspekata protokola.

Te opcije posebno možemo koristiti za prosljeđivanje dodatnih podataka poput sigurnosnih vjerodajnica, načina oporavka sesije, načina ponovnog povezivanja i tako dalje.

The MqttConnectionOptions klasa izlaže te opcije kao jednostavna svojstva koja možemo postaviti pomoću uobičajenih metoda postavljanja. Trebamo samo postaviti svojstva potrebna za naš scenarij - preostala će poprimiti zadane vrijednosti.

Kôd koji se koristi za uspostavljanje veze s poslužiteljem obično izgleda ovako:

MqttConnectOptions options = nove MqttConnectOptions (); options.setAutomaticReconnect (true); options.setCleanSession (true); options.setConnectionTimeout (10); publisher.connect (opcije);

Ovdje definiramo mogućnosti povezivanja tako da:

  • Knjižnica će se automatski pokušati ponovno povezati s poslužiteljem u slučaju mrežne greške
  • Odbacit će neposlate poruke iz prethodnog pokretanja
  • Vrijeme čekanja veze postavljeno je na 10 sekundi

5. Slanje poruka

Slanje poruka pomoću već povezanog MqttClient je vrlo izravno. Koristimo jedan od objaviti() varijante metode za slanje korisnog tereta, koji je uvijek bajtni niz, na zadanu temu, koristeći jednu od sljedećih opcija kvalitete usluge:

  • 0 - semantika „najviše jednom“, poznata i kao „vatra i zaboravi“. Koristite ovu opciju kad je gubitak poruke prihvatljiv, jer ne zahtijeva nikakvu potvrdu ili upornost
  • 1 - semantika "barem jednom". Koristite ovu opciju kada gubitak poruke nije prihvatljiv i vaši pretplatnici mogu rukovati duplikatima
  • 2 - semantika "točno jednom". Koristite ovu opciju kada gubitak poruke nije prihvatljiv i vaši pretplatnici ne mogu rukovati duplikatima

U našem uzorku projekta, Senzor temperature motora klasa igra ulogu lažnog senzora koji stvara novo očitanje temperature svaki put kad ga aktiviramo poziv() metoda.

Ova klasa provodi Pozivno sučelje tako da ga lako možemo koristiti s jednim od ExecutorService implementacije dostupne u java.util.concurrent paket:

javna klasa EngineTemperatureSensor provodi Callable {// ... privatni članovi izostavljaju javni EngineTemperatureSensor (klijent IMqttClient) {this.client = client; } @Override public Void call () baca iznimku {if (! Client.isConnected ()) {return null; } MqttMessage msg = readEngineTemp (); msg.setQos (0); msg.setRetain (true); client.publish (TOPIC, msg); return null; } private MqttMessage readEngineTemp () {double temp = 80 + rnd.nextDouble () * 20,0; bajt [] nosivost = String.format ("T:% 04.2f", temp) .getBytes (); vratiti novu MqttMessage (korisni teret); }}

The MqttMessage enkapsulira sam nosivost, traženu kvalitetu usluge i također zadržan zastava za poruku. Ova zastavica brokeru pokazuje da bi trebao zadržati ovu poruku dok je pretplatnik ne potroši.

Ovu značajku možemo koristiti za provođenje ponašanja „posljednjeg poznatog dobra“, pa će se, kada se novi pretplatnik poveže s poslužiteljem, odmah primiti zadržana poruka.

6. Primanje poruka

Da biste primali poruke od MQTT brokera, trebamo koristiti jedan od pretplatite se () varijante metode, koji nam omogućuju da odredimo:

  • Jedan ili više filtara tema za poruke koje želimo primati
  • Pridruženi QoS
  • Rukovatelj povratnim pozivom za obradu primljenih poruka

U sljedećem primjeru pokazujemo kako dodati slušatelj poruke postojećem IMqttClient instance za primanje poruka iz zadane teme. Koristimo a CountDownLatch kao mehanizam sinkronizacije između našeg povratnog poziva i glavne niti izvršenja, smanjujući ga svaki put kad stigne nova poruka.

U uzorku koda koristili smo drugačiji IMqttClient instance za primanje poruka. Učinili smo to samo da bismo pojasnili koji klijent što radi, ali to nije Pahovo ograničenje - ako želite, možete koristiti isti klijent za objavljivanje i primanje poruka:

CountDownLatch primljenSignal = novi CountDownLatch (10); pretplatnik.subscribe (EngineTemperatureSensor.TOPIC, (topic, msg)) -> {byte [] payload = msg.getPayload (); // ... rukovanje s korisnim teretom izostavljeno PrimljenoSignal.countDown ();}); полученSignal.await (1, TimeUnit.MINUTES);

The pretplatite se () gore korištena varijanta uzima an IMqttMessageListener primjer kao svoj drugi argument.

U našem slučaju koristimo jednostavnu lambda funkciju koja obrađuje nosivost i smanjuje brojač. Ako u navedenom vremenskom roku (1 minuta) ne stigne dovoljno poruka, čekati() metoda izbacit će iznimku.

Kada koristimo Paho, ne trebamo izričito potvrđivati ​​primitak poruke. Ako se povratni poziv normalno vrati, Paho pretpostavlja da je uspješno potrošen i šalje potvrdu poslužitelju.

Ako povratni poziv baci znak Iznimka, klijent će biti zatvoren. Napominjemo da će ovo rezultirati gubitkom svih poruka poslanih s razinom QoS-a 0.

Poruke poslane QoS razinom 1 ili 2 poslužitelj će ponovno zamijeniti nakon što se klijent ponovno poveže i ponovo pretplati na temu.

7. Zaključak

U ovom smo članku pokazali kako možemo dodati podršku za MQTT protokol u našim Java aplikacijama koristeći biblioteku koju pruža projekt Eclipse Paho.

Ova knjižnica obrađuje sve detalje protokola niske razine, omogućavajući nam da se usredotočimo na druge aspekte našeg rješenja, a istovremeno ostavljamo dobar prostor za prilagodbu važnih aspekata njegovih unutarnjih značajki, poput postojanosti poruka.

Kôd prikazan u ovom članku dostupan je na GitHub-u.