CQRS & Event Sourcing met Lagom

Lagom is het nieuwe microservices framework van Lightbend (voorheen Typesafe, het bedrijf achter Scala en Akka). Het framework en de concepten daarachter zijn in grote mate gebaseerd op CQRS (Command Query Responsibility Segregation) en ES (Event Sourcing). Dit bepaalt onder andere hoe intern state wordt bijgehouden en gepersisteerd. In dit artikel zal ik de basis van Lagom beschrijven en daarna verder ingaan op CQRS en ES in combinatie met het framework.

Lagom, het Framework

De filosofie achter Lagom is dat het:

  • Gedistribueerd moet zijn;
  • Communicatie asynchroon moet verlopen;
  • Hoge productiviteit moet ondersteunen.

Deze ideeën bepalen hoe het framework is opgezet. Het doel is om services die zijn ontwikkeld bovenop Lagom klein en compact te houden. Conventies maken het vervolgens heel eenvoudig de services op een asynchrone manier met elkaar te laten communiceren. Om hier maar meteen in listing 1 een voorbeeld van te tonen:

 


ServiceCall<CreateCustomerMessage, Done> createCustomer();
ServiceCall<NotUsed, Customer> getCustomerByEmail(String email);
ServiceCall<NotUsed, String> getCustomerAverageAge();
 
@Override
default Descriptor descriptor() {
   return named("customer-store").withCalls(
           pathCall("/api/customer/average-age", this::getCustomerAverageAge),
           restCall(Method.POST, "/api/customer", this::createCustomer),
           restCall(Method.GET, "/api/customer/:email", this::getCustomerByEmail)
   ).withAutoAcl(true).withCircuitBreaker(CircuitBreaker.perNode());
}

Listing 1

 

Hier worden drie interfaces gedefinieerd. Doordat de “getCustomerAverageAge” een “ServiceCall” is met “NotUsed” als eerste generics parameter, maakt dit automatisch een HTTP GET request. Een “ServiceCall” met als eerste parameter een object en als tweede het type “Done”, maakt dit automatisch een POST (ook al zou het type niet expliciet meegegeven zijn met de “restCall” aanroep). Het is dus met minimale code mogelijk om RESTful interfaces te definiëren, welke intern op een asynchrone manier worden afgehandeld.

Naast CQRS en ES zijn ook enkele andere belangrijke concepten toegepast, waaronder immutability van objecten, design-driven API’s en polyglot programming. Zowel Java als Scala worden ondersteund door de framework API’s, maar door gebruik te maken van RESTful API’s met JSON data, kun je ook eenvoudig communiceren met andere services.

Omdat het Lagom framework van Lightbend afkomstig is, zal de technologie waarop het is gebaseerd niet verbazen. Akka, met daarbij Akka Streams, Akka Persistence en Akka Cluster vormen het fundament en verzorgen communicatie en data opslag. Play is ingezet ten behoeve van de RESTful interfaces en voor onder andere de configuratie. Slick als ORM framework waarbij SQL calls asynchroon worden afgehandeld. ConductR ten slotte om het geheel eenvoudig en schaalbaar in productie te kunnen deployen.

Andere in het oog springende libraries die gebruikt worden, zijn Logback (logging), Jackson (JSON serialization), Guice (dependency injection), Dropwizard (metrics) and Immutables (immutable objects).

De focus op immutability, non-blocking API’s en de sterke aanwezigheid van de concepten van CQRS en Event Sourcing maken het grootste verschil met een framework als Spring Boot. Bovendien is Lagom een veel compacter framework; libraries voor bijvoorbeeld queueing systemen als ActiveMQ zul je zelf toe moeten voegen en moeten configureren. Over het algemeen schermt Lagom je goed af voor de onderliggende frameworks, maar voor dit soort zaken is kennis van de onderliggende frameworks haast onontbeerlijk.

Persistence in Lagom

Voor persistency wordt standaard gebruik gemaakt van de Cassandra key-value store. Sinds Lagom versie 1.2 is het ook mogelijk een JDBC store te gebruiken. De principes daarvan zijn grotendeels gelijk. We zullen hier later nog verder op ingaan.

Opslaan van data gaat via het implementeren van de “PersistentEntity” abstract class (een voorbeeld staat verderop in listing 4). Een “PersistentEntity” komt overeen met een Aggregate Root vanuit Domain Driven Design concepten. Iedere “PersistentEntity” heeft een stabiele identifier (primary key) waarmee de huidige status kan worden opgevraagd en er wordt altijd één instantie (als een “singleton”) in het geheugen vastgehouden. Dit in tegenstelling tot bijvoorbeeld JPA, waarvan meerdere instanties met dezelfde identifier in het geheugen kunnen bestaan. Daarnaast wordt over het algemeen bij JPA alleen de huidige status opgeslagen, terwijl een “PersistentEntity” in Lagom ook alle historie bevat van events die hebben geleid tot de huidige status.

Overeenkomstig de CQRS ‘flow’ dien je een “PersistentEntity” te koppelen aan een “Command”, “Event” en “State”. Alle interactie verloopt door “Command”s te versturen naar de entity, waarop een antwoord volgt dat ofwel een update is uitgevoerd, dan wel met de gevraagde data. Ook het opvragen van de huidige status loopt dus via “Command”s.

In geval van een wijziging, leidt een “Command” tot een “Event” welke wordt gepersisteerd. Vanuit het “Event” wordt ook de huidige “State” bijgewerkt.

Afbeelding 1: CQRS Command, Event, State flow

 

Zie listing 2 voor een voorbeeld “Command” voor het toevoegen van een klant.

 


public interface CustomerCommand extends Jsonable {
 
   @Immutable
   @JsonDeserialize
   public final class AddCustomer implements CustomerCommand, CompressedJsonable, PersistentEntity.ReplyType<Done> {
       public final String firstName;
       public final String lastName;
       public final Date birthDate;
       public final Optional<String> comment;
 
       @JsonCreator
       public AddCustomer(String firstName, String lastName, Date birthDate, Optional<String> comment) {
           this.firstName = Preconditions.checkNotNull(firstName, "firstName");
           this.lastName = Preconditions.checkNotNull(lastName, "lastName");
           this.birthDate = Preconditions.checkNotNull(birthDate, "birthDate");
           this.comment = Preconditions.checkNotNull(comment, "comment");
       }
   }
 
}

Listing 2

 

Zoals getoond in listing 3 kun je vervolgens een “Command” aanmaken en versturen naar de entity.

 


@Override
public ServiceCall<CreateCustomerMessage, Done> createCustomer() {
   return request -> {
       log.info("===> Create or update customer {}", request.toString());
       PersistentEntityRef<CustomerCommand> ref = persistentEntityRegistry.refFor(CustomerEntity.class, request.userEmail);
       return ref.ask(new CustomerCommand.AddCustomer(request.firstName, request.lastName, request.birthDate, request.comment));
   };
}

Listing 3

 

Dit is de implementatie van de service uit de eerste source code listing (1). Zoals te zien is in de listing, wordt een “PersistentEntityRef” opgevraagd aan de hand van een type en de identity / primary key. Deze reference is een instantie waarmee je kunt interacteren d.m.v. het versturen van “Command”s. De “CreateCustomerMessage” implementatie (hier niet getoond) is vergelijkbaar met de “AddCustomer” implementatie uit listing 2, maar bevat daarnaast ook het email-adres van de gebruiker als primary key.

Om “Command”s te kunnen verwerken dien je binnen een “PersistentEntity” zogenaamde ‘Command Handlers’ te definiëren. Deze bepalen het “Behavior” van je “PersistentEntity” en beginnen altijd vanuit een lege “State”. Zie listing 4 voor onze “CustomerEntity”.

 


public class CustomerEntity extends PersistentEntity<CustomerCommand, CustomerEvent, CustomerState> {
 
   @Override
   public Behavior initialBehavior(Optional<CustomerState> snapshotState) {
 
      /*
       * De BehaviorBuilder start altijd met een State, deze kan initieel leeg zijn.
       */
       BehaviorBuilder b = newBehaviorBuilder(
               snapshotState.orElse(new CustomerState.EMPTY));
 
      /*
       * Command handler voor de AddCustomer command.
       */
       b.setCommandHandler(CustomerCommand.AddCustomer.class, (cmd, ctx) ->
               // Eerst creëren we een event welke we persisteren.
               // {@code entityId() } geeft je automatisch de ‘primary key’, in ons geval email
               ctx.thenPersist(new CustomerEvent.AddedCustomerEvent(entityId(), cmd.firstName, cmd.lastName, cmd.birthDate, cmd.comment),
                       // Als dit geslaagd is geven we “Done” als antwoord.
                       evt -> ctx.reply(Done.getInstance())));
 
      /*
       * Event handler voor het AddedCustomerEvent event, waarin we daadwerkelijk de status bijwerken
       */
       b.setEventHandler(CustomerEvent.AddedCustomerEvent.class,
               evt -> {
                   return new CustomerState(Optional.of(evt.email), Optional.of(evt.firstName), Optional.of(evt.lastName), Optional.of(evt
                           .birthDate), evt.comment);
               });
 
      /*
       * Command handler om alle informatie van een klant op te vragen (String representatie van onze klant)
       */
       b.setReadOnlyCommandHandler(CustomerCommand.CustomerInfo.class,
               (cmd, ctx) -> ctx.reply(state().toString()));
 
       return b.build();
   }
 
}

Listing 4

 

Onderaan de code in listing 4 zie je ook een ‘read only command handler’, waarin niets gemuteerd kan worden, maar wel informatie kan worden opgevraagd vanuit de entity.

De “BehaviorBuilder” kan ook logica bevatten, bijvoorbeeld om de state op een andere manier te muteren, wanneer deze klant al bestaat.

Het “AddedCustomerEvent” is identiek aan de “AddCustomerCommand” behalve dat deze ook nog het email-adres bevat. Wat dan nog ontbreekt uit de code listings is de “CustomerState”. Deze velden zijn allen van het type “Optional”, omdat de initiële status van een klant ‘leeg’ is, zie hiervoor listing 5.

 


public final class CustomerState implements Jsonable {
 
   public static final CustomerState EMPTY = new CustomerState(Optional.empty(), Optional.empty, Optional.empty, Optional.empty, Optional.empty);
 
   private final Optional<String> email;
   private final Optional<String> firstName;
   private final Optional<String> lastName;
   private final Optional<Date> birthDate;
   private final Optional<String> comment;
 
   @JsonCreator
   public BlogState(Optional<String> email, Optional<String> firstName, Optional<String> lastName, Optional<Date> birthDate, Optional<String> comment) {
       this.email = email;
       this.firstName = firstName;
       this.lastName = lastName;
       this.birthDate = birthDate;
       this.comment = comment;
   }
 
   @JsonIgnore
   public boolean isEmpty() {
       return !email.isPresent();
   }
}

Listing 5

 

Read-side met JDBC in Lagom

In een CQRS (Command Query Responsibility Segregation) architectuur wordt het manipuleren van data gescheiden van het lezen van data. Eén van de interessante aspecten daarvan is dat je de lees-zijde kunt optimaliseren voor het bevragen. Specifiek door aan de lees-zijde gedenormaliseerde tabellen te gebruiken. Hierbij wordt data het meest efficiënt gegroepeerd en indien nodig gedupliceerd, waardoor je queries simpel en snel kunt houden.

Bovendien voorkomt dit zogenaamde ORM impedantie mismatch; de discrepantie tussen object structuren en relationele tabellen, zoals bijvoorbeeld ‘inheritance’ en ‘encapsulation’.

Waar Lagom -zoals we hierboven gezien hebben- al automatisch zorgt voor de opslag en verwerking van events, zo ondersteunt het framework ook om voor de lees-zijde data gedenormaliseerd op te slaan. Zie afbeelding 2.

Afbeelding 2: Gescheiden ‘read’ en ‘write’ kant volgens CQRS © Microsoft – CQRS Journey

 

Binnen Lagom kun je zogenaamde “ReadSideProcessor”s definiëren die ook events kunnen ontvangen en verwerken en zodoende de data in andere vorm op kunnen slaan. Een voorbeeld “ReadSideProcessor” kun je vinden in listing 6.

 


public class CustomerEventProcessor extends ReadSideProcessor<CustomerEvent> {
 
   private final JdbcReadSide readSide;
 
   @Inject
   public CustomerEventProcessor(JdbcReadSide readSide) {
       this.readSide = readSide;
   }
 
   @Override
   public ReadSideHandler<CustomerEvent> buildHandler() {
       JdbcReadSide.ReadSideHandlerBuilder<CustomerEvent> builder = readSide.builder("votesoffset");
 
       builder.setGlobalPrepare(this::createTable);
       builder.setEventHandler(CustomerEvent.AddedCustomerEvent.class, this::processCustomerAdded);
 
       return builder.build();
   }
 
   private void createTable(Connection connection) throws SQLException {
       connection.prepareStatement(
               "CREATE TABLE IF NOT EXISTS customers ( "
                       + "id MEDIUMINT NOT NULL AUTO_INCREMENT, "
                       + "email VARCHAR(64) NOT NULL, "
                       + "firstname VARCHAR(64) NOT NULL, "
                       + "lastname VARCHAR(64) NOT NULL, "
                       + "birthdate DATETIME NOT NULL, "
                       + "comment VARCHAR(256), "
                       + "dt_created DATETIME DEFAULT CURRENT_TIMESTAMP, "
                       + " PRIMARY KEY (id))").execute();
   }
 
   private void processCustomerAdded(Connection connection, CustomerEvent.AddedCustomerEvent event) throws SQLException {
       PreparedStatement statement = connection.prepareStatement(
               "INSERT INTO customers (email, firstname, lastname, birthdate, comment) VALUES (?, ?, ?, ?, ?)");
       statement.setString(1, event.email);
       statement.setString(2, event.firstName);
       statement.setString(3, event.lastName);
       statement.setDate(4, event.birthDate);
       statement.setString(5, event.comment.orElse(""));
       statement.execute();
   }
 
   @Override
   public PSequence<AggregateEventTag<CustomerEvent>> aggregateTags() {
       return TreePVector.singleton(CustomerEvent.CUSTOMER_EVENT_TAG);
   }
}

Listing 6

 

Nu kun je de “ReadSideProcessor” registreren in de service implementatie (voor de volledigheid de hele constructor) zoals in listing 7.

 


@Inject
public CustomerServiceImpl(PersistentEntityRegistry persistentEntityRegistry, JdbcSession jdbcSession, ReadSide readSide) {
   this.persistentEntityRegistry = persistentEntityRegistry;
   this.persistentEntityRegistry.register(CustomerEntity.class);
   this.jdbcSession = jdbcSession;
   readSide.register(CustomerEventProcessor.class);
}

Listing 7

 

Ook moet je voor de “Event” class de volgende ‘tag’ definiëren, zoals in listing 8. Hierdoor kan Lagom bijhouden welke events verwerkt zijn. Dit is voornamelijk bij crashes of herstarten van belang, zodat de data consistent gehouden kan worden met de status aan de schrijf-zijde.

 


AggregateEventTag<CustomerEvent> CUSTOMER_EVENT_TAG = AggregateEventTag.of(CustomerEvent.class);
 
@Override
default AggregateEventTag<CustomerEvent> aggregateTag() {
   return CUSTOMER_EVENT_TAG;
}

Listing 8

 

Nu de verwerking van events klaar is en data gedenormaliseerd opgeslagen wordt, kun je de data simpel opvragen met SQL queries. Dit doe je bijvoorbeeld vanuit de service implementatie. Het voorbeeld in listing 9 toont een simpele query om de gemiddelde leeftijd van je klanten op te vragen.

 


@Override
public ServiceCall<NotUsed, String> getCustomerAverageAge() {
   return request -> jdbcSession.withConnection(connection -> {
       ResultSet rsCount = connection.prepareStatement("SELECT COUNT(*) FROM customers").executeQuery();
       ResultSet rsAverage = connection.prepareStatement("SELECT AVG(TIMESTAMPDIFF(YEAR,birthDate,CURDATE())) FROM customers").executeQuery();
 
       if (rsCount.next() && rsAverage.next() && rsCount.getInt(1) > 0) {
           return String.format("# %s customers resulted in average age; %s", rsCount.getString(1), rsAverage.getString(1));
       } else {
           return "No customers yet";
       }
   });
}

Listing 9

 

Conclusie

CQRS en Event Sourcing zijn een goede manier om de schrijf- en lees-zijde van een service los te optimaliseren. En hoewel een NoSQL store zeker zijn voordelen heeft, is een relationele database zeer geschikt om queries over meerdere objecten op te stellen. Ik heb je laten zien dat Lagom perfecte ondersteuning biedt voor deze architectuur en daarbij verschillende persistence oplossingen ondersteunt. Door het principe van ‘convention over configuration’ kun je je op de business logica richten in plaats van alle boilerplate code.

Lagom is pas bij versie 1.2.x aanbeland en de jonge leeftijd van het framework is af en toe merkbaar in kleine issues. Mede daardoor raad ik je aan nog voorzichtig te zijn met het gebruik van Lagom voor productie applicaties. Maar het is zeker een framework om in de gaten te houden.