Reading book: “Akka Essentials”

I have recently gotten myself a copy of “Akka Essentials” and started going through the code, but writing the same examples in Scala (instead of Java, which the original book uses). This is turning out to be a fairly good exercise, as it forces me to think about what the idiomatic way of writing something in Scala is, rather than just dumping Java code and minimally editing it until everything compiles.

For example here’s a Java snippet from one of the first chapters, which sets things up for a “MapActor”:

package akka.first.app.mapreduce.actors;

import java.util.*;
import java.util.StringTokenizer;
import akka.actor.UntypedActor;
import akka.first.app.mapreduce.messages.MapData;
import akka.first.app.mapreduce.messages.WordCount;

public class MapActor extends UntypedActor {
  String[] STOP_WORDS = {
      "a", "am", "an", "and", "are", "as", "at",
      "be", "do", "go", "if", "in", "is", "it",
      "of", "on", "the", "to" };

  private List<String> STOP_WORDS_LIST =
      Arrays.asList(STOP_WORDS);

  @Override
  public void onReceive(Object message) throws Exception {
    if (message instanceof String) {
      String work = (String) message;
      // map the words in the sentence and send the
      // result to the MasterActor
      getSender().tell(evaluateExpression(work));
    } else
      unhandled(message);
  }

  private MapData evaluateExpression(String line) {
    List<WordCount> dataList = new ArrayList<WordCount>();
    StringTokenizer parser = new StringTokenizer(line);
    while (parser.hasMoreTokens()) {
      String word = parser.nextToken().toLowerCase();
      if (!STOP_WORDS_LIST.contains(word)) {
        dataList.add(new WordCount(word, Integer.valueOf(1)));
      }
    }
    return new MapData(dataList);
  }
}

This MapActor is part of a Map-Reduce-Aggregate set of Akka actors, and its work is relatively simple:

  • It should handle a “String” message, by tokenizing the string and returning a MapData result.
  • The MapData should contain a list of WordCount instances, with every word’s initial count set to 1.
  • The words which are part of the STOP_WORDS list should be skipped when generating the result.

The code is readable in Java too, but as I was writing the equivalent in Scala, I noticed a couple of things that could simplify it even more. One was to use pattern matching instead of instanceof checks in the message-handling code. The other was that the while loop is a bit noisy in this case, and it can be replaced by several more concise alternatives.

Another happy side-effect of writing in Scala was that the code was stripped of all the ‘boilerplate’ Java requires. For example, I didn’t have to do all the crafty stuff about ArrayList and manual conversion to List<String>, since Scala provides a neat and very readable way of initializing a list of strings.
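As a tiny illustration of that point (a standalone snippet, not part of the actor code), initializing and filtering an immutable list in Scala takes just a couple of expressions:

```scala
// The whole stop-word list is a single expression: no array literal,
// no Arrays.asList conversion step as in the Java version.
val stopWords: List[String] = List("a", "an", "the", "to")

// Filtering against it is equally direct.
val kept = List("the", "quick", "fox").filter(w => !stopWords.contains(w))
println(kept) // List(quick, fox)
```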

I started with something like this:

package akka.first.app.mapreduce.actors

import akka.first.app.mapreduce.messages.MapData
import akka.actor.UntypedActor

class MapActor extends UntypedActor {
  override def onReceive(message: Any) =
    message match {
      case (work: String) => getSender ! evaluateExpression(work)
      case _              => unhandled(message)
    }

  /** Defines a list of words which are never counted. */
  private val STOP_WORDS: List[String] = List(
    "a", "am", "an", "and", "are", "as", "at", "be","do",
    "go", "if", "in", "is", "it", "of", "on", "the", "to")

  /** Evaluates a sentence, removes non-stop words, and returns
    * the result as a new `MapData` instance.
    */
  private def evaluateExpression(text: String): MapData =
    new MapData(Nil)
}

This doesn’t do anything useful in evaluateExpression yet, but it’s already almost there, and even with the Scaladoc comment lines included, the total is about half the size of the Java code.

Now, I experimented with two alternative approaches to tokenizing the input text. The first uses the simplest code possible:

  private def evaluateExpression(text: String): MapData =
    new MapData(text.split("\\s").map{_.toLowerCase}
      .filter(w => !STOP_WORDS.contains(w))
      .map(w => new WordCount(w, 1)))

This version should work fine, and the Scala version of the MapActor is still very readable code:

package akka.first.app.mapreduce.actors

import akka.first.app.mapreduce.messages.MapData
import akka.actor.UntypedActor

class MapActor extends UntypedActor {
  override def onReceive(message: Any) =
    message match {
      case (work: String) => getSender ! evaluateExpression(work)
      case _              => unhandled(message)
    }

  /** Defines a list of words which are never counted. */
  private val STOP_WORDS: List[String] = List(
    "a", "am", "an", "and", "are", "as", "at", "be","do",
    "go", "if", "in", "is", "it", "of", "on", "the", "to")

  /** Evaluates a sentence, removes non-stop words, and returns
    * the result as a new `MapData` instance.
    */
  private def evaluateExpression(text: String): MapData =
    new MapData(text.split("\\s").map{_.toLowerCase}
      .filter(w => !STOP_WORDS.contains(w))
      .map(w => new WordCount(w, 1)))
}
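To make the split/map/filter/map chain concrete, here is the same pipeline applied to a sample sentence outside the actor; the STOP_WORDS list is repeated here, and WordCount is replaced by a simplified stand-in case class for illustration:

```scala
// Simplified stand-in for the book's WordCount message class.
case class WordCount(word: String, count: Int)

val STOP_WORDS = List("a", "am", "an", "and", "are", "as", "at",
  "be", "do", "go", "if", "in", "is", "it", "of", "on", "the", "to")

val result = "The Quick fox is on the run"
  .split("\\s")                          // Array(The, Quick, fox, is, on, the, run)
  .map(_.toLowerCase)                    // lower-case every token
  .filter(w => !STOP_WORDS.contains(w))  // drops "is", "on" and both "the"s
  .map(w => WordCount(w, 1))             // every word starts with a count of 1
  .toList

println(result) // List(WordCount(quick,1), WordCount(fox,1), WordCount(run,1))
```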

Another version of evaluateExpression, which I wrote to play around with Scala and stream-generating functions, was the following:

import java.util.StringTokenizer

def tokens(parser: StringTokenizer): Stream[String] =
  parser.hasMoreTokens match {
    case false => Stream.empty
    case true  => Stream.cons(
      parser.nextToken.toLowerCase, tokens(parser))
  }

def evaluateExpression(text: String): MapData = {
  val parser = new StringTokenizer(text)
  val words = tokens(parser)
  MapData(words.filter(w => !STOP_WORDS.contains(w))
          .map(w => WordCount(w, 1)).toList)
}

One important difference in this version is that it doesn’t pre-generate the entire list of words in memory, so it may be a better option for extremely large input strings. It already shows a difference, e.g. in this REPL session:

scala> import java.util.StringTokenizer
import java.util.StringTokenizer

scala> def tokens(parser: StringTokenizer): Stream[String] =
     |     parser.hasMoreTokens match {
     |       case false => Stream.empty
     |       case true  => Stream.cons(parser.nextToken.toLowerCase, tokens(parser))
     |     }
tokens: (parser: java.util.StringTokenizer)Stream[String]

scala> def time[R](block: => R): R = {
     |     val t0 = System.nanoTime()
     |     val result = block    // call-by-name
     |     val t1 = System.nanoTime()
     |     println("Elapsed time: " + (t1 - t0) + "ns")
     |     result
     | }
time: [R](block: => R)R

scala> val large = "foo " * 500000
large: String = "foo foo foo foo foo foo foo foo foo ...

scala> time { tokens(new StringTokenizer(large)).size }
Elapsed time: 92060000ns
res1: Int = 500000

scala> time { large.split("\\s").size }
Elapsed time: 126982000ns
res2: Int = 500000

Note how the second version, which uses plain split(), is about 38% slower in this particular run, even though we are only counting the number of words in each token list [126982000.0 / 92060000 =~ 1.379]. A single REPL timing is only a rough indication, of course, but it matches the expectation that the stream version avoids building the intermediate array of tokens.
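The memory point is easier to see by consuming only part of the stream: with the same `tokens` function, taking a prefix pulls just that many tokens from the parser and leaves the rest of the input untouched. (Recent Scala versions deprecate `Stream` in favour of the fully lazy `LazyList`, but the idea is the same.)

```scala
import java.util.StringTokenizer

def tokens(parser: StringTokenizer): Stream[String] =
  if (parser.hasMoreTokens)
    Stream.cons(parser.nextToken.toLowerCase, tokens(parser))
  else Stream.empty

val parser = new StringTokenizer("one two three four five")

// Taking two elements consumes exactly two tokens from the parser;
// the tail of the stream stays unevaluated.
val firstTwo = tokens(parser).take(2).toList
println(firstTwo)           // List(one, two)
println(parser.countTokens) // 3 -- three tokens were never read
```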

What I got from doing this rewrite in Scala, though, was something entirely different: an appreciation for the features of Scala that let me write compact, concise, yet very readable and clean code, like this final version of the MapActor:

package akka.first.app.mapreduce.actors

import akka.first.app.mapreduce.messages.MapData
import akka.first.app.mapreduce.messages.WordCount
import akka.actor.UntypedActor

/** Parses a `String` into whitespace-separated words, skips
  * over "stop words" and constructs a MapData message with
  * the resulting words and their counts initialized to 1.
  */
class MapActor extends UntypedActor {
  override def onReceive(message: Any) =
    message match {
      case (work: String) => getSender ! evaluateExpression(work)
      case _              => unhandled(message)
    }

  /** Defines a list of words which are never counted. */
  private val STOP_WORDS: List[String] = List(
    "a", "am", "an", "and", "are", "as", "at", "be","do",
    "go", "if", "in", "is", "it", "of", "on", "the", "to")

  /** Evaluates a sentence, removes non-stop words, and returns
    * the result as a new `MapData` instance.
    */
  private def evaluateExpression(text: String): MapData =
    MapData(text.split("\\s")
      .map(_.toLowerCase)
      .filter(w => !STOP_WORDS.contains(w))
      .map(w => WordCount(w, 1)).toList)
}