Het geheim achter schaalbaarheid (deel 2)

Complexiteitstheorie kan helpen bij het analyseren en voorspellen van de snelheid van een applicatie. Het voorgaande artikel in het afgelopen Java Magazine introduceerde complexiteitstheorie voor een enkele core. Tegenwoordig hebben CPU’s echter meerdere cores. En het draaien van software op verschillende computers tegelijk (gedistribueerd programmeren) is ook steeds gebruikelijker. De complexiteit van een algoritme zegt niks over hoeveel cores of servers tegelijkertijd gebruikt kunnen worden. In dit artikel worden enkele theoretische modellen beschreven die wat inzicht geven in de snelheid op meerdere cores. Voor Java 8 is dit een speerpunt. De voorbeelden in dit artikel maken dan ook gebruik van sommige van de nieuwe features in Java 8.

Concurrency en Speedup
Als een applicatie gebruik wil maken van meerdere cores op een CPU, dan zal die applicatie een aantal taken tegelijkertijd moeten kunnen doen. Dit is meteen een belangrijke eigenschap van een algoritme: de concurrency (C) van een algoritme is het maximaal aantal taken dat parallel uitgevoerd kan worden. Of dat dan ook daadwerkelijk parallel gedaan wordt, hangt natuurlijk af van hoeveel cores (of servers) in het systeem zitten. Grofweg kan men zeggen dat als P processors (cores of servers) aanwezig zijn, en de concurrency is groter of gelijk aan dat aantal processoren (C >= P), dan zou de executietijd ook moeten afnemen met die factor P. Immers, er kan nu met P processoren tegelijk aan gerekend worden.

 

Dat is meteen een mooie metriek: hoeveel sneller wordt het systeem als er processoren aan worden toegevoegd. De speedup (S) vergelijkt dus de tijd die nodig is op P processoren met het best mogelijke resultaat op een enkele processor. Het algoritme dat het snelst is op een enkele processor hoeft niet het snelst te zijn als meerdere processoren gebruikt worden. Als de concurrency groot genoeg is en er verder geen overhead is, dan geldt het volgende:

 

S(P) = T(1) / T(P) = P, als C>=P.

Execution graph
Hoe kan de concurrency van een algoritme of programma bepaald worden? Een handig hulpmiddel is het maken van een execution graph. Deel de applicatie op in taken en teken de onderlinge afhankelijkheden. Als een taak meerdere, opvolgende taken heeft die parallel uitgevoerd kunnen worden, dan wordt dat vaak een fork genoemd. Uiteindelijk worden de resultaten van taken samengevoegd en komen ze bij elkaar in een nieuwe taak. Dat wordt dan een join genoemd.

Het onderstaande voorbeeld heeft een start taak die twee onafhankelijke taken start, read document en generate statistics. De taak read document roept vervolgens de taak parse document aan. Als beide taken klaar zijn, vormen ze de input voor score document.  De concurrency van deze applicatie is dus twee: er kunnen maximaal twee dingen tegelijkertijd uitgevoerd worden. Meer dan twee processoren heeft dus geen zin: deze zullen niet gebruikt worden.

 


Afbeelding 1

Die taken kunnen in Java 8 vrij simpel beschreven en gecomponeerd worden. Een handige nieuwe interface daarvoor is de CompletableFuture. Een Future is als het ware een verwijzing naar een resultaat in de toekomst. Het nieuwe Completable gedeelte is waarmee de compositie gemaakt wordt. Bijvoorbeeld: als het document ingelezen is, dan moet hij geparsed worden. In Java 8 wordt een startpunt van de execution graph geïmplementeerd door een Supplier  (generateStatistics en readDocument) en de tussenstappen zijn dan Function’s (parseDocument, scorer).

 


CompletableFuture<Statistics> statistics =        CompletableFuture.supplyAsync(generateStatistics);
CompletableFuture<String> read =        CompletableFuture.supplyAsync(readDocument);
CompletableFuture<Document> parsed =         read.thenApply(parseDocument);
CompletableFuture<Integer> scored =   parsed.thenCombineAsync(statistics, scorer);
int score = scored.get();

 

Listing 1

 

In het vorige artikel in deze serie werd een voorbeeldprogramma gebruikt: PigsInSpace. Hierbij was het doel om botsingen te detecteren tussen ruimteschepen. Hierbij werd het onderstaande sequentiële algoritme gebruikt.

 


for (int i=0; i<ships.size(); i++) {
            for (int j=i+1; j<ships.size(); j++) {
                        Ship a = ships.get(i);
                        Ship b = ships.get(j);
                        if (a.collidesWith(b)) {
                                   a.bounce();
                                   b.bounce();
                        }
            }
}

 

Listing 2

 

Bovenstaand algoritme heeft concurrency 1: nergens worden taken parallel uitgevoerd. Processoren toevoegen aan het systeem heeft dus geen enkel effect op dit algoritme. Een voor de hand liggende manier om dit algoritme wel te parallelliseren, is om voor elk schip een eigen taak te maken. De stream API van Java 8 maakt dit makkelijk:

 

 


ships.parallelStream()
            .filter( s -> collidesWith(s,ships) )
            .forEach(s -> s.bounce());

 

Listing 3

 

 

Wat hier staat, is dat elk schip in parallel gefilterd wordt. Als een schip voldoet aan het filter (collidesWith) dan willen we deze behouden. Uiteindelijk worden alle schepen die hebben voldaan aan het filter teruggeduwd (bounce).

 

Dit algoritme verzet meer werk dan het originele algoritme: daar waar het originele sequentiële algoritme voorkomt dat een vergelijking tussen A en B meerdere keren gedaan wordt, doet de streaming variant dat niet. A wordt met B vergeleken en B met A. Dit is grofweg twee keer zoveel werk (met wat uitbreidingen is dat wel weer terug te verdienen, maar dat is buiten de scope van dit artikel).

De execution graph ziet er vervolgens ongeveer als volgt uit:

 


Afbeelding 2

Voor elk schip wordt een taak aangemaakt die eventueel opgevolgd wordt door een bounce. De concurrency is dus in dit geval even groot als het aantal schepen. In potentie kunnen met dit algoritme evenveel cores ingezet worden als het aantal schepen.  Aangezien hier meer werk per taak verricht moet worden, zal het algoritme pas sneller worden wanneer meer dan twee processoren worden gebruikt. In de praktijk komt dit vaak voor. Om werk efficiënt te kunnen verdelen, is meestal extra communicatie of dubbel werk nodig. Over het algemeen komt hier wat overhead bij.

Sterke en zwakke schaalbaarheid
Hoever kun je precies gaan met processoren toevoegen? Uit het bovenstaande blijkt al dat dit afhangt van de concurrency van het algoritme. Meer processoren toevoegen dan de maximale concurrency heeft geen zin. De maximale versnelling (speedup) die behaald kan worden, is ook gelijk aan het aantal processoren. Meer dan een factor P (processoren) is niet mogelijk. Als dat wel zo zou zijn, dan is er ook een sneller algoritme te bedenken voor P=1 door bijvoorbeeld virtual machines te draaien. Deze moeten dan gebruikt worden voor de speedup berekening alvorens cores worden bijgeschakeld. (In de praktijk kan het wel degelijk voorkomen dat je een superlineaire versnelling ziet, dus groter dan P. Dat komt omdat je met een extra processor vaak ook een extra cache geheugen toevoegt.)

In het beste geval kan er dus maximaal een factor P gehaald worden, als het algoritme een hoge concurrency heeft. Maar dat is het hoogst haalbare. Wat is realistisch?  Gene Amdahl[i] definieerde in 1967 zijn bekende wet. Hij stelde dat als een hoeveelheid werk uit twee stukken bestaat: een stuk dat oneindig parallelliseerbaar is (C=∞) en een tweede stuk dat sequentieel is (C=1), dan wordt het steeds inefficiënter om processoren toe te voegen. Het sequentiële stuk blijft altijd evenveel tijd kosten en wordt verhoudingsgewijs steeds duurder. De volgende tabel laat zien hoe de totale verwerkingstijd zich ontwikkelt met steeds meer processoren beschikbaar voor de uitvoering van een taak:

PTijd C=1Tijd C=∞Tijd totaal
110s10s20s
210s5s15s
510s1s11s
10s0s10s

 

Als de applicatie ergens een stuk heeft met een beperkte concurrency, dan bestaat een limiet aan hoe snel de applicatie kan worden. Een bekende analogie is: een vrouw is negen maanden zwanger. Negen vrouwen zijn nog steeds negen maanden zwanger. En die analogie geeft meteen ook een sprankje hoop als je van kinderen houdt. Met negen vrouwen duurt het nog steeds negen maanden, maar dan heb je ook meteen negen kinderen. Gustafson[ii] beschrijft dit in zijn scaled speedup formule. In dezelfde hoeveelheid tijd kun je met meer processoren meer werk verzetten. Deze conditie is een stuk makkelijker te realiseren.

 

Als de applicatie in staat is om dezelfde hoeveelheid werk sneller te doen met meer processoren, dan spreekt men over sterke schaalbaarheid. Dat is waar Amdahl het over heeft. Als de applicatie meer data kan verwerken in dezelfde hoeveelheid tijd door meer processoren te gebruiken (de negen vrouwen), dan spreekt men over zwakke schaalbaarheid. In netwerkterminologie wordt vaak gesproken over het reduceren van latency of vergroten van bandwidth; dit zijn vergelijkbare begrippen.

Schaalbaarheid en Iso-Efficiency
Speedup is een natuurlijke manier van het vergelijken van twee algoritmen. Het geeft een eenvoudig te begrijpen grafiek van de effecten van het toevoegen van extra processoren. Echter, zoals hierboven beschreven, geldt in het algemeen dat het effect van extra processoren steeds minder wordt. Deze efficiency is een andere veelgebruikte metriek: het bekijkt schaalbaarheid vanuit een economisch blikveld. Het stelt de vraag: is het nog kosteneffectief om extra servers te plaatsen? Efficiency is de extra snelheid die je realiseert per toegevoegde processor. Of anders gezegd: de hoeveelheid werk die je op een enkele core moet verzetten, afgezet tegen de hoeveelheid werk die je cumulatief moet verzetten als je meerdere processoren gebruikt.

E(P) = S(P) / P = T(1) / ( P * T(P) )

Een efficiency van 1 betekent dus lineaire schaalbaarheid, het theoretisch maximum. Voor de meeste applicaties is de efficiency kleiner dan 1. Dit is nog steeds een metriek die uitgaat van sterke schaalbaarheid. De aanname is dat je sneller een resultaat wilt. Als het doel is om méér werk te verzetten (meer klanten aan te sluiten bijvoorbeeld), dan moet je nog een stap verder gaan.

Intuïtief geldt dat een systeem zwak schaalbaar is als er simpelweg meer processoren bijgeplaatst kunnen worden, wanneer extra werk verzet moet worden. De verhouding tussen groei in data en groei in processoren is dan interessant om te weten. Iso-efficiency[iii] is bijvoorbeeld een metriek die dat beschrijft: hoeveel extra data is nodig om extra machines toe te voegen en toch de efficiency gelijk te houden. De iso-efficiency uitrekenen, is nog best lastig en het proberen uit te leggen aan de opdrachtgever is nog lastiger.

Gedistribueerd programmeren
Tot nu toe ging het over het toevoegen van processoren. Wanneer complete servers worden toegevoegd, dan gaat nog een aspect meespelen: het netwerk. Data benaderen over het netwerk is langzamer dan wanneer het lokaal in het geheugen staat. De latency, de tijd die het minimaal duurt voordat de data beschikbaar is, ongeacht de hoeveelheid data waar het om gaat, is grofweg te vergelijken met die van een SSD-schijf (zie onderstaande figuur).

Met het toevoegen van een server krijg je niet alleen extra rekenkracht, je krijgt ook meer werkgeheugen. Die voordelen moeten gebalanceerd worden tegen het nadeel van het tragere netwerk. In de onderstaande (logaritmische) grafiek kun je de verschillende reactietijden zien. DRAM geheugen is vele malen sneller dan een harde schijf. Maar als je om extra servers in te zetten communicatie over het netwerk nodig hebt, dan kan het voordeel van meer geheugen en meer rekenkracht weer snel teniet gedaan worden.

 


Afbeelding 3

Hoe kan de PigsInSpace applicatie gedistribueerd gemaakt worden? Alle ruimteschepen moeten met alle ruimteschepen vergeleken worden. Dat betekent dat goed nagedacht moet gaan worden welke data op welke server beschikbaar is.

Een simpele oplossing is om ervoor te zorgen dat alle data op alle servers beschikbaar is. Effectief betekent dit dat met het toevoegen van een server hier eigenlijk alleen maar extra rekenkracht beschikbaar komt. Elke beweging van een ruimteschip zal naar alle andere servers gekopieerd worden en dit gaat flink wat netwerkverkeer opleveren.

Een alternatieve aanpak is data verdelen over de servers. Dan hebben niet alle servers de beschikking over alle data, maar slechts enkele partities van die data. Daardoor is relatief meer geheugen beschikbaar en kan caching weer effectiever worden. De data in PigsInSpace bestaat uit een simpele lijst van schepen. Die zou je bijvoorbeeld volgens het round-robin algoritme kunnen verdelen over de verschillende servers.  Dit is geen handige manier van verdelen: om collission detection te doen, kan het zomaar zijn dat de dichtstbijzijnde schepen op een andere server staan. Het kan dus efficiënter zijn (maar zeker niet simpeler) om de positie van de schepen te gebruiken bij het verdelen over de verschillende servers, door bijvoorbeeld de kaart in stukken te verdelen en elk gebied (Tile) aan een server toe te wijzen. In het plaatje hieronder is de kaart verdeeld over verschillende servers. De achtergrond kleur van een Tile geeft aan op welke server hij draait. Een groot gedeelte van de collission detection kan dan lokaal plaatsvinden. Waar het lastig wordt, zijn de randen van elk gebied. Op deze punten zal een uitwisseling moeten plaatsvinden met een andere server.

 


Afbeelding 4: De kaart is verdeeld over verschillende servers: problemen op de overgangen

In dit simpele voorbeeld wordt snel duidelijk dat het lastig is om te voorspellen hoe schaalbaar dit precies is. Met name aan de randen van de gebieden is vrij veel communicatie nodig. Door een gebied groter te maken, worden de randen relatief kleiner -de oppervlakte neemt kwadratisch toe O(n^2) en de randen lineair O(n)- en neemt de communicatie tussen de gebieden af. Maar het aantal gebieden neemt dan ook af en daarmee eveneens de concurrency. Als niet alle gebieden even drukbevolkt zijn met ruimteschepen, dan is de kans ook groter dat niet alle servers evenveel werk te verrichten hebben. De optimale grootte van de gebieden hangt af van de hoeveelheid servers, maar ook van het aantal ruimteschepen, de beweeglijkheid van de ruimteschepen, hun verdeling en de grootte van de kaart. Als één van deze parameters gaat schuiven, dan zal die optimale grootte ook veranderen. Het devies is dan ook: maak dit zo flexibel mogelijk.

In Java 8 is veel support voor parallel programmeren erbij gekomen. Voor gedistribueerd programmeren, waarbij verschillende servers gebruikt worden, zijn maar beperkte mogelijkheden. Daarvoor is het aan te raden om één van de vele third party oplossingen te gebruiken. PigsInSpace maakt gebruik van de open source library Hazelcast. Zij leveren een gedistribueerde implementatie van de Java Collection Framework API, zoals een Map die op basis van de keys de data verdeeld over de verschillende servers.

Door elke Tile een uniek ID te geven en ze in een Hazelcast Map te stoppen, wordt de data automatisch gedistribueerd. Sommige van de values zijn dan lokaal beschikbaar, andere values worden bewaard op een andere server. Door alleen te opereren op lokale waarden (localKeySet()) kun je netwerkverkeer voorkomen. In PigsInSpace ziet dat er dan ongeveer zo uit:


class Tile {
    int tileId;
    Collection<Ship> ships;
}
HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IMap<Integer,Tile> tiles = instance.getMap("tiles");
for(;;) {
    Set<Integer> localKeySet = tiles.localKeySet();
    for (int tileId : localKeySet) {
        handleTile(tiles.get(tileId));
    }
}

 

Listing 4

Het hangt van het framework af hoe inzichtelijk het is wanneer data over het netwerk gaat. Bij Hazelcast bijvoorbeeld, kun je met hetzelfde gemak over alle keys in de Map lopen, inclusief de keys die op een andere server worden beheerd. Of dat een voordeel of een valkuil is, is iets om goed over na te denken.

Conclusie
We hebben het nog niet gehad over correctheid. Hoe zorg je ervoor dat wanneer alles tegelijkertijd gebeurd, je data niet corrumpeert? En wat als bijvoorbeeld een netwerkkabel het begeeft? Dat zijn onderwerpen voor een volgend artikel.

Schaalbaarheid is een lastig onderwerp. Een goed begin is om onderscheid te maken tussen sterke en zwakke schaalbaarheid. Voorspellingen zijn zeer applicatie- en gebruiksspecifiek. Algoritmen vergelijken is erg lastig. Er zijn wel metrieken zoals iso-efficiency die helpen, maar deze zijn lastig te gebruiken. Het beste is om zo flexibel mogelijk te zijn: beschrijf uit welke taken je applicatie bestaat en componeer die als een execution graph.  Als je meer dan één server wilt gebruiken, denk na over de verdeling van de data over die servers. Maak strategieën configurabel daar waar keuzes gemaakt moeten worden.

Mocht je zelf willen experimenteren met PigsInSpace, dan is de source code te downloaden op het GitHub account van First8: https://github.com/First8/pigsinspace