Real-time Hadoop – Inkomende data direct verwerken met Storm

De afgelopen jaren heeft de IT-industrie een ware revolutie gezien in dataverwerking. Technieken en frameworks zoals MapReduce, Hadoop en bepaalde NoSQL oplossingen hebben het mogelijk gemaakt om enorme hoeveelheden data te verwerken, iets wat zonder deze technieken wellicht onmogelijk zou zijn.

De industrie heeft hier dan ook duidelijk behoefte aan. EMC en Gartner hebben reeds meerdere malen gemeld dat de hoeveelheid data die de IT in de toekomst te verwerken krijgt, alleen maar zal toenemen. In een rapport van IDC wordt beweerd dat van 2005 tot 2020 een groei zal optreden van een factor 300; van 130 exabytes tot 40,000 exabytes.

De eerder genoemde technieken kunnen grote hoeveelheden data verwerken. Dit doen ze echter ’batchgewijs’, wat inhoud dat steeds een subset van de data wordt opgehaald uit een database en vervolgens wordt verwerkt. Daarna is de volgende subset weer aan de beurt. Deze technieken zijn per definitie niet real-time en zo zijn ze dan ook niet bedoeld. In het geval van batchverwerking gaat het namelijk vaak om data die wordt verzameld over een bepaalde periode en vervolgens op een bepaald moment moet worden verwerkt. Denk hierbij aan payroll- of factureringssystemen. Bij real-time verwerking zijn het vaak juist kleine hoeveelheden data die direct moeten worden verwerkt. Denk hierbij aan PIN of customer-service-systemen. Er is steeds meer behoefte aan systemen die overeenkomen met de eigenschappen van real-time data verwerking. Dit komt onder andere omdat gebruikers steeds vaker verwachten dat software direct antwoord geeft of dat data direct beschikbaar is. Zeker in het kader van ‘the internet of things’ zal een steeds grotere behoefte zijn aan een soort ‘Real-time Hadoop’.

Het is natuurlijk mogelijk om zo’n systeem zelf te bouwen. Een typische manier om real-time data te verwerken, is door een netwerk van queues en workers te bouwen. De workers luisteren naar de queues voor berichten, updaten de database en zenden nieuwe berichten naar de queues voor verdere verwerking. Deze manier van werken heeft echter wel een aantal beperkingen:

  • Arbeidsintensief; er zit veel tijd in het ontwikkelen van de juiste abstracties, het configureren van de queues en het deployen van de componenten.
  • Foutgevoelig; De applicatie is zelf verantwoordelijk voor het onderhouden van de workers en queues, het bijhouden van de berichten en moet er zelf voor zorgen dat alle berichten worden verwerkt, ook wanneer sprake is van een netwerkverstoring.
  • Schaalbaarheid; Wanneer veel berichten worden verzonden via de queues moeten de berichten gepartitioneerd worden. Het systeem zal dan tevens opnieuw geconfigureerd moeten worden

Storm is een framework dat in de behoefte van de industrie voorziet en ontwikkelaars ontlast van bovenstaande uitdagingen. Onder leiding van Nathan Marz heeft een team bij Backtype Storm ontwikkeld. Backtype is in 2011 overgenomen door Twitter, omdat het producten maakte om de impact en potentie van tweets inzichtelijker te maken. Twitter wilde deze producten aanbieden aan haar partners.

Storm heeft een aantal kernwaarden waar het aan voldoet dat menige toepassing zal aanspreken:

  • Breed inzetbaar; Storm kan gebruikt worden voor berichtverwerking, database-updates, queries doen op data streams en deze retourneren aan clients, complexe queries paralleliseren, etc.
  • Schaalbaar; Door de simpele abstracties van Storm is het mogelijk om gemakkelijk op en uit te schalen. Een voorbeeld: één van de eerste applicaties die gebruik maakte van Storm verwerkte 1.000.000 berichten per seconde op een cluster van 10 machines, inclusief honderden database-aanroepen per seconde.
  • Gegarandeerde dataverwerking; Een belangrijk onderdeel van een real-time systeem is dat deze de verwerking van data garandeert. Een systeem waarbij het mogelijk is dat data verloren gaat, heeft maar een beperkt aantal use-cases.
  • Onderhoudbaar; Storm maakt gebruik van Apache ZooKeeper. ZooKeeper is een open-source, gedistribueerde configuratie en synchronisatie service. Met ZooKeeper kan op een eenvoudige manier de configuratie van servers en services gedeeld worden door meerdere machines. Door gebruik te maken van ZooKeeper is het onderhouden van een Storm cluster gemakkelijk. Eén van de doelen die de makers van Storm zich hebben gesteld, is dat het product makkelijk te managen is. Het gebruik van ZooKeeper speelt hier dan ook een belangrijke rol in.
  • Fout-tolerant; Op het moment dat er een fout optreedt bij de verwerking van de data kan Storm ingrijpen. Data kan opnieuw verstuurd worden en  threads kunnen opnieuw gestart worden.
  • Programmeertaal-onafhankelijk; Hoewel Storm voornamelijk is gemaakt in Clojure, kunnen Storm clusters gestart worden door middel van allerlei programmeertalen.

Basics

Het Storm framework bestaat in de basis uit een aantal abstracties: bolts, spout en topologies vormen het hart van de abstractielaag die Storm biedt. Een spout is een component die data aanbiedt aan de applicatie. Bijvoorbeeld een spout die luistert naar berichten op een ESB, tweets van een twitterfeed leest of data van een websocket ontvangt. Storm zal de spout constant vragen om nieuwe data en wanneer de spout data heeft, wordt deze aan het platform gegeven. Het maken van een spout ziet er als volgt uit:


public class RandomWordFeeder extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private Random random;
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }
    @Override
    public void nextTuple() {
        String[] words = new String[]{
                "accompts", "active",  "altiloquent", "bicuspid",  "biweekly", "buffo", "chattels", "detached", "gaoler", "heeltap",  "milksop",
                "paralyzed", "passado", "reciminate", "repetend", "supertonic", "swashbuckler", "vaporarium", "wenching", "withers"
        };
        collector.emit(new Values(words[random.nextInt(words.length)]));
    }
}

Storm zal nextTuple() aanroepen om de volgende set met data te verwerken. In bovenstaand voorbeeld is het een woord dat willekeurig wordt gekozen uit een lijst van mogelijke woorden. Data in Storm wordt altijd verzonden in de vorm van Tuples. Tuples zijn lijsten van een vaste grootte die objecten bevatten in een bepaalde volgorde. Deze tuples worden vervolgens doorgestuurd naar de bolts. Bolts zijn componenten die data ontvangen, een verwerking doen en optioneel een tuple retourneren. De bedoeling is dat iedere bolt één enkele taak heeft. Bijvoorbeeld om te tellen hoe vaak een bepaald woord wordt genoemd in een twitterfeed. Een volgende bolt kan daarna bijhouden uit welk land de meeste tweets worden gedaan, etc.

Door iedere bolt een specifieke taak te geven wordt een hoge mate van parallellisatie bereikt, daarnaast wordt de herbruikbaarheid van een bolt verhoogd.


public class WordCounterBolt extends BaseBasicBolt {
    private Map counts = new HashMap();
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        collector.emit(new Values(word, count));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

Bovenstaand voorbeeld van een bolt ontvangt de tuples uit eerder genoemde spout in de execute() methode en houdt bij hoe vaak een woord voorbij komt. Vervolgens worden de totalen doorgegeven door collector.emit(…) aan te roepen.

Spouts en bolts worden aan elkaar geknoopt door middel van een topology. Een topology bepaalt de volgorde waarin data wordt verwerkt en bepaalt daarnaast hoeveel threads iedere component hoort te krijgen.


Een voorbeeld van een topology
 

 

Een topology wordt ook beschreven in Java, bijvoorbeeld:


TopologyBuilder builder = new TopologyBuilder();
 
builder.setSpout("wordGenerator", new RandomWordFeeder());
builder.setBolt("counter", new WordCounterBolt()).shuffleGrouping("wordGenerator");
builder.setBolt("jmsBolt", jmsBolt).shuffleGrouping("counter");
 
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());

In productie

Een Storm-cluster bestaat over het algemeen uit meerdere servers. De coördinatie tussen de servers wordt verzorgd door Apache ZooKeeper. Iedere machine in het cluster vervult een taak ten behoeve van het grotere geheel. Er is één machine in het geheel dat een topology interpreteert en ervoor zorgt dat het juiste aantal threads opgestart wordt en er tevens voor zorgt dat werk evenredig wordt verdeeld. Dit is de taak van de ‘Nimbus’ node. Alle andere machines krijgen werk toebedeeld van Nimbus en zorgen voor een correcte afhandeling van het werk. Deze nodes worden ‘Supervisors’ genoemd

 

Hoewel het lijkt alsof Nimbus een single-point-of-failure is, is dat niet het geval. Zowel Nimbus als de Supervisors zijn fail-fast en stateless. Dat betekent dat ieder proces zomaar afgesloten en opnieuw gestart kan worden. De daadwerkelijke state van het systeem is opgeslagen in ZooKeeper of op harde schijf.  Als een topology éénmaal aangeboden is aan Nimbus en deze heeft alle componenten opgestart, dan zal de topology oneindig blijven werken. Tenzij Nimbus natuurlijk de instructie krijgt om de topology te stoppen. Op deze wijze is ook het updaten van het verwerkingsproces te regelen. Het nieuwe algoritme kan gebouwd en getest worden. Wanneer deze klaar is, kan de nieuwe topology aangeboden worden aan Nimbus en kan de oude topology vervolgens worden stopgezet.

Use cases

In eerste instantie is Storm het meest geschikt voor applicaties die behoefte hebben om inkomende data direct te verwerken. Dat is anders dan de traditionele manier van dataverwerking, omdat de data niet eerst wordt opgeslagen in een database. In een real-time applicatie wordt de data direct verwerkt en worden resultaten opgeslagen; dat neemt niet weg dat de bron data  ook nog opgeslagen kan worden.

Bij NaviSite (http://www.navisite.com/) gebruiken ze Storm als onderdeel van hun logging / auditing systeem. Ze versturen logberichten van duizenden servers naar een RabbitMQ cluster. Hier gebruiken ze Storm om ieder bericht te controleren met een aantal reguliere expressies. Als er een match is (minder dan 1% van de gevallen), dan wordt het bericht naar een bolt gestuurd welke bolt de data opslaat in een MongoDB database. Zij hebben ongeveer 5.000 – 10.000 berichten per seconde te verwerken.

Dit is een perfect voorbeeld voor een techniek als Storm, omdat het gaat over veelal kleine berichten die elkaar in hoog tempo opvolgen. Bij het bedrijf 8digits gebruiken ze Storm weer op een andere manier. Zij bekijken het gedrag van klanten op websites en mobiele apps, om zo real-time actie te ondernemen op basis van kenmerken die de eigenaar van de mobiele app of website heeft aangegeven.

Conclusie

Storm neemt het merendeel van de complexiteit weg van real-time distributed applicaties. De ontwikkelaar hoeft niets te regelen om data te routeren naar bepaalde machines of het herverzenden van data als deze niet (helemaal) is verwerkt. Daarbij dwingt Storm de ontwikkelaar min of meer om data steeds in kleine porties te verwerken. Dit kan op zichzelf zorgen voor een betere oplossing, omdat individuele onderdelen van het oplossingsproces geoptimaliseerd worden.

Voor de hardcore softwareontwikkelaars is een techniek als Storm heel erg mooi. Je kan spannende zaken bouwen met Storm waarbij extreem goede prestaties te verwachten zijn. Wat eigenlijk het belangrijkste is bij ieder IT project, maar zeker in het geval van Storm, is dat de juiste oplossing voor het probleem gekozen wordt. Bijna ieder probleem is in principe op te lossen met Storm, maar het is belangrijk dat de kenmerken van het probleem in overeenstemming zijn met de kenmerken van Storm.

Streamer: Gebruikers verwachten steeds vaker dat software direct antwoord geeft of dat data direct beschikbaar is.

Streamer: Storm is een framework dat in de behoefte van de industrie voorziet en ontwikkelaars ontlast van bestaande uitdagingen.