Advanced Akka

Wellicht heb je al eens wat over Akka gehoord of gelezen. Misschien zelfs in een van de voorgaande Java Magazine artikelen over Akka die je inmiddels ook op de NLJUG website terug kunt vinden (links staan onderaan). Deze artikelen lieten al goed zien wat er mogelijk is met Akka, maar in dit artikel behandel ik nog een aantal andere interessante features.

Een simpele Actor

Het Actor model is eigenlijk een abstractie laag voor het schrijven van concurrent en gedistribueerde applicaties. Het zorgt ervoor dat je makkelijker systemen kunt bouwen die veel gebruikers aankunnen. De Worker Actor in voorbeeld 1 is een simpele Actor. Om een Actor te maken moet je Actor extenden en een receive methode definiëren. De receive methode bepaalt wat er gedaan moet worden met binnengekomen berichten. In dit voorbeeld wordt ieder bericht dat binnenkomt geprint op het scherm.



class Worker extends Actor {

  override def receive = {

    case message: String =>

      println(message)

  }

}

Voorbeeld 1: Simpele Actor

Om de Worker Actor te kunnen gebruiken moeten we eerst een applicatie maken. In voorbeeld 2 maken we een object ‘Startup’ en extenden we ‘App’. Actors draaien in een ActorSystem, dus we moeten eerst een ActorSystem aanmaken in onze applicatie. Vervolgens vragen we aan het ActorSystem een referentie naar de Worker Actor. We krijgen dus niet de Actor zelf in handen, maar een referentie naar de Actor. Daarna kunnen we met het uitroepteken een bericht naar de Actor sturen. De Worker Actor zal vervolgens de tekst “Hello Akka” printen. Dit is een simpel voorbeeld, in een echte applicatie stuur je geen Strings naar Actoren, maar gebruik je case classes.



object Startup extends App {

  implicit val system = ActorSystem("ExampleActorSystem")

  val workerActorRef = system.actorOf(Props[Worker])

  workerActorRef ! "Hello conference"

}

Voorbeeld 2: Simpele Actor aanroepen

Akka cluster

Bij een cluster start je een aantal nodes op, in dit geval nodes met de Worker applicatie. Als er vervolgens vanuit de Coördinator een bericht gestuurd wordt, dan wordt dat bericht door één van de Worker applicaties afgehandeld.

Een cluster kan gebruikt worden om de load te verdelen over meerdere fysieke machines en/of om backup nodes te hebben voor het geval dat één van de nodes niet meer werkt.

De tekst “router = round-robin-pool“ in de configuratie van voorbeeld 4 zorgt ervoor dat we een router hebben, een soort loadbalancer. Deze zorgt ervoor dat de berichten verdeeld worden op basis van round-robin. De ‘seed-nodes’ in de configuratie geven de contactpunten aan voor de nodes, alle nodes die gestart worden melden zich aan bij de seed nodes.

In voorbeeld 3 wordt scheduling gebruikt. De code ‘context.system.scheduler.schedule(5.seconds, 1.seconds, self, SendMessage)’ zorgt ervoor dat er na vijf seconden iedere seconde een bericht ‘SendMessage’ naar de eigen Actor (self) gestuurd wordt. Zoals je in de receive methode kunt zien wordt er voor ieder ‘SendMessage’ een bericht naar de router gestuurd. Dit bericht zal vervolgens naar een van de Workers gestuurd worden.

De Worker kan een bericht terugsturen naar de Coördinator. Daarvoor moet een bericht naar ‘sender’ gestuurd worden op de volgende manier: ‘sender ! [bericht]’.



class Coordinator extends Actor {
  import Coordinator._
  import context.dispatcher

  val router = context.actorOf(Worker.props.withRouter(FromConfig), 

"router")

val messageScheduler = context.system.scheduler.schedule(5.seconds,

1.seconds, self, SendMessage)

var messageCounter = 0

override def receive = {
case SendMessage =>
router ! "Test message " + messageCounter
messageCounter
+= 1
}
}

Voorbeeld 3: Coördinator van Akka cluster


akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"

// Dit stuk is alleen nodig in de coördinator applicatie
deployment {
/coordinator/router {
router = round-robin-pool
routees.paths = ["/user/printactor"] cluster {
enabled = on
allow-local-routees = off
}
}
}
}
remote {
enabled-transports = ["akka.remote.netty.tcp"] netty.tcp {
hostname = "127.0.0.1"
port = "2550"
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterActorSystem@127.0.0.1:2551",
"akka.tcp://ClusterActorSystem@127.0.0.1:2552"
] }
}

Voorbeeld 4: Configuratie voor Akka cluster

Akka cluster singleton

In sommige gevallen wil je niet dat een willekeurige Worker node het bericht afhandelt. Als je wilt dat een bepaalde Actor uniek is in het cluster, dan kun je een singleton Actor maken.

In voorbeeld 5 is te zien hoe een singleton Actor aangemaakt kan worden.



object Startup extends App {
  val actorSystem = ActorSystem.create("ClusterActorSystem")
 actorSystem.actorOf(ClusterSingletonManager.props(
    singletonProps = Worker.props,
    terminationMessage = PoisonPill,
    settings = ClusterSingletonManagerSettings(actorSystem)),
    name = "singletonactor")
}

Voorbeeld 5: Aanmaken singleton Actor

Het gebruiken van deze singleton Worker Actor kan eenvoudig met de code van voorbeeld 6. Deze code zorgt ervoor dat er wederom om de seconde een bericht wordt gestuurd, maar ditmaal naar een singleton Actor. Deze draait op het oudste ActorSystem in het cluster. Zolang deze goed draait zal hij alle berichten voor de Worker Actor afhandelen. Als er iets mis gaat, dan wordt de Actor op het op dat moment oudste ActorSystem geselecteerd als singleton. Deze Actor zal vervolgens alle berichten afhandelen.



class Coordinator extends Actor {
  import Coordinator._
  import context.dispatcher

  val singletonActor = context.actorOf(ClusterSingletonProxy.props(
    singletonManagerPath = "/user/singletonactor",
    settings = ClusterSingletonProxySettings(context.system)),

    name = "singletonactorProxy")

  val messageScheduler = context.system.scheduler.schedule(5.seconds, 

1.seconds, self, SendMessage)
var messageCounter = 0

override def receive = {
case SendMessage =>
singletonActor ! "Test message " + messageCounter
messageCounter
+= 1

}

}

Voorbeeld 6: Aanroepen singleton Actor

De configuratie lijkt op de configuratie van een normaal cluster, maar dan zonder het gedeelte voor de routing. Wat er nieuw is in deze configuratie is ‘auto-down-unreachable-after = 3s’. Dit zorgt ervoor dat als een van de nodes (Worker applicaties) niet bereikt kan worden gedurende 3 seconden, dan wordt er een andere Worker Actor (de langst bestaande) aangewezen als de singleton.

Finite State Machine (FSM)

Een FSM Actor is simpelweg een Actor met wat meer functionaliteit. De FSM Actor kun je bijvoorbeeld gebruiken om een registratieproces in te implementeren. Er zijn een aantal stappen in het registratieproces waar je naartoe kunt gaan en hopelijk eindig je in een stap waarin alle informatie compleet is. Het voordeel van een FSM is dat de stappen en toestanden duidelijk en makkelijk testbaar zijn.

In het voorbeeld zijn er drie toestanden namelijk ‘New Project’, ‘Inprogress Project’ en ‘Crappy Project’. Tussen deze toestanden zijn verschillende overgangen mogelijk.

 

 


sealed trait ProjectStatus
case object NewProject extends ProjectStatus
case object InProgressProject extends ProjectStatus
case object CrappyProject extends ProjectStatus

case object Progress
case object NoProgress

class SignalChangeFSMActor extends Actor with FSM[ProjectStatus, Int] {

startWith(NewProject, 0)

when(NewProject) {
case Event(Progress, 2) =>
println("Progress Event: Get another job and let someone else create a good application!")
stay
case Event(Progress, _) =>
goto(InProgressProject)
case Event(NoProgress, _) =>
println("NoProgress Event: Work harder!")
stay
}

when(InProgressProject) {
case Event(Progress, _) =>
goto(CrappyProject)
}

when(CrappyProject) {
case Event(Progress, iteration) =>
goto(NewProject) using (iteration + 1)
}

onTransition {
case NewProject -> InProgressProject => println("Transition: You're doing a good job!")
case InProgressProject -> CrappyProject => println("Transition: You're moving into the wrong direction.")
case CrappyProject -> NewProject => println("Transition: You should use Akka in your new project.")
}

initialize
}

Voorbeeld 7: FSM Actor

Als er een NoProgress bericht gestuurd wordt naar de Actor van voorbeeld 7 dan wordt de tekst ‘NoProgress Event: Work harder!’ getoond. Stuur je vervolgens een Progress bericht dan wordt de tekst ‘Transition: You're doing a good job!’ getoond. Het aantal iteraties door de verschillende projectstatussen wordt bijgehouden. Als je twee keer rond bent gegaan dan wordt de tekst ‘Progress Event: Get another job and let someone else create a good application!’ getoond.

Persistence

Met Akka kun je gemakkelijk berichten verwerken op een stateless manier. Als we echter stateful willen werken dan wordt het iets lastiger. Het is bijvoorbeeld mogelijk om een winkelwagen Actor te maken.  Alleen als de winkelwagen Actor crasht dan ben je de inhoud van je winkelwagen kwijt.

Stel bijvoorbeeld dat we drie berichten bijvoorbeeld “Cobol”, “Java” en “Scala” naar een Actor sturen. Vervolgens stopt of crasht de Actor na het “Java” bericht. Daarna start de Actor weer, maar het “Cobol” en “Java” bericht zijn weg. Vervolgens verwerkt de Actor het “Scala” bericht en dat is dan het enige dat de Actor nog heeft. Wellicht is het niet erg dat “Cobol” weg is, maar ik kan mij voorstellen dat sommigen “Java” nog wel even willen houden. Dat kan vrij makkelijk met Akka persistence, maar dat moet je dat wel zelf programmeren. Met Akka persistence kun je de data opslaan in een database naar keuze.

De volgende stappen moeten uitgevoerd worden om data op te slaan:

  1. Een “Cobol” command wordt naar de Persistent Actor gestuurd.

  2. De Actor ontvangt het command en stuurt een “Cobol” event naar de Journal.

  3. Het “Cobol” event wordt opgeslagen in de journal.

  4. De Actor krijgt een Acknowledge terug.

  5. De Actor slaat “Cobol” op in de interne Actor staat.

 

Dezelfde stappen worden uitgevoerd om “Java” en “Scala” op te slaan. Als de Actor tussendoor stopt of crasht dan worden eerst de events uit de journal opnieuw door de Actor verwerkt en vervolgens worden de nieuwe berichten die binnenkomen afgehandeld.

De code in voorbeeld 8 laat de PersistentActor zien. Als er een Command bericht binnenkomt dat wordt dat opgeslagen (persist). Als er een Print bericht binnenkomt wordt de staat van de Actor geprint; in dit geval een lijst met programmeertalen. Als een ThrowException bericht binnenkomt wordt er een Exception gegooid en stopt de Actor. Als een Actor opnieuw start worden eerst alle events uit de journal opnieuw verwerkt door middel van de receiveRecover methode. Daarna worden de nieuwe berichten die binnenkomen afgehandeld.

De applicatie in voorbeeld 9 stuurt eerst het “Cobol” en “Java” Command naar de Actor, beide talen worden in het Journal opgeslagen en in de state lijst in de Actor. Vervolgens stuurt de applicatie een ThrowException waardoor de Actor stopt en er wordt een nieuwe gestart. De state lijst in de Actor is leeg, maar door middel van de receiveRecover methode worden “Cobol” en “Java” uitgelezen uit het Journal en in de State lijst van de Actor geplaatst. Daarna stuurt de applicatie het “Scala” Command welke toegevoegd wordt aan het Journal en aan de state lijst in de Actor. Uiteindelijk zitten alle drie de programmeertalen “Cobol”, “Java” en “Scala” in de state lijst van de Actor, ondanks dat de Actor tussendoor gecrasht is.



object ExamplePersistentActor {

  case class Command(data: String)

  case class Event(data: String)

  case object ThrowException

  case object Print

}

class ExamplePersistentActor extends PersistentActor {

  override def persistenceId = "ExamplePersistentActor"

  var state = List.empty[String]

  def updateState(event: Event): Unit =

    state = state :+ event.data

  val receiveRecover: Receive = {

    case replayEvent: Event => updateState(replayEvent)

  }

  val receiveCommand: Receive = {

    case Command(data) =>

      persist(Event(data)) { event => updateState(event)}

    case Print =>

      println("Persisted " + state) 

    case ThrowException => throw new Exception("This exception was thrown 

intentionally")

}

}

Voorbeeld 8: Persistent Actor


 

object Startup extends App {
  implicit val system = ActorSystem("ExampleActorSystem")

  val persistentActor = system.actorOf(Props[ExamplePersistentActor])
  persistentActor ! Command("Cobol")
  persistentActor ! Command("Java")
  persistentActor ! ThrowException
  persistentActor ! Command("Scala")
  persistentActor ! Print
}

Voorbeeld 9: Aanroepen persistent Actor

Conclusie

Akka biedt vele mogelijkheden, in dit artikel heb ik er enkele genoemd, maar zeker niet alle. Wil je meer weten over Akka, lees dan eens de verschillende artikelen, blogs en boeken. Want het werken met Akka kan je leven een stuk makkelijker maken. Daarnaast kan het ervoor zorgen dat je applicaties een stuk stabieler zijn en beter schaalbaar zijn.

Leave a Reply

Your email address will not be published. Required fields are marked *