Completable Future’s

De introductie van Completable Future’s in Java SE 8 is een belangrijke toevoeging aan de gereedschapskist van een ontwikkelaar. Met deze nieuwe API wordt het mogelijk om relatief eenvoudig single-threaded, synchrone en blocking code te transformeren naar multi-threaded, non-blocking en asynchroon. Dit levert belangrijke voordelen op, op het gebied van schaalbaarheid.

Wat de toepassing van deze nieuwe API betekent, onderzoeken we door een bestaande sequentiële REST-resource om te zetten naar deze nieuwe API.

 

Context

De REST-resource is CustomerOverview. Deze resource levert een aggregatie van klantinformatie op uit drie achterliggende resources. De CustomerOverview resource haalt eerst klantinformatie op, waarna op basis van het unieke klantnummer de bijbehorende contracten en communicatiemiddelen van de klant worden opgehaald.

 

 

Current state of affairs

De CustomerOverview-resource roept sequentieel de achterliggende resources aan en combineert de resultaten. In onderstaande voorbeeld is dit uitgewerkt met de Java API for RESTful Web Services (JAX-RS) uit Java EE 7.


@GET @Path("{username}") @Produces(APPLICATION_JSON)
public CustomerOverview retrieve(
   @PathParam("username") String username) {
 
  Customer customer = getCustomerInfo(username);
  auditLog(customer);
 
  Contract[] contracts = getContracts(customer);
  Communication[] communications = getCommunications(customer);
 
  return createOverview(customer, contracts, communications);
}
 
private Customer getCustomerInfo(String username) {
   // JAX-RS 2.0 Client API
  return ClientBuilder.newClient().target("http://localhost/:9998")
     .path("customers").path(username)
     .request().get(Customer.class);
}

 

 

Nadeel van deze implementatie is dat de HTTP-thread die het request afhandelt het grootste gedeelte van zijn tijd staat te wachten op resultaten. Elke aanroep van een backend-resource is immers een blocking call. Dit heeft een negatieve impact op de schaalbaarheid.

 

Wachten is ‘waste’, al zijn wij als mensen gewend om te wachten. Wij staan zes maanden van ons leven te wachten in rijen. Bedenk eens wat voor zinvolle dingen we kunnen doen in al die tijd! Met dit in het achterhoofd, begint mijn onderzoek naar de reductie van wachttijden.

 

Futures

Parallellisatie van het ophalen van contract- en communicatiemiddelen kan de responsetijd van de resource sterk verbeteren. De java.util.concurrent.Future (uit JDK 1.5!) maakte een meer asynchroon programmeermodel mogelijk, waarmee je taken parallel kunt uitvoeren. Future<T> betekent dat de waarde van type T beschikbaar komt op een bepaald moment in de toekomst. Een Future is een placeholder voor een toekomstige waarde.

 

Met de in Java EE 7 geïntroduceerde ManagedExecutorService kun je taken asynchroon afhandelen op een binnen de applicatieserver geconfigureerde threadpool. Java 8 Lambda’s zorgen voor een compacte en elegante manier om de taak, het aanroepen van de resource, asynchroon op te starten.

 


@Resource ManagedExecutorService executor;
 
private Future<Contract[]> getContracts(Customer customer) {
 return executor.submit(
     () -> ClientBuilder.newClient().target("http://localhost/:9998")
         .path("contracts").path(customer.id)
         .request().get(Contract[].class)
 );
}

 

 

Door deze implementatie kun je parallel de contracten en communicatiemiddelen ophalen. De methode Future.get() levert de toekomstige waarde op, maar is (potentieel) oneindig blocking. Dit is slecht voor de responsiviteit van de resource.

 


Future<Contract[]> contracts = getContracts(customer);
Future<Communication[]> communications = getCommunications(customer);
 
return createOverview(customer, contracts.get(1, SECONDS),
                                                       communications.get(1, SECONDS));

 

 

Een betere aanpak is het opgeven van de maximale wachttijd op het resultaat. De resource kan nu tegelijkertijd twee bronnen bevragen, maar de HTTP-thread blocked op het moment dat de get() wordt aangeroepen.

 

Het nadeel van blocking blijkt duidelijk uit onderstaande figuur, waarin de sequentiële en future variant met elkaar worden vergeleken. In de future-variant blocked de HTTP-thread (zwarte blokjes) voor het combineren van de resultaten. Pas als de langstdurende Future een waarde krijgt, kunnen de waarden gecombineerd en geretourneerd worden en wordt de HTTP-thread vrijgegeven.

 

Voor het optimaal gebruik van de beschikbare resources, is het belangrijk om non-blocking te zijn. Dit is met Futures niet mogelijk. Daarnaast kun je met Futures niet uitdrukken welke functionaliteit uitgevoerd moet worden als de Future éénmaal een waarde krijgt.

 

Het registreren van functionaliteit dat uitgevoerd moet worden als een waarde beschikbaar komt,  is de essentie van Reactive programming. Java SE 8 biedt hiervoor Completable Futures (a.k.a. deferred object of promise in andere talen). Zo kun je non-blocking services maken.

 

Reactive Programming and Completable Future

“Reactive programming is a programming paradigm oriented around data flows and the propagation of change. This means that it should be possible to express static or dynamic data flows with ease in the programming languages used, and that the underlying execution model will automatically propagate changes through the data flow.” http://en.wikipedia.org/wiki/Reactive_programming

 

Reactive programming werkt als een spreadsheet waarin in een bepaalde cel een formule voorkomt als ‘=B1+B2’. Als één van de waarden in B1 of B2 verandert, verandert ook de waarde van de cel waarin de formule staat. Belangrijk verschil is dat reactive programming meer aandacht heeft voor operaties die data transformeren en hoe wijzigingen worden doorgegeven in een keten van taken.

 

CompletableFuture:  A Future that may be explicitly completed and may trigger actions upon its completion.

 

Een Completable Future biedt de mogelijkheid om de functionaliteit als een keten van in de toekomst uit te voeren taken te modelleren. De programmeur definieert wat het systeem moet doen als een bepaald event optreedt en welke functionaliteit je dan moet uitvoeren. Dit kan zijn dat de completable future gereed is of dat er een exceptie is opgetreden.

 

Een van de mogelijke patronen is ‘chaining’, waarbij het event is dat de CompletableFuture een waarde heeft gekregen. Aan de methode thenApply kun je een functie meegeven die de waarde uit de CompletableFuture transformeert als die een waarde krijgt.

 

LET OP: thenApply levert zelf ook een CompletableFuture op. Je definieert immers een pijplijn van toekomstige acties, dus deze methode levert ook een placeholder op voor de toekomstige, getransformeerde waarde.


public <U> CompletableFuture<U> thenApply   (Function<? super T,? extends U> fn)

 

In onderstaande afbeelding staat de workflow van de CustomerOverview-resource.

 

Completable Futures definiëren een pijplijn van operaties/transformaties die uitgevoerd moeten worden. In onderstaand voorbeeld wordt met een completedFuture() een nieuwe CompletableFuture aangemaakt. Deze heeft dus al een waarde, maar de eigenlijke functie is om een trigger te zijn voor de andere acties die je daarna moet uitvoeren.

 

De volgende actie is het ophalen van de klantinformatie. Deze taak moet je op een aparte thread uitvoeren en levert dus zelf ook een CompletableFuture op. Hier is thenApply niet van toepassing aangezien het resultaattype dan een CompletableFuture<CompletableFuture<Customer>> zou zijn. De methode thenCompose is het equivalent is van de flatMap-operatie uit Scala.

 

Als klantoverzicht een waarde heeft gekregen, dan kun je vervolgens de method-reference naar de getContracts-methode gebruiken om de contracten op te halen. Dit moet wederom op een andere thread uit de thread-pool.

 


CompletableFuture<Customer> customerFuture = completedFuture(username)
       .thenComposeAsync(this::getCustomerInfo, executor);
 
CompletableFuture<Contract[]> contractFuture =
   customerFuture.thenComposeAsync(this::getContracts, executor);

 

 

De implementatie van het ophalen van data uit achterliggende resources wijzigt. De methode getContracts() gaat geen Future meer opleveren, maar een CompletableFuture van het juiste type. De bestaande implementatie blocked nog de thread door zijn gebruik van de synchrone JAX-RS API.

 

De in JAX-RS 2.0 geïntroduceerde asynchrone API biedt hier de oplossing. Dit betekent dat, nadat het request is verstuurd, de thread wordt vrijgegeven en dat de response door het onderliggende framework op een andere thread wordt ontvangen. Voor Glassfish is dit de thread pool die binnen Jersey is gedefinieerd. Het gedrag dat dan uitgevoerd moet worden, kun je door middel van een callback registreren.

 

De CompletableFuture wordt pas gecompleteerd in de callback-implementatie. Dit betekent ook dat de acties, die geregistreerd zijn op het compleet worden van de Completabe Future, op dat moment worden uitgevoerd.

 


CompletableFuture<Contract[]> getContracts(Customer customer) {
 
 CompletableFuture<Contract[]> cf = new CompletableFuture<>();
 
 backendServices.path("contracts").path(customer.id)
     .request().async()
     .get(new InvocationCallback<Contract[]>() {
       @Override public void completed(Contract[] contracts) {
         cf.complete(contracts);
       }
 
       @Override public void failed(Throwable throwable) {
         cf.completeExceptionally(throwable);
       }
     });
 return cf;
}

 

 

Het laatste stuk van de keten is het combineren van de verschillende CompletableFutures tot een volledig ingevuld klantoverzicht dat je richting de client retourneert.


customerFuture
   .thenApplyAsync(CustomerOverview::new, executor)
   .thenCombineAsync(contractFuture, CustomerOverview::add, executor)
   .thenCombineAsync(commFuture, CustomerOverview::add, executor)
   .whenCompleteAsync( (overview, throwable) -> {
         boolean b =
        (overview == null) ? response.resume(throwable) : response.resume(overview);
       }
   );

 

 

Bovenstaande code kent nog één probleem. Het is een keten van taken die in de toekomst tot een resultaat gaat leiden. De http-thread moet echter niet wachten (blocken) tot dat moment.

 

JAX-RS 2.0 ondersteunt asynchrone verwerking van HTTP-request. De asynchrone verwerking levert een beter en meer efficiënt gebruik van threads. Als de thread het einde van de resource-methode bereikt, wordt de HTTP-thread vrijgegeven. De HTTP-connectie met de client blijft echter openstaan. Met de toevoeging van de parameter @Suspended ASyncResponse wordt dit aangegeven. Hierdoor kun je, als de resource-methode al is geëindigd, toch nog  de response versturen door middel van response.resume(c). Na de resume wordt de HTTP- connectie daadwerkelijk gesloten.

 


@GET @Path("{username}") @Produces(APPLICATION_JSON)
public void retrieve(
   @Suspended AsyncResponse response,
   @PathParam("username") String username) {

 

 

In onderstaand diagram is te zien hoe de verschillende delen van de functionaliteit worden uitgevoerd. De ‘witte’ stukken geven aan dat de thread is vrijgegeven en dat de response op een andere thread wordt afgehandeld. Uit dit diagram blijkt duidelijk dat de implementatie veel minder blocking is, dan in het eerste sequentiële voorbeeld het geval was.

 

 

Als we de initiële code vergelijken met de code met Completable Future’s blijkt duidelijk dat er een trade-off aanwezig is tussen de eenvoud van het synchrone programmeermodel en de schaalbaarheid van het asynchrone model.

 

Conclusie

Door gebruik te maken van een reactive programming style, in combinatie met Completable Future en asynchroon JAX-RS, kun je een bestaande resource asynchroon en non-blocking maken. Dit maakt de resource meer schaalbaard. Toepassing van een uitgestelde response in combinatie met een time-out houdt in dat de resource meer responsive blijft. Er zit een harde bovengrens aan de tijd, waarbinnen de client een response krijgt.

 

Met reactive programming, Completable Future’s en Java EE 7 is het wachten voorbij! Het maakt applicaties mogelijk die meer responsive zijn en beter schalen.

 

Links

  • Code Traditionele service: https://github.com/martijnblankestijn/reactive-rest-demo
  • Code Oplossing: https://github.com/martijnblankestijn/reactive-rest-demo/tree/solution
  • JAX-RS: https://jcp.org/en/jsr/detail?id=339
  • JSON-P: https://jcp.org/en/jsr/detail?id=353
  • Method-reference: http://docs.oracle.com/javase/tutorial/java/javaOO/methodreferences.html
  • Reactive Manifesto: http://www.reactivemanifesto.org/