Reactive Programming

Reactive Programming is meer dan alleen een buzzword geworden en het feit dat Pivotal het Spring Framework Reactive gaat maken, zegt daarin meer dan genoeg. Omdat die omwenteling niet over één nacht ijs gaat, heeft Pivotal een eerste project omgezet naar het Reactive paradigma: Spring Web. Met de komst van Spring WebFlux, dat een GA krijgt in het tweede kwartaal van 2017, wordt Reactive Programming belangrijker dan ooit. In dit artikel willen we alvast een tipje van de sluier voor jullie oplichten op basis van onze ervaringen met de SNAPSHOTs en Milestones.

Waarom Reactive?

Als gebruikers zijn we ontzettend verwend. Door onze ervaringen met bijvoorbeeld Google, Facebook en Netflix verwachten we dat alle andere applicaties ook altijd beschikbaar zijn, goed werken, stabiel zijn en snel reageren. Zodra dat niet zo is, zijn we meedogenloos en gooien we zonder pardon die net geïnstalleerde app die niet aan onze (hoge) verwachtingen voldeed weer van onze smartphone. Op naar de volgende.

Dat betekent nogal wat voor developers zoals wij. Wij moeten namelijk aan die verwachtingen voldoen! Error handling, non-blocking (asynchrone) acties om de responsiveness te verbeteren, schaling mogelijk maken voor piekuren en componenten ontkoppelen om een betere werking van het hele systeem te kunnen bewerkstelligen. Dit zijn onderwerpen waar goed over moet worden nagedacht, wil je het van-de-telefoon-afgooien-scenario kunnen voorkomen. Helaas denken we er echter vaak te laat of helemaal niet aan.

Het reactive manifesto (1) zegt dat een systeem Reactive is als het responsive, resilient, elastic en message-driven is (zie Figuur 1). De positieve uitwerkingen van bovenstaande onderwerpen vormen dus de definitie van een Reactive systeem, maar hoe bereiken we dat?

 

 

Figuur 1

 

 

Reactive Programming

Reactive Systems worden over het algemeen gebouwd middels Reactive Programming. De meeste mensen, die al eens iets met Reactive Programming hebben gedaan, zullen bekend zijn met RxJava, de Reactive Extensions-implementatie (2) voor Java. Reactive Programming biedt een manier om data als een stroom te zien, waar je je op kunt abonneren. Je beschrijft declaratief wat je wilt dat er gebeurt op het moment dat een nieuw element uit die stroom komt. Het is eigenlijk een uitgebreidere vorm van het Observer Pattern (3), waarschijnlijk velen van jullie bekend.

We weten al hoe we synchroon data kunnen ophalen van een component, service of class: T getData() of Iterable<T> getData() als het om een collectie / meerdere elementen gaat. Ook hebben we al een manier om asynchroon data op te kunnen halen middels Future<T> getData() of nog beter CompletableFuture<T> getData(). Nu hebben we een manier om ons asynchroon te abonneren op meerdere elementen, die nu of ergens in de toekomst aangeboden kunnen worden. Doordat er verschillende implementaties zijn heeft de call meerdere vormen. Voor RxJava is dat Observable<T> getData(), voor de implementatie die we zo gaan bespreken (Project Reactor) is het Flux<T> getData().

Het mooie aan Reactive Programming is, dat je meer doet dan jezelf als luisteraar op een bron abonneren. Je kunt ingebouwde of zelf gedefinieerde operaties uitvoeren op de stroom aan data, om nieuwe stromen aan data te creëren of stromen combineren tot een nieuwe. Verder ondersteunt de implementatie out-of-the-box mogelijkheden om je systeem asynchroon (en daarmee non-blocking) te maken en biedt het handige manieren om te declareren wat er moet gebeuren in het geval van fouten, of in situaties waarin de ontvangende partij de verzender niet kan bijhouden. Dit laatste wordt ook wel aangeduid met de term Back-Pressure (4).

Iedereen die bekend is met Java 8 Streams, zal een hoop herkenningspunten vinden in het werken met Reactive Streams. Er zijn inderdaad overeenkomsten tussen de twee. Het grootste verschil zit hem echter in de asynchroniteit van een Reactive Stream: je weet niet óf en wanneer je de volgende elementen op een Stream gaat ontvangen.

 

Reactive Streams Specification

Gelukkig is er sinds april 2015 een officiële Reactive Streams-specificatie (5): een standaard, die geïnspireerd is op de eerdergenoemde Reactive Extensions API, om interoperabiliteit tussen verschillende implementaties te waarborgen.

De specificatie is technisch gezien weinig meer dan een beschrijving van vier interfaces: Publisher, Subscriber, Subscription en Processor (zie Listing 1). Drie daarvan zou je al verwachten bij iets dat verwant is aan het Observer Pattern, maar ook een nieuwe: Processor. Zoals je kunt zien in Figuur 2 extends Processor zowel van Publisher als Subscriber. Processor is de interface voor operaties zoals filter, die luisteren naar een stroom data en een nieuwe, gefilterde, opleveren.

 

 

public interface Publisher<T> {

   void subscribe(Subscriber<? super T> s);

}

 

public interface Subscriber<T> {

   void onSubscribe(Subscription s);

   void onNext(T t);

   void onError(Throwable t);

   void onComplete();

}

 

public interface Subscription {

   void request(long n);

   void cancel();

}

 

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {

}

Listing 1

 

 

Een Reactive Stream bestaat dus uit een keten, die begint met een Publisher en eindigt met een (of meerdere) Subscriber(s). Daartussen bevinden zich eventueel een aantal Processors (die zich abonneren op hun voorganger en publiceren naar de volgende subscribers). Doordat hier gebruik wordt gemaakt van het publish/subscribe-principe, is het geheel losgekoppeld en hoeven de onderdelen niet expliciet op elkaar te wachten (non-blocking). De volgende stap in de keten krijgt vanzelf een signaal wanneer er weer data beschikbaar is en kan vervolgens in actie komen.

Ondanks het feit dat de Reactive Streams-specificatie “maar” vier interfaces definieert, raden we je toch ten sterkste af om ze zelf te proberen te implementeren. De interfaces gaan gepaard met een beschrijving waaraan de interfaces moeten voldoen en er is een Technology Compatibility Kit (TCK), die de implementatie verifieert alvorens je het een juiste implementatie mag noemen. Laat het werk dus vooral aan de professionals over! Speaking of which…

 

Project Reactor

Project Reactor (6) is één van de implementaties van de Reactive Streams-specificatie. Het is ontwikkeld door Pivotal en is de kern van alles wat Reactive is binnen Spring. We noemen het ook niet voor niets, want Spring WebFlux maakt gebruik van Project Reactor. De laatste release van Project Reactor zit alweer op 3.0.5.RELEASE en is dus nu al bruikbaar voor je applicaties in productie. Voor een productiewaardige Spring WebFlux moeten we nog heel even geduldig zijn.

Binnen Project Reactor is er sprake van twee soorten “stromen”: de Flux en de Mono. Een Flux is een stream van elementen (meervoud) waar je je op abonneert (Zie Figuur 2) en een Mono is ook een stream, maar dan van maximaal één element (Zie Figuur 3). De Mono is er om een intentie aan te geven richting afnemers van deze stroom. Je zult altijd maar maximaal één element ontvangen.

 

 

Figuur 2

 

 

 

Figuur 3

 

 

Met Project Reactor kun je eenvoudig streams aanmaken. Flux<String> flux = Flux.just("Hello", "World!"); creëert bijvoorbeeld een stream, die de Strings “Hello” en “World!” publiceert. Ook zitten er uiteraard een aantal standaard operaties in Project Reactor, zie bijvoorbeeld Listing 2. Deze zul je herkennen, indien je bekend bent met de Java 8 Streams.

 

 

Flux.just(1, 2, 3, 4, 5).filter(n -> n % 2 == 0); // [2, 4]

Flux.just(1, 2, 3, 4, 5).map(i -> i * 2);         // [2, 4, 6, 8, 10]

Flux.just("a", "a", "b", "c").distinct();         // [a, b, c]

Listing 2

 

 

Goed om te weten is dat de voorbeelden in Listing 2 nog niets publiceren. Pas als een er sprake is van een subscription, dan zal de stream gaan “lopen”. Zie Listing 3.

 

 

Flux.just("Hello", " ", "World!").subscribe(System.out::print);

// Hello World!

Listing 3

 

 

Spring WebFlux

Met Spring WebFlux wordt er een brug gelegd tussen het ‘traditionele’ Spring Web MVC en de nieuwe ‘hippe’ wereld van Reactive Programming. Indien je reeds gewend bent aan Spring Web MVC, is de initiële overstap naar Spring WebFlux relatief eenvoudig, want aan de voorkant kun je nog steeds gebruikmaken van de bekende annotaties als @(Rest)Controller, @RequestMapping, enzovoorts.

Naast het annotation-based model wordt er in Spring 5 ook een functioneel programmeermodel geïntroduceerd (Router Functions). Deze aanpak zal vooral de mensen aanspreken, die geen fan zijn van de annotaties en meer gewend zijn om op een ‘low-level’ niveau request routers en handlers te schrijven, zoals bijvoorbeeld in Netty of Vert.x. Alhoewel het een interessante aanpak is, laten we deze in dit artikel verder buiten beschouwing. Voor de liefhebbers is er een blog-artikel geschreven, waarin deze nieuwe aanpak wordt toegelicht (7).

Alhoewel de voorkant grotendeels gelijk lijkt te zijn, is de achterkant van Spring WebFlux compleet herschreven ten opzichte van Spring Web MVC. Waar Spring Web MVC compatibel is met elke Servlet 2.x container, is Spring WebFlux gemaakt om te draaien op servers die asynchrone, non-blocking I/O ondersteunen, zoals Netty, Undertow, of een Servlet 3.1 container. Zie Figuur 4 voor een grafische weergave van de verschillen tussen Spring Web MVC (links), en Spring WebFlux (rechts).

 

 

Figuur 4

 

 

Zie Listing 4 voor een voorbeeldimplementatie van een Spring WebFlux @RestController. Deze REST endpoint geeft een lijst terug van de getallen 1 tot en met het getal dat is meegegeven als request parameter. De implementatie is grotendeels gelijk aan een ouderwetse Spring MVC controller, met het verschil dat er niet een List<Integer> wordt teruggegeven, maar een Flux<Integer> (normaal gesproken verkrijgen we deze Flux uiteraard uit een reactive datasource, maar voor het gemak wordt er in dit voorbeeld een Flux gecreëerd).

Op het eerste gezicht lijkt hier niet veel spannends te gebeuren. Op de achtergrond zit er echter een groot verschil tussen het teruggeven van een List of een Flux. In het geval van een List kan de response naar de client pas worden gecreëerd op het moment dat de volledige List uit de datasource is opgehaald. Een Flux werkt echter push-based, per element. Dit houdt in dat we al kunnen beginnen aan de response naar de client op het moment dat het eerste element binnenkomt uit de datasource. Op het moment dat een connectie met een client om bepaalde redenen wat trager is, zou hiermee ook het produceren van items uit de datasource kunnen worden gepauzeerd (back-pressure), totdat de client de verwerking weer aankan. Daarnaast zou de client het ontvangen van de items halverwege kunnen annuleren, waardoor er geen onnodige items uit de datasource worden opgevraagd.

 

 

@RestController

public class MyController {

   @GetMapping("count")

   public Flux<Integer> count(@RequestParam("to") int to) {

       return Flux.range(1, to);

   }

}

// GET http://localhost:8080/count?to=5 -> [1, 2, 3, 4, 5]

Listing 4

 

 

Ook de client side heeft een Reactive sausje gekregen. Waar we in Spring Web MVC met behulp van RestTemplate eenvoudig REST calls kunnen maken, kunnen we in Spring WebFlux gebruikmaken van WebClient. Het resultaat van de REST call is vervolgens te streamen als Mono of Flux. Zie Listing 5 voor een voorbeeld, waarin we de REST Controller uit Listing 4 aanroepen. We hebben hiermee een volledig reactive lijntje van backend naar frontend, push-based & asynchroon! Waarom hebben we dit niet altijd al zo gedaan?

 

 

WebClient client = WebClient.create("http://localhost:8080");

 

Flux<Integer> numbers = client.get()

       .uri("/count?to={to}", 5)

       .accept(MediaType.APPLICATION_JSON)

       .exchange()

       .flatMap(response -> response.bodyToFlux(Integer.class));

 

numbers.subscribe(System.out::println);

Listing 5

 

 

Conclusie

Reactive Programming wordt de laatste tijd steeds meer gemeengoed. Ook het Spring Framework is begonnen aan een transitie naar het Reactive paradigma, waarbij Spring WebFlux een leuke eerste stap is. Echter, om volledig Reactive te kunnen zijn, is het niet voldoende om een Reactive frontend te hebben. Daarom zal het volledige Spring ecosysteem (waar nodig) voorzien worden van Reactive componenten. Zo is er onlangs een Milestone release van Spring Data uitgegeven, met initiële support voor Reactive datasources met behulp van MongoDB, Apache Cassandra en Redis (8). Verder zullen bijvoorbeeld ook Spring Cloud en Spring Security Reactive support krijgen.

Indien je geïnteresseerd bent geraakt in Spring WebFlux, kijk dan vooral naar de Git-repository met workshop-materiaal (9), dat ontwikkeld is voor de Reactive Spring workshop op J-Fall 2016.

Met dank aan Arjen Poutsma van Pivotal voor het reviewen van dit artikel!

 


Links

(1) http://www.reactivemanifesto.org/ (2) http://reactivex.io/ (3) https://en.wikipedia.org/wiki/Observer_pattern (4) http://www.reactivemanifesto.org/glossary#Back-Pressure (5) http://www.reactive-streams.org/ (6) projectreactor.io (7) https://spring.io/blog/2016/09/22/new-in-spring-5-functional-web-framework (8) https://spring.io/blog/2016/11/28/going-reactive-with-spring-data (9) Workshop repository met voorbeeldcode: https://bitbucket.org/rlippolis/spring-web-reactive-workshop Documentatie Spring: http://docs.spring.io/spring-framework/docs/5.0.x/spring-framework-reference/html/web-reactive.html