Large scala analytics aggregations with Monoids in Scala

Petr Janda
5 min readOct 30, 2015

--

At GlobalWebIndex we work on several challenging problems, one of which is large scale analytics system behind our new product. From the data processing point of view its very similar to other ad-tech systems. We have an impression log, which gets a new line each time our javascript tag is trigger on any of the campaigns / websites we are tracking. Each line contains cookie id with handful of other attributes.

Aside of more complex analytics requirements, we want to be able to calculate several aggregations — total number of impressions, total unique cookies or average number of impressions per cookie. Even though these are very basic, they are quite expensive to calculate on the scale of hundreds of millions or billions.

For the further elaboration I will use dataset of 39M impressions in the log with about 10M unique UUIDs which in total is about 1.4GB dataset.

Building the aggregations

Lets start with a simple aggregation — number of total impressions — which will help us understand few common aggregation attributes we are looking for . As my target is to build a general solution which can scale to billions I have to make sure my calculations can be made incremental. This means that I should be able to calculate total impressions by day, or by campaign or by any other reasonable partitioning.

It turns out that we can find inspiration in abstract algebra. Its again a mythical Monoid. In order to make my aggregation a Monoid I have to satisfy both axioms: Presence of identity element and Associativity. As it turns out it will work out nicely:

  1. I am able to initialise empty aggregation — total number of elements of empty impression log is 0
  2. I am able to calculate 2 subtotals and then combine them to calculate the final value — this is associative, so the order doesn’t matter

This actually gets even more interesting because the very same Monoid abstraction is valid for whole range of other useful aggregations as we will show later.

Ok lets start some code. I will define a trait Aggregateable:

trait Aggregateable[I, O, S] {
def init: S
def lift(in: I): S def combine(left: S, right: S): S def unwrap(result: S): O
}

It has I, O, S types, which represent our aggregation Input, Output and Storage. New Storage can be created with init, Input can be lifted to Storage with lift(in: I): S, two Storages can be combined with combine(left: S, right: S): S and finally we can get the Output value from Storage by calling unwrap(result: S): O.

This trait defines an interface, which can be implemented by various aggregations which can be later use by the common processing built around them.

Back to our total impressions we can define a Counter:

object Counter extends Aggregateable[Seq[String], Long, Long] {
def init = 0L
def lift(in: Seq[String]): Long = in.size.toLong def combine(left: Long, right: Long): Long = left + right def unwrap(result: Long): Long = result
}

which is quite straightforward.

I will also create a class Aggregator which will take Agregateable and provide several convenient methods on top of it:

class Aggregator[I, O, S]
(agg: Aggregateable[I, O, S], in: Option[S] = None) {
private val underlying: S = in.getOrElse(agg.init)
def add(in: I): Aggregator[I, O, S] =
combine(underlying, agg.lift(in))
def merge(other: this.type) =
combine(underlying, other.underlying)
def result: O =
agg.unwrap(underlying)
private def combine(left: S, right: S): Aggregator[I, O, S] =
new Aggregator(agg, Some(agg.combine(left, right)))
}

and use it as part of my Akka Stream application (but you can use it pretty much everywhere where you can fold over the sequence) to calculate total of UUIDs I will be loading from a file:

val res = Source(() => file.getLines)
.grouped(10000)
.runFold(new Aggregator(Counter)) { _ add _ }

Here I am iterating over lines of a file, grouping them by 10k and running them through our Aggregator which now works as a Counter. The result is 39191737, which is exact count of lines in the file. No surprise.

Lets try to calculate number of unique elements in the list. Here is our Aggregation:

object Unique extends Aggregateable[Seq[String], Long, Set[String]] {
def init = Set.empty[String]

def lift(in: Seq[String]): Set[String] = in.toSet

def combine(left: Set[String], right: Set[String]): Set[String] =
left ++ right

def unwrap(result: Set[String]): Long = result.size
}

using it as follows:

val res = Source(() => file.getLines)
.grouped(10000)
.runFold(new Aggregator(Unique)) { _ add _ }

I can calculate unique count of cookies in my dataset — 9801125 in about 79.8s. Of course the above is rather naive way to go about it as the resource consumption is enormous (we will tackle this problem in the next post).

Powerful combinations

Lets go further, I might want to know what is an average of number of appearances per UUID in our set? We can calculate the average = total /distinct thus I can implement average on top of Counter and Unique:

case class Avg(unique: Set[String], total:Long)object Average extends Aggregateable[Seq[String], Double, Avg] {
def init: Avg = Avg(Unique.init, Counter.init)

def combine(left: Avg, right: Avg): Avg =
Avg(Unique.combine(left.unique, right.unique), Counter.combine(left.total, right.total))

def lift(in: Seq[String]): Avg =
Avg(Unique.lift(in), Counter.lift(in))

def unwrap(result: Avg): Double =
result.unique.size.toDouble / result.total.toDouble
}

Here I’ve created yet another Aggregateable which we can again use very easily in our Akka Stream:

val res = Source(() => file.getLines)
.grouped(10000)
.runFold(new Aggregator(Average)) { _ add _ }

Nice! Soon you will find out how rich Aggregateable abstraction really is. I can for instance go ahead and implement its version reporting on pair of aggregations. Lets have a look at it:

class Pair[I, O, O2, S, S2]
(first: Aggregateable[I, O, S], second: Aggregateable[I, O2, S2])
extends Aggregateable[I, (O, O2), (S, S2)] {
override def init: (S, S2) =
(first.init, second.init)
override def lift(in: I): (S, S2) =
(first.lift(in), second.lift(in))
override def combine(left: (S, S2), right: (S, S2)): (S, S2) =
(first.combine(left._1, right._1), second.combine(left._2, right._2))
override def unwrap(result: (S, S2)): (O, O2) =
(first.unwrap(result._1), second.unwrap(result._2))
}

Voila, now instead of Average I can get both Total and Distinct values from the stream with very little code changes.

val res = Source(() => file.getLines)
.grouped(10000)
.runFold(new Aggregator(new Pair(Counter, HLL)) { _ add _ }

will yield a pair of 2 Longs — (39191737,9810035) which is the total and distinct count. Now, given I can do a pair, there is nothing preventing me to do pair of pairs or countess other aggregations which can be designed around the same abstraction. Handy, isn’t it?

Conclusions

I hope this was a nice exercise demonstrating real world application of power of Monoids. It is a very powerful abstraction which will allow me to calculate lots of other interesting aggregations — min value, max value, top N most occurring elements, etc. There is also nothing dictating I have to work with sequences of Strings, I am able to define various combinations of input / output / storage and all the surrounding code will hold which makes system very flexible for future requirements.

In the next post I will show another version of cardinality aggregation — this time using HyperLogLog approximations and will demonstrate how we can calculate number of unique elements in the set in much more efficient way, which can easily scale to 2⁶⁴ values with KBs of memory.

Sign up to discover human stories that deepen your understanding of the world.

Free

Distraction-free reading. No ads.

Organize your knowledge with lists and highlights.

Tell your story. Find your audience.

Membership

Read member-only stories

Support writers you read most

Earn money for your writing

Listen to audio narrations

Read offline with the Medium app

--

--

Petr Janda
Petr Janda

Written by Petr Janda

Sharing experiences from scaling multiple startups over the past decade. Speaking about products, technology, go-to-market, or culture.

No responses yet

Write a response