Lambda Expressies: Enhance your Collections

Het kan je bijna niet ontgaan zijn. Java 8 brengt een nieuwe feature: ‘Lambda expressies’. Bovenop dit fundament komt nog iets extra’s: verbeteringen in de Collections API en de introductie van de nieuwe Streams API. In dit artikel wordt het allemaal onder de loep genomen.

In het persbericht van Oracle waarin Java 8 werd aangekondigd[i], werden ook drie voordelen genoemd van lambda expressies. Dit zijn: een hogere developer productivity, beter gebruik van multi-core CPU’s en de mogelijkheid om bulkdata in het Java Collections Framework efficiënt te verwerken. In dit artikel kijken we met name naar dat laatste.

Wanneer je als programmeur de Collection API in Java zou gebruiken om grote hoeveelheden data te verwerken, dan loop je al snel tegen beperkingen aan. Een belangrijke beperking is het kleine aantal mogelijkheden om operaties uit te voeren op je dataset die het niveau van toevoegen, verwijderen en sorteren ontstijgen. Libraries als Guava of Commons Collections brengen daar wel enige verbetering in, maar het geheel is nog lang niet zo flexibel als bijvoorbeeld een SQL-query uitvoeren op een database. Bovendien zorgt het gebruik van zulke libraries ervoor dat je flink meer klassen nodig hebt, wat de leesbaarheid vaak niet ten goede komt. Wil je bijvoorbeeld met Commons Collections een Collection van Person objecten doorzoeken (de medewerkers van een bedrijf) op vrouwen, dan levert dat Codevoorbeeld 1 op.


CollectionsUtils.filter(orders, new Predicate<Person>() { \
@Override
public boolean apply(Person person) {
return person.getGender() == Gender.FEMALE;
}
});]

Codevoorbeeld 1: Filteren van een Collection met Commons Collections

In dit voorbeeld zien we de introductie van een nieuwe, anonieme klasse. De JVM zal een instantie van deze klasse aanmaken en op de heap plaatsen, met de nodige overhead in geheugen daarbij. Deze klasse is overigens niet los te unit testen, maar alleen door de omliggende klasse te unit testen. Wat moet er gebeuren wanneer we het gemiddelde salaris van deze vrouwelijke medewerkers willen vinden? Dan moet niet alleen een klasse gemaakt worden die de filtering verzorgt, maar ook één die elke persoon mapt op haar salaris én een klasse die het gemiddelde berekent van meerdere salarissen. Op deze manier hebben we al drie klassen zelf geïntroduceerd, die vrijwel geen logica bevatten. Bovendien zijn er 431 extra klassen uit Commons Collections in onze applicatie geladen. En dat voor zo’n eenvoudig probleem…

Lambda expressies brengen daar verandering in. Met een paar lambda expressies kan het allemaal een stuk eenvoudiger, zoals in Codevoorbeeld 2.


employees
.stream()
.filter(p -> p.getGender() == Gender.FEMALE)
.mapToInt(p -> p.getSalary())
.average().getAsDouble();]

Codevoorbeeld 2: Gemiddeld salaris vrouwelijke medewerkers

Op deze manier worden geen Commons Collections of Guava gebruikt en zijn er geen nieuwe klassen geïntroduceerd. En, wat misschien wel minstens zo belangrijk is: dit is een stuk leesbaarder! Het doet qua duidelijkheid nauwelijks onder voor een SQL-query. Een aardige vergelijking tussen lambda expressies in Collections en SQL is te lezen in een blog-artikel van Lucas Jellema.

Streams
De stream() methode geeft een instantie van java.util.stream.Stream terug. Dit object is een potentieel oneindige stroom van elementen, die door een operatie of een iteratie geconsumeerd kunnen worden. Na consumptie is het element uit de stream verdwenen en kan het niet opnieuw geconsumeerd worden. Op zo’n stream kan een pipeline van operaties opgebouwd worden. Er zijn twee soorten operaties. De eerste zijn intermediate operaties zoals filter, map of sorted, die een stream consumeren en zelf ook weer een stream opleveren. De tweede soort zijn terminating operaties zoals count, sum, max, reduce of findFirst, die een stream consumeren maar iets anders opleveren, zoals bijvoorbeeld een object, een Map of zelfs niets.

Standaard zijn deze operaties ‘lazy’, dat wil zeggen dat ze pas uitgevoerd worden als het nodig is en ook alleen zo vaak als nodig. Als de terminal operator bijvoorbeeld findFirst() is, dan zullen de intermediate operaties net zo vaak uitgevoerd worden totdat het eerste element uit de stream dat voldoet aan de findFirst()-conditie, verwerkt is. De findFirst() terminal operatie wordt ook wel een blocking operatie genoemd, omdat deze het doorstromen van de stream blokkeert.

Een bijzonder geval van intermediate operaties zijn operaties die dezelfde stream opleveren als ze ontvangen. Een voorbeeld is de peek() methode. Door deze aan te roepen kun je als het ware ‘in de stream kijken’. Elk element uit de stream dat voorbij komt, kan bijvoorbeeld worden geprint naar System.out, om op deze manier het debuggen te vergemakkelijken.

Zoals eerder gezegd, kan een stream door een pipeline van operaties verwerkt worden. Dat betekent dat operaties achter elkaar geplaatst kunnen worden. Een voorbeeld hiervan is te zien in Codevoorbeeld 3, waar we de gemiddelde leeftijd per geslacht bepalen van de medewerkers die meer dan € 50.000 verdienen. Dat doen we door de medewerkers te streamen. Deze stream filteren we eerst op salaris. Daarna groeperen we hem op geslacht en beide groepen worden door een Collector verzameld. In dit geval gebruiken we de averagingInt collector op basis van een method reference die een int teruggeeft.


employees
.stream()
.filter((p) -> p.getSalary() > 50000)
.collect(groupingBy(Person::getGender,
averagingInt(Person::getAge)));]

Codevoorbeeld 3: Meerdere operaties in een pipeline

Streams worden intern beschreven met flags. Als ontwikkelaar hoef je hier niet zelf iets mee te doen. Omdat deze flags helpen om de verwerking van de operaties te optimaliseren, is het toch wel nuttig om te begrijpen hoe ze werken. Als een stream bijvoorbeeld de vlag SIZED heeft, is bekend hoeveel elementen in de stream aanwezig zijn. De terminal operatie toArray() kan daar gebruik van maken door een array te alloceren die precies groot genoeg is, in plaats van de array een aantal keer te moeten resizen. Operaties kunnen ook flags toevoegen of juist verwijderen. De map() operatie verwijdert bijvoorbeeld DISTINCT, als die aanwezig was. Twee elementen uit de input kunnen immers op dezelfde output-waarde gemapt worden. En de sorted() operatie voegt een vlag SORTED toe aan de output stream.

Enhance your Collections
Maar, vraagt een immer oplettende Javaan, waar komt die stream() methode nou eigenlijk vandaan? Java heeft altijd de backward compatibility hoog in het vaandel gehad. Een methode toevoegen op een interface die al sinds Java 1.2 bestaat, kan niet zomaar. Bestaande implementaties zouden al vanaf Java 8 niet meer compileerbaar zijn. Om dat te ondervangen, is het concept van de default method geïntroduceerd, min of meer vergelijkbaar met extension methods in C#. Het idee is, dat een Java interface een default implementatie kan geven voor een methode die in de interface gedefinieerd is. Implementaties van die interface die de default method niet zelf implementeren, krijgen de default implementatie cadeau. Een voorbeeld hiervan is te zien in Codevoorbeeld 4.


interface A {
default void foo() { System.out.println("Calling A.foo()"); }
}
public class DefaultMethods implements A {
public static void main(String... args) { new DefaultMethods().
foo(); }
}

Codevoorbeeld 4 Default methods in de praktijk

Parallel processing|
Een andere beperking van de Java Collections API is dat het verwerken van grotere datasets zeer slecht schaalt. Dat komt doordat geen ondersteuning is voor parallelle verwerking. Sinds Java 7 bestaat het Fork/Join Framework, maar die heeft een vrij laag-niveau API. Het gevolg hiervan is dat je als ontwikkelaar nog steeds veel moeite moet doen om de acht of zestien cores in je computer te gebruiken.

In Java 8 komt daar eindelijk verandering in: in plaats van stream() is er namelijk ook parallelStream() op de Collection interface. Deze geeft een stream die in parallel verwerkt mag worden. Verdere aanpassingen aan de code zijn niet nodig! Op runtime zal -waar nodig- je stream geforkt en gejoind worden (onder water wordt ook het Fork/Join Framework gebruikt) zonder dat je daar zelf over hoeft na te denken. Let wel op, want dit gebeurt niet vanzelf. Alleen wanneer je expliciet parallelStream() gebruikt, dan zal de runtime daadwerkelijk je dataset opsplitsen.

Stel nu dat één van de operaties in de verwerking van een stream met 1000 elementen een dure operatie is, die gemiddeld ongeveer 250 ms duurt. Denk hierbij aan een database lookup of het ophalen van een resource over HTTP. Dan is het verschil tussen sequentiele en parallele verwerking heel duidelijk te zien. Sequentieel verwerken duurt ongeveer 50 seconden, terwijl de parallele verwerking bij acht threads een seconde of zeven duurt. Standaard zal het Fork/Join Framework evenveel threads starten als je cores in je computer hebt, in dit voorbeeld acht. Dat is overigens te overschrijven door het zetten van de system property java.util.concurrent.ForkJoinPool.common.parallelism met het aantal gewenste threads.

Een ander voorbeeld: het optellen van de getallen van één tot één miljoen. In Diagram 1 is te zien hoe deze getallen in parallel bij elkaar opgeteld worden. In het diagram is daarnaast te zien dat de originele stream telkens gehalveerd wordt totdat het gewenste aantal threads bereikt is. Als alle streams verwerkt zijn, moet het resultaat natuurlijk weer gecombineerd worden. In het geval van de sum() operatie is dat natuurlijk eenvoudig: tel simpelweg de deelresultaten bij elkaar op.

Ook al lijkt dit met één miljoen elementen een grote set, de praktijk is toch dat het opsplitsen in deelverzamelingen ervoor zorgt dat dit probleem ongeveer vier keer langzamer wordt. De verklaring hiervoor is te vinden in de overhead die nodig is om te bepalen hoe groot de deelverzamelingen moeten zijn, de threads aan te maken en de resultaten te mergen. Omdat het eigenlijke probleem (het optellen van getallen) eigenlijk niet zo complex is, weegt het parallel uitvoeren ervan niet op tegen deze overhead. Bij complexere bewerkingen kan het parallel verwerken wel degelijk snelheidswinst betekenen. Zoals bij veel performance-gerelateerde problemen geldt ook hier: ‘When in doubt, measure!’

Het parallel verwerken van informatie is niet altijd een silver bullet. Bedenk wel dat bij deze afweging Amdahl’s wet nog altijd van kracht is. Deze wet beschrijft de maximaal te verwachten snelheidswinst van een systeem, wanneer slechts een deel van het systeem wordt verbeterd. Deze verbetering is namelijk beperkt door dat deel van het systeem dat nog wel sequentieel uitgevoerd wordt. Neem bijvoorbeeld een stuk code dat 20 uur duurt wanneer het op één processor gedraaid wordt. Een onderdeel daarvan kan niet parallel uitgevoerd worden en duurt één uur. Dan kan alleen de resterende 19 uur parallel uitgevoerd worden. Los van het aantal parallelle processen dat hiervoor ingezet wordt, duurt het totale proces nog altijd ten minste één uur; de maximale snelheidswinst is dus een factor 20.

Kortom, met de toevoeging van lambda expressies komt er niet alleen een nieuwe taalconstructie bij. Er ontstaan ook mogelijkheden om nieuwe API’s te gebruiken die de leesbaarheid van je code flink verbeteren, minder code vereisen en bovendien beter performen bij grotere load. De uitbreidingen in de Collections API en de nieuwe Streams API zijn daar goede voorbeelden van.

Om zelf met lambda expressies aan de slag te gaan, is het voldoende om een Java 8 JDK te downloaden. Wat IDE’s betreft, ondersteunt Netbeans Java 8 vanaf versie 7.4, IntelliJ vanaf versie 12.1 en Eclipse alleen in een release candidate[iii].

Referenties
http://bit.ly/1az08wL
http://bit.ly/1mzPGfS
http://bit.ly/1n5wraL