Uvod u Netflix Mantis

1. Pregled

U ovom ćemo članku pogledati platformu Mantis koju je razvio Netflix.

Istražit ćemo glavne koncepte Mantisa stvaranjem, pokretanjem i istraživanjem posla obrade streama.

2. Što je Mantis?

Mantis je platforma za izgradnju aplikacija za obradu toka (poslovi). Pruža jednostavan način za upravljati raspoređivanjem i životnim ciklusom poslova. Štoviše, to olakšava raspodjelu resursa, otkrivanje i komunikaciju između ovih poslova.

Stoga se programeri mogu usredotočiti na stvarnu poslovnu logiku, a cijelo vrijeme imaju podršku a robusna i skalabilna platforma za pokretanje njihovih aplikacija s velikim volumenom, malim kašnjenjem i neblokiranjem.

Posao Mantisa sastoji se od tri različita dijela:

  • the izvor, odgovoran za dohvaćanje podataka iz vanjskog izvora
  • jedan ili više faze, odgovoran za obradu dolaznih tokova događaja
  • i a umivaonik koja prikuplja obrađene podatke

Istražimo sada svakog od njih.

3. Postavljanje i ovisnosti

Krenimo dodavanjem bogomoljka-vrijeme izvođenja i jackson-databind ovisnosti:

 io.mantisrx mantis-runtime com.fasterxml.jackson.core jackson-databind 

Sada, za postavljanje izvora podataka našeg posla, primijenimo Mantis Izvor sučelje:

javna klasa RandomLogSource implementira Izvor {@Override public Observable poziv (kontekst konteksta, indeks indeksa) {return Observable.just (Observable .interval (250, TimeUnit.MILLISECONDS) .map (this :: createRandomLogEvent)); } private String createRandomLogEvent (Duga oznaka) {// generiranje slučajnog niza unosa dnevnika ...}}

Kao što vidimo, jednostavno generira slučajne unose u zapisnik više puta u sekundi.

4. Naš prvi posao

Stvorimo sada posao Mantisa koji jednostavno prikuplja zapisnike iz našeg RandomLogSource. Kasnije ćemo dodati grupne i agregacijske transformacije za složeniji i zanimljiviji rezultat.

Za početak izradimo LogEvent entitet:

javna klasa LogEvent implementira JsonType {private Long index; razina privatnog niza; privatna string poruka; // ...}

Zatim, dodajte svoje TransformLogStage.

To je jednostavna faza koja implementira sučelje ScalarComputation i razdvaja unos dnevnika za izgradnju LogEvent. Također, filtrira sve pogrešno formatirane nizove:

javna klasa TransformLogStage implementira ScalarComputation {@Override public Observable call (Context context, Observable logEntry) {return logEntry .map (log -> log.split ("#")) .filter (parts -> parts.length == 3). karta (LogEvent :: novo); }}

4.1. Pokretanje posla

U ovom trenutku imamo dovoljno gradivnih elemenata za sastavljanje posla Mantisa:

javna klasa LogCollectingJob proširuje MantisJobProvider {@Override public Job getJobInstance () {return MantisJob .source (new RandomLogSource ()) .stage (new TransformLogStage (), new ScalarToScalar.Config (Sin) Sinbeks. LogEvent :: toJsonString))) .metadata (novi Metadata.Builder (). Build ()) .create (); }}

Pogledajmo izbliza naš posao.

Kao što vidimo, proteže se MantisJobProvider. Isprva dohvaća podatke iz našeg RandomLogSource i primjenjuje TransformLogStage do dohvaćenih podataka. Konačno, obrađene podatke šalje ugrađenom sudoperu koji se željno pretplaćuje i isporučuje podatke putem SSE-a.

Ajmo sada konfigurirati naš posao za lokalno izvršavanje pri pokretanju:

@SpringBootApplication javna klasa MantisApplication implementira CommandLineRunner {// ... @Override public void run (String ... args) {LocalJobExecutorNetworked.execute (new LogCollectingJob (). GetJobInstance ()); }}

Pokrenimo aplikaciju. Vidjet ćemo poruku zapisnika poput:

... Posluživanje suvremenog sudopera HTTP SSE poslužitelja na priključku: 86XX

Spojimo se sada na sudoper pomoću kovrča:

$ curl localhost: 86XX podaci: {"index": 86, "level": "WARN", "message": "pokušaj prijave"} podaci: {"index": 87, "level": "ERROR", "message ":" user created "} podaci: {" index ": 88," level ":" INFO "," message ":" user created "} podaci: {" index ": 89," level ":" INFO ", "message": "pokušaj prijave"} podaci: {"index": 90, "level": "INFO", "message": "user created"} podaci: {"index": 91, "level": "ERROR "," message ":" user created "} podaci: {" index ": 92," level ":" WARN "," message ":" pokušaj prijave "} podaci: {" index ": 93," level ": "INFO", "message": "korisnik stvorio"} ...

4.2. Konfiguriranje sudopera

Do sada smo koristili ugrađeni sudoper za prikupljanje obrađenih podataka. Da vidimo da li našem scenariju možemo dodati veću fleksibilnost pružanjem prilagođenog sudopera.

Što ako, na primjer, želimo filtrirati zapisnike po poruka?

Stvorimo a LogSink koja provodi Umivaonik sučelje:

javna klasa LogSink implementira Sink {@Override javni void poziv (Kontekst konteksta, PortRequest portRequest, Observable logEventObservable) {SelfDocumentingSink sink = new ServerSentEventsSink.Builder () .withEncoder (LogEvent :: toJsonString) .withPredicate (filterByLedByLoredByLoading ; logEventObservable.subscribe (); sink.call (context, portRequest, logEventObservable); } private Predicate filterByLogMessage () {return new Predicate ("filtriraj po poruci", parametri -> {if (parametri! = null && parameters.containsKey ("filter")) {return logEvent -> logEvent.getMessage (). sadrži ( parameters.get ("filter"). get (0));} return logEvent -> true;}); }}

U ovoj implementaciji sudopera konfigurirali smo predikat koji koristi filtar parametar za dohvaćanje samo dnevnika koji sadrže tekst postavljen u filtar parametar:

$ curl localhost: 8874? filter = podaci za prijavu: {"index": 93, "level": "ERROR", "message": "pokušaj prijave"} podaci: {"index": 95, "level": "INFO "," message ":" pokušaj prijave "} podaci: {" index ": 97," level ":" ERROR "," message ":" pokušaj prijave "} ...

Napomena Mantis također nudi moćan jezik za upite, MQL, koji se može koristiti za postavljanje upita, transformiranje i analizu podataka o streamu na SQL način.

5. Lanciranje pozornice

Pretpostavimo sada da nas zanima koliko POGREŠKA, UPOZORITI, ili INFO zapise dnevnika koje imamo u zadanom vremenskom intervalu. Zbog toga ćemo svom poslu dodati još dvije faze i povezati ih.

5.1. Grupiranje

Prvo, stvorimo a GroupLogStage.

Ova je faza ToGroupComputation provedba koja prima a LogEvent struji podatke s postojećih TransformLogStage. Nakon toga grupira unose prema razini prijavljivanja i šalje ih u sljedeću fazu:

javna klasa GroupLogStage implementira ToGroupComputation {@Override public Observable poziv (kontekst konteksta, vidljivi logEvent) {return logEvent.map (log -> new MantisGroup (log.getLevel (), log)); } public static ScalarToGroup.Config config () {return new ScalarToGroup.Config () .description ("Group data data by level") .codec (JacksonCodecs.pojo (LogEvent.class)) .concurrentInput (); }}

Također smo kreirali prilagođenu konfiguraciju faze davanjem opisa, kodeka koji se koristi za serializaciju izlaza, i omogućili istodobno pokretanje metode poziva ove faze pomoću concurrentInput ().

Treba napomenuti da je ova faza horizontalno skalabilna. Što znači da možemo pokrenuti onoliko primjeraka ove faze koliko je potrebno. Također vrijedi spomenuti, kada su raspoređeni u Mantisovom klasteru, ova faza šalje podatke u sljedeću fazu, tako da će svi događaji koji pripadaju određenoj skupini sletjeti na istog radnika u sljedećoj fazi.

5.2. Agregirajući

Prije nego što krenemo dalje i kreiramo sljedeću fazu, prvo dodajte a LogAggregate entitet:

javna klasa LogAggregate implementira JsonType {private final Integer count; privatna završna razina niza; }

Ajmo sada stvoriti posljednju fazu u lancu.

Ova faza provodi GroupToScalarComputation i pretvara tok grupa dnevnika u skalar LogAggregate. To čini brojeći koliko se puta svaka vrsta dnevnika pojavljuje u toku. Osim toga, ima i LogAggregationDuration parametar, koji se može koristiti za kontrolu veličine prozora za agregiranje:

javna klasa CountLogStage implementira GroupToScalarComputation {private int duration; @Override public void init (Context context) {duration = (int) context.getParameters (). Get ("LogAggregationDuration", 1000); } @Override public Observable call (Kontekst konteksta, Observable mantisGroup) {return mantisGroup .window (duration, TimeUnit.MILLISECONDS) .flatMap (o -> o.groupBy (MantisGroup :: getKeyValue) .flatMap (group -> group.reduce (0, (count, value) -> count = count + 1) .map ((count) -> novi LogAggregate (count, group.getKey ())))); } public static GroupToScalar.Config config () {return new GroupToScalar.Config () .description ("zbroj događaja za razinu zapisnika") .codec (JacksonCodecs.pojo (LogAggregate.class)) .withParameters (getParameters ()); } javni statični popis getParameters () {Popis params = novi ArrayList (); params.add (novi IntParameter () .name ("LogAggregationDuration") .description ("veličina prozora za agregiranje u milisekundama") .validator (Validators.range (100, 10000)) .defaultValue (5000) .build ()); povratni parametri; }}

5.3. Konfigurirajte i pokrenite posao

Jedino što preostaje sada je konfigurirati naš posao:

javna klasa LogAggregationJob proširuje MantisJobProvider {@Override public Job getJobInstance () {return MantisJob .source (new RandomLogSource ()) .stage (new TransformLogStage (), TransformLogStage.stageConfig ()) GroupStage (New Group). )) .stage (new CountLogStage (), CountLogStage.config ()) .sink (Sinks.eagerSubscribe (Sinks.sse (LogAggregate :: toJsonString))) .metadata (new Metadata.Builder (). build ()) .create (); }}

Čim pokrenemo aplikaciju i izvršimo svoj novi posao, možemo vidjeti kako se brojevi dnevnika dohvaćaju svakih nekoliko sekundi:

$ curl localhost: 8133 podaci: {"count": 3, "level": "ERROR"} podaci: {"count": 13, "level": "INFO"} podaci: {"count": 4, "level ":" WARN "} podaci: {" count ": 8," level ":" ERROR "} podaci: {" count ": 5," level ":" INFO "} podaci: {" count ": 7," razina ":" UPOZORENJE}} ...

6. Zaključak

Da rezimiramo, u ovom smo članku vidjeli što je Netflix Mantis i za što se može koristiti. Nadalje, pogledali smo glavne koncepte, koristili ih za stvaranje poslova i istražili prilagođene konfiguracije za različite scenarije.

Kao i uvijek, cjeloviti kôd dostupan je na GitHub-u.


$config[zx-auto] not found$config[zx-overlay] not found