Mood Bot – a Serverless Slack Integration

Pull me on GitHub!

Mood Bot

So it’s been a tradition in my office to use Slack to gauge the team’s mood once a week. Previously our PM would post a message asking for feedback and people would add a reaction to show how they were feeling. This worked fine, though there were a couple of issues: firstly it was pretty hard to interpret the weird collection of party parrots and doges, and secondly people tend to follow the herd when they can see how others have reacted.

Here’s my idea for the new, automated workflow…

Screenshot 2017-05-03 12.20.48

By far the cheapest and maybe the simplest way to host all of the code to do this is to “go serverless” using many of the cool features available on AWS to host code, databases and APIs on a pay-per-use basis.  Here’t the technical architecture…

MoodBot

Based on the above there are three broad areas for development: send the webhook to Slack; deal with responses when users click the buttons and serve up a chart showing the results for the week.

Sending the Web Hook

Slack allows you to post Interactive Messages using an Incoming Webhook. In order to do this you’ll need to add a new slack bot integration using their very friendly web UI. I called mine “MoodBot”. Once you have a bot set up, you need to enable “Incoming Webhooks” and add the target URL to an environment variable (see here or here for more details).

The format of the message you send needs to be something like the following.  Note that the “interactive” part of the message is included as an attachment.

const message = {
  "text": ":thermometer: @channel *Time for a Team Temp Check!* @channel :thermometer: \n _Click as many times as you like, only your last vote will be counted._",
  "channel": "@laurence.hubbard",
  "attachments": [
    {
      "text": "How are you feeling this week?",
      "fallback": "I am unable to understand your feelings. Too deep maybe?",
      "callback_id": "mood_survey",
      "color": "#3AA3E3",
      "actions": [
        {
          "name": "mood",
          "text": "Good :+1:",
          "type": "button",
          "value": "good"
        },
        {
          "name": "mood",
          "text": "Meh :neutral_face:",
          "type": "button",
          "value": "meh"
        },
        {
          "name": "mood",
          "text": "Bad :-1:",
          "type": "button",
          "value": "bad"
        },
        {
          "name": "mood",
          "text": "Terrible :rage:",
          "type": "button",
          "value": "terrible"
        },
        {
          "name": "mood",
          "text": "AWESOME!!!   :doge:",
          "type": "button",
          "value": "awesome"
        }
      ]
    }
  ]
}

This gives you a slack message looking like this:

The webhook is sent by a Lambda function, which is triggered crontab-style by a CloudWatch event rule.  The Lambda looks like this:

const AWS = require('aws-sdk');
const url = require('url');
const https = require('https');

const kmsEncryptedHookUrl = process.env.kmsEncryptedHookUrl;
let hookUrl;

function postMessage(inputData, callback) {
    const body = JSON.stringify(message);
    const options = url.parse(hookUrl);
    options.method = 'POST';
    options.headers = {
        'Content-Type': 'application/json',
        'Content-Length': Buffer.byteLength(body),
    };

    const postReq = https.request(options, (res) => {
        const chunks = [];
        res.setEncoding('utf8');
        res.on('data', (chunk) => chunks.push(chunk));
        res.on('end', () => {
            if (callback) {
                callback({
                    body: chunks.join(''),
                    statusCode: res.statusCode,
                    statusMessage: res.statusMessage,
                });
            }
        });
        return res;
    });

    postReq.write(body);
    postReq.end();
}

function processEvent(slackMessage, callback) {
    slackMessage.channel = slackChannel;
    
    postMessage(slackMessage, (response) => {
        if (response.statusCode < 400) {
            console.info('Message posted successfully');
            callback(null);
        } else if (response.statusCode < 500) { console.error(`Error posting message to Slack API: ${response.statusCode} - ${response.statusMessage}`); callback(null); // Don't retry because the error is due to a problem with the request } else { // Let Lambda retry callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); } }); } exports.handler = (event, context, callback) => {
    console.log("Sending a temp check request")
    
    if (hookUrl) {
        // Container reuse, simply process with the key in memory
        processEvent(event, callback);
    } else if (kmsEncryptedHookUrl && kmsEncryptedHookUrl !== '<kmsEncryptedHookUrl>') {
        const encryptedBuf = new Buffer(kmsEncryptedHookUrl, 'base64');
        const cipherText = { CiphertextBlob: encryptedBuf };

        const kms = new AWS.KMS();
        kms.decrypt(cipherText, (err, data) => {
            if (err) {
                console.log('Decrypt error:', err);
                return callback(err);
            }
            hookUrl = `https://${data.Plaintext.toString('ascii')}`;
            processEvent(event, callback);
        });
    } else {
        callback('Hook URL has not been set.');
    }
};

Setting up the rule to trigger the event is pretty simple. Log into the AWS console, select CloudWatch and choose Events -> Rules from the menu on the left. You can specify when the rule will run using a crontab line.  I used…

0 09 ? * WED *

Which will run at 9am (GMT) every Wednesday.  All this is set up via a reasonably clunky web interface!

Collating Responses

This is the most complicated bit (and there’s an extra tricky bit to deal with too). To handle the responses when users click buttons on the interactive Slack message you need four things: 1. A lambda function to handle the POST request and push data to a database, 2. an API Gateway resource to provide an HTTP end-point, translate the request and forward it to the Lambda function, 3. a database to store the data and finally 4. a config setting in Slack to tell it where to send the POST.

Here’s the code for my Lambda function. It’s simple enough – it just takes the JSON in the incoming request, grabs the bits it wants and adds a few dates and times to create another JSON object to post to DynamoDB. The response sent back to slack is a replacement message, which will overwrite the one already in the channel. Here I add a list of users who have clicked so far (a better man would have pulled this list from the DB!).

var AWS = require('aws-sdk');

var dynamo = new AWS.DynamoDB.DocumentClient();
const table = "MoodResponses";

function updateVoters(original, voter) {
    var updated = original;
    
    var msg = "\nVoted so far: ";
    var comma = true;
    if(!updated.includes(msg)) {
        updated = updated + msg;
        comma = false;
    }
    
    if(!updated.includes(voter)) {
        if(comma) {
            updated = updated + ", ";
        }
        
        updated = updated + "<@" + voter + ">";
    }

    return updated;
}

Date.prototype.getWeek = function() {
        var onejan = new Date(this.getFullYear(), 0, 1);
        return Math.ceil((((this - onejan) / 86400000) + onejan.getDay() + 1) / 7);
    }

exports.handler = function(event, context, callback) {
    
    console.log('Received Slack Message: ', JSON.stringify(event, null, 2));
    
    var mood = event.actions[0].value;
    var date = new Date(Number(event.message_ts) * 1000);
    var key = event.user.id + "@" + date.getFullYear() + "-" + date.getWeek();
    var record = {
        TableName: table,
        Item: {
            key: key,
            message_ts: Number(event.message_ts),
            username: event.user.name,
            user_id: event.user.id,
            mood: mood,
            date_string: date.toISOString(),
            day: date.getDate(),
            month: (date.getMonth() + 1),
            week: date.getWeek(),
            year: date.getFullYear()
        }
    };
    
    console.log("Created mood record: " + JSON.stringify(record, null, 2));

    dynamo.put(record, function(err, data) {
        if (err) {
            console.error("Unable to add item. Error JSON:", JSON.stringify(err, null, 2));
                
            callback(null, {
                  text: "An error occurred inserting to DynamoDB. Error attached.",
                  attachments: [{text: JSON.stringify(err, null, 2)}],
                  replace_original: false
                });
        } else {
            console.log("Added item:", JSON.stringify(record, null, 2));
            
            callback(null, {
                  text: updateVoters(event.original_message.text, event.user.id),
                  attachments: event.original_message.attachments,
                  replace_original: true
                });
        }
    });
};

Setting up the API Gateway (The Extra Tricky Bit)

Setting up the API Gateway should be simple enough – you add a new API then a new resource then a new POST method. Then configure the method to forward requests to the Lambda function you just created. However, there are a couple of issues.

Firstly, you need to enable cross site access (CORS) which is easy enough – you just select “Enable CORS” from the “Actions” dropdown. This will open your method up to calls from other sites.

The second and far more infuriating issue is that Slack’s Interactive Buttons send the data in a funky way, encoding it weirdly in the message body rather than just posting JSON as all the other calls do.  After a couple of days of intermittent head-scratching I finally found this Gist, which contains the code to fix the problem:

This code needs to be placed into a Body Mapping Template for your POST method within the AWS API Gateway UI. The following screenshot hopefully give you enough of a clue on how to set this up.  Now, when Slack sends the malformed (IMHO) POST, the API gateway will reformat it and pass it through to your lambda function as if it were a normal JSON payload.

Screenshot 2017-04-24 18.45.47

Database Setup

I decided to use DynamoDB – Amazon’s “Document Database as a Service” (DDaaS?). I’m not sure it’s the perfect choice for this work, since querying is pretty limited, but it is very cheap and incredibly simple to use.

For this step, just use the web UI to create a new table called “MoodResponses”. I used an “id” field as the index.  The lambda creates “id” by concatenating the user ID and current week. This means you automatically limit each user to a single vote per week, which is exactly the functionality I was looking for – more or less for free!

Slack Request URL

Final step is very simple – use the Slack admin UI for your bot to add the address of your API resource as the target for interactive message callbacks.  Go to the admin page and select Features -> Interactive Messages from the panel on the left and paste in the URL of your API Gateway method.

Displaying Results

Though there are more boxes on the diagram below, this is actually the easiest step by far. We serve up a simple D3js “single page app” direct from S3 as static content. This SPA page calls a GET method on the REST service we created above which in turn calls a Lambda function. The Lambda hits out database, pulls out the results and sends them back as a JSON payload.

There’s not much more to explain, so I’ll just link to a Fiddle which includes the code for my front end – this one actually hits my production database, so you’ll be able to see how my team feel!

Serving this code up as a static HTML file is very easy: Create an index.html document and add the javascript, HTML and CSS from the fiddle; create a new S3 bucket and, in the properties for the bucket, enable “Static Website Hosting”; upload your index.html file to the bucket, select it and select “Make Public” from the “Actions” dropdown.

Here’s the code for the Lambda function which is servicing the GET request:

var AWS = require("aws-sdk");

var docClient = new AWS.DynamoDB.DocumentClient();

Array.prototype.countBy = function(key) {
  return this.reduce(function(rv, x) {
    rv[x[key]] = (rv[x[key]] || 0) + 1;
    return rv;
  }, {});
};

Date.prototype.getWeek = function() {
    var onejan = new Date(this.getFullYear(), 0, 1);
    return Math.ceil((((this - onejan) / 86400000) + onejan.getDay() + 1) / 7);
};

Date.prototype.previousWeek = function() {
    return new Date(this.getTime() - (7 * 24 * 60 * 60 * 1000));
};

function forWeek(week) {
    return {
        TableName : "MoodResponses",
        IndexName: 'week-user_id-index',
        KeyConditionExpression: "#wk = :week",
        ExpressionAttributeNames:{
            "#wk": "week"
        },
        ExpressionAttributeValues: {
            ":week":week
        }
    };
}

function handleError(err, callback) {
    console.error("Unable to query. Error:", JSON.stringify(err, null, 2));
    callback(null, {"error": JSON.stringify(err, null, 2)});
}

function handleData(data, callback, week) {
    console.log("Query succeeded.");
            
    var results = {
        week: week,
        moods: data.Items.countBy("mood")
    };

    callback(null, results);
}

function runFor(date, tries, callback) {
    var week = date.getWeek();    
    console.log('Fetching mood results for week: ' + week);
    
    docClient.query(forWeek(week), function(err, data) {
        if (err) {
            handleError(err, callback);
        } else if(data.Items.length > 0 || tries <= 0) {
            handleData(data, callback, week);
        } else {
            runFor(date.previousWeek(), tries - 1, callback);
        }
    });
}

exports.handler = function(event, context, callback) {
    runFor(new Date(), 1, callback);
};

One Last Thing!

Dynamo can only query against fields which are part of an index. Here we need to query by week number, so I added a new index to my Dynamo table by week. This took 5 minutes to update (even though the table only had 5 records in it at the time!) but was simple enough to do.  If you look at the code above, you can see where I specify the index in the query parameters.

Wrap Up

So this all works really well.  There’s lots left to do: making the results look prettier and looking at how the sourcecode is deployed and managed being two things at the top of my list.

Slack is the industry standard tool for team collaboration these days, and bots and integrations amp up the power and productivity at your disposal. Build status, Jira tickets, team morale, coffee orders and whatever else you fancy can all be brought together with conversational APIs, simplifying just about everything.

On the AWS side, there’s still a lot of googling required to build this sort of thing, and sometimes information is scarce. Those who enjoy building “proper applications” using IDEs like IntelliJ or Visual Studio are going to find this frustrating – the pieces feel disjoint and uncontrolled sometimes.  However, all in all it’s pretty cool what you can do without a single server in the mix.

It’s hard to deny that this development model is going to be the de-facto standard within the next couple of years, as it’s just so damned quick and simple. So get out and get serverless!

Amazon Athena – First Look

Amazon recently launched Athena – their answer to Google’s Big Query. It’s basically an SQL interpreter which runs over files in S3.  It reminds me of Apache Drill, but people round the office say it looks more like Hive.

AWS Athena is in no way associated with the ancient goddess of wisdom. Any similarity is purely coincidental.

The barrier to entry is very low. Upload the data files (CSV, Parquet and JSON are supported, amongst others), define a table, run a query. All this is done using a simple query editor.

Quick “Hello World”

To test Athena I uploaded some Parquet files, containing data from the open house price dataset to an S3 bucket (I had wanted to load the CSV files “as is” but due to limitations in the CSV reader I couldn’t). I then declared a table like so:

CREATE EXTERNAL TABLE IF NOT EXISTS house_prices.price_paid (
  `id` string,
  `price` int,
  `date` string,
  `postcode` string,
  `property_type` string,
  `old_or_new` string,
  `tenure_duration` string,
  `address1` string,
  `address2` string,
  `street` string,
  `locality` string,
  `town` string,
  `district` string,
  `county` string,
  `ppd_category` string,
  `record_status` string,
  `month` string 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://logicalgenetics.data/price-paid/'

And a few seconds later we’re ready to go:

select town, avg(price) as price 
from house_prices.price_paid 
group by town 
order by price desc
1	GATWICK	2683329.6666666665
2	THORNHILL	985000.0
3	VIRGINIA WATER	741140.2347652348
4	CHALFONT ST GILES	731333.515394913
5	COBHAM	610556.8430019713
6	BEACONSFIELD	587652.6552173913
7	KESTON	584417.7181571815
8	ESHER	551595.5002180074
9	GERRARDS CROSS	513740.5765843979
10	ASCOT	461468.9531164819

Good Stuff

The ease of setup in simple cases makes this technology very lightweight. If you already have data in S3, you can just start using Athena straight away. It’s perfect for ad-hoc querying, sanity checking and QA/test activities.

Athena uses a “server less” model – you pay for the rows you scan – no need to set up a cluster etc. At the time of writing, it’s something like $5 per 1TB of data scanned. As with everything on AWS, this is bearable but not exactly cheap.

Not Good Stuff

At the time of writing, Athena is very new. There are many missing features at the moment, which I hope Amazon will be adding in future.

Firstly, CSV read is limited to pure comma-separated data. Quotes are not supported. This is painfully annoying, as almost all CSV data has quotes around string fields. If you have to transform existing CSV data to remove quotes, the cost is going to outweigh any benefit you might have got from doing the direct queries.

The other annoyance to me is the lack of options for saving data back to S3. select into and create as select style statements are not (yet) supported. This breaks a key use-case for me: the ability to do one-off transforms of legacy or 3rd party data to new file formats. Wouldn’t it be nice to take a CSV file, uploaded by a 3rd party, change a few field names, transform to parquet (or JSON or whatever) and save back into your data warehouse? Yes it would. But you can’t. Sorry.

Conclusion

Athena is pretty good if you want a simple tool for doing basic ad-hoc querying over data stored in S3 – provided that data is in a compatible format.

Sadly though, Athena is just not ready for the big time, as yet. With the addition of support for more data formats and the ability to save data back to S3, it could be an incredibly useful tool, but right now I could count the number of use-cases on one hand.

One to watch!

Evolutionary Algorithm: Playable Demo

Here I’m combining a bit of visualisation with my other favourite subject – the Evolutionary Algorithm (or Genetic Algorithm if you prefer).  I’m not going to write anything about the properties of the algorithm – you can just play with the controls below the chart and see how the different settings effect its ability to find a good solution, adapt to changes and explore the problem space.

The problem: Find a value of x which maximises the value of y. The function is a set of sinusoidal waves of varying frequency and amplitude. The blue line shows the “fitness” for each value of x.

Basically, a population of different solutions is maintained – in this case, each solution is simply a value for x. Every individual has a fitness which can be calculated based on it’s value. Each iteration (100ms here) a solution is removed from the population – killed by selective pressure. Fitter individuals have a greater chance at surviving, less fit individuals have a less of a chance.

A replacement solution is “bred” each iteration, to replace the solution killed-off by selective pressure. This new individual is generated by combining the “genetic material” of one or more parents. In this case, just by taking the x value of a single parent. Importantly, a mutation is applied to the new solution – this is key to exploring the problem space effectively.

And that’s all there is to an Evolutionary Algorithm – it’s just a way of finding the right combination of input variables to maximise some arbitrarily complex fitness function. It does this through a guided random search.

screenshot-2016-11-11-17-25-56

Conway’s Game of Life

For some reason, I have started playing with D3.js a lot. Not sure why – maybe it’s just because I managed to get it integrated with WordPress. Anyway, I recently knocked up this version of Conway’s Game of Life. Most of the clever stuff is stolen from this snippet on bl.ocks.org but I made a few subtle changes to the code and the visual style.

I don’t take credit for much here – I just wanted to record that I’d spent a couple of train journeys building something cool!

screenshot-2016-11-09-18-39-39

Well travelled or just plain old?

A friend of mine has always said that young cars with high mileage are better than old cars with low mileage. The theory being that company cars, which have spent their time cruising on the motorways, have had a much easier life than their stay-at-home cousins who’ve done short hops around town and sat on their driveways seizing up.

So I pointed some very simple Spark queries at the UK government’s open MOT data to see what I could find (you can read about the last time I did this here). First factoid to note is that both mileage and age are relevant when it comes to predicting pass rates. The following two charts show pass rate vs mileage and age.

Pass Rate vs Mileage

Pass Rate vs Age

To look at all three variables together I created the following chart which shows shows age on on the x axis and mileage on the y. Pass rate is a colour scale with red being the worst and green the best. Green squares show combinations of mileage and age at which vehicles are more likely to pass their MOT on the first attempt. Red squares show combinations where a first-try failure is likely.

Pass Rate vs Mileage and Age

There is some truth to my mate’s theory – at least if this chart is to be believed – the pass rate for 3-5 year old cars looks pretty good even at very high mileages. Looking horizontally for very-low-mileage cars of increasing age there seems to be something quite odd going on for vehicles on less than 20k miles. For the 20k-40k range there does seem to be a green stripe across the ages, but it is not as apparent as it’s vertical counterpart.

So should we all be buying a four-year-old car with 180k miles on the clock? Well, no. At least not if we want to keep it for more than a year or two. Cars with high mileages on the clock go into the red much earlier than those with low mileage (based on the fact that vehicles can only move right and up through the chart as they get older and drive further).

Pass Rate vs Mileage and Age… to the MAX

That last chart shows the same heat-matrix view, but to the full extents of the data. There are some interesting facts hidden in that chart… but I’ll leave them as an exercise for the reader!144

UPDATE: Proper Stats:

So it turns out that calculating correlation and covariance with Spark is pretty easy. Here’s the results and the code:

For cars < 20 years and < 250,000 miles
cov(testMileage, pass) = -3615.011
corr(testMileage, pass) = -0.195
cov(age, pass) = -0.401
corr(age, pass) = -0.235
For all data
cov(testMileage, pass) = -3680.0456
corr(testMileage, pass) = -0.177
cov(age, pass) = -0.383
corr(age, pass) = -0.152

Looking at cars in the “normal” range (i.e. less than 20 years old and less than 250k miles) there’s a stronger correlation between age and pass rate than between mileage and pass rate. Interestingly, looking over the full range of the data this relationship is inverted, with mileage being very slightly better.  There’s little to separate the two as a predictor for pass or fail – not least because age and mileage are largely dependant on each other (with a correlation of 0.277 across all data).

Basic statistical functions are available under DataFrame.stat. See the calls hidden in the println lines below:

  it should "calculate covariance and correlation for normal cars" in {
    val motTests = Spark.sqlContext.read.parquet(parquetData).toDF()
    motTests.registerTempTable("mot_tests")

    val df = motTests
      .filter("testClass like '4%'") // Cars, not buses, bikes etc
      .filter("testType = 'N'") // only interested in the first test
      .filter("age &amp;lt;= 20")
      .filter("testMileage &amp;lt;= 250000")
      .withColumn("pass", passCodeToInt(col("testResult")))

    println("For cars &amp;lt; 20 years and &amp;lt; 250,000 miles")
    println(s"cov(testMileage, pass) = ${df.stat.cov("testMileage", "pass")}")
    println(s"corr(testMileage, pass) = ${df.stat.corr("testMileage", "pass")}")

    println(s"cov(age, pass) = ${df.stat.cov("age", "pass")}")
    println(s"corr(age, pass) = ${df.stat.corr("age", "pass")}")
  }

Behaviour Driven Spark

Spark is a big deal these days, people are using this for all sorts of exciting data wrangling. There’s a huge trend for ease of use within the Spark community and with tools like Apache Zeppelin coming onto the scene the barrier to entry is very low. This is all good stuff: open source projects live and die in the first half an hour of use. New users need to get something cool working quickly or they’ll get bored and wander off…

But for those of us who got past Hello World some time ago and are now using Spark as the basis of large and important projects there’s also the chance to do things right. In fact, since Spark is based on a proper language (Scala, not R or python please!) it’s a great chance to bring some well established best practices into a world where uncontrolled script hackers have held sway for too long!

Check out the source for this article on my GitHub: https://github.com/DanteLore/bdd-spark

cucumber

Behaviour Driven Development, or BDD, is a bit like unit testing. Like unit testing done by an experienced master craftsman. On the surface they look the same – you write some “test” code which calls your production code with known inputs and checks the outputs are what you want them to be. It can be run from your IDE and automated in your CI build because it uses the same runner as your unit tests under the hood.

For me, TDD and BDD differ in these two critical ways: BDD tests at the right level; because you’re writing “Specifications” in pseudo-English not “Tests” in code you feel less inclined to test every function of every class. You test at the external touch-points of your app (load this data, write to this table, show this on the UI), which makes your tests less brittle and more business oriented. Which leads to the second difference: BDD specs are written in Cucumber, a language easily accessible to less techie folks like testers, product owners and stakeholders. Because Cucumber expresses business concepts in near-natural language, even your Sales team have a fighting chance of understanding it… well, maybe.

Project Setup

Before we can crack on and write some Cucumber, there is some setup to be done in the project. I am using IntelliJ, but these steps should work for command line SBT also.

First job, get build.sbt set up for Spark and BDD:

name := "spark-bdd-example"

version := "1.0"
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "log4j" % "log4j" % "1.2.14",
  "org.apache.spark" %% "spark-core" % "1.6.0",
  "org.apache.spark" %% "spark-sql" % "1.6.0",
  "org.apache.spark" %% "spark-mllib" % "1.6.0",
  "org.json4s" %% "json4s-jackson" % "3.2.7",
  "info.cukes" % "cucumber-core" % "1.2.4" % "test",
  "info.cukes" %% "cucumber-scala" % "1.2.4" % "test",
  "info.cukes" % "cucumber-jvm" % "1.2.4" % "test",
  "info.cukes" % "cucumber-junit" % "1.2.4" % "test",
  "junit" % "junit" % "4.12" % "test",
  "org.scalatest" %% "scalatest" % "2.2.4" % "test"
)

For this example I am wrapping Spark up in an object to make it globally available and save me mocking it out “properly”. In a production app, where you need tighter control of the options you pass to spark, you might want to mock it out and write a “Given” to spin Spark up. Here’s my simple object in Spark.scala:

object Spark {
  val conf = new SparkConf()
    .setAppName("BDD Test")
    .setMaster("local[8]")
    .set("spark.default.parallelism", "8")
    .set("spark.sql.shuffle.partitions", "8")

  val sc = new SparkContext(conf)
  LogManager.getRootLogger.setLevel(Level.ERROR)

  val sqlContext = new SQLContext(Spark.sc)
  sqlContext.setConf("spark.sql.shuffle.partitions", "8")
}

If using IntelliJ, like me, you’ll also need a test class to run your cucumber. Mine’s in Runtests.scala. Right click on this and select “Run tests” from the context menu and it’ll run the tests.

@RunWith(classOf[Cucumber])
class RunTests extends {
}

If using the command line, add this line to project/plugins.sbt:

addSbtPlugin("com.waioeka.sbt" % "cucumber-plugin" % "0.0.3")

And these to build.sbt:

enablePlugins(CucumberPlugin)
CucumberPlugin.glue := ""

First Very Simple Example

Here’s the first bit of actual cucumber. We’re using it for a contrived word-counting example here. The file starts with some furniture, defining the name of the Feature and some information on it’s purpose, usually in the format In order to achieve some business aim, As the user or beneficiary of the feature, I want some feature.

Feature: Basic Spark

  In order to prove you can do simple BDD with spark
  As a developer
  I want some spark tests

  Scenario: Count some words with an RDD
    When I count the words in "the complete works of Shakespeare"
    Then the number of words is '5'

The rest of the file is devoted to a series of Scenarios, these are the important bits. Each scenario should test a very specific behaviour, there’s no limit to the number of scenarios you can define, so take the opportunity to keep them focussed. As well as a descriptive name, each scenario is made of a number of steps. Steps can be Givens, Whens or Thens.

  • Given some precondition“: pre-test setup. Stuff like creating a mock filesystem object, setting up a dummy web server or initialising the Spark context
  • When some action“: call the function you’re testing; make the REST call, whatever
  • Then some test“: test the result is what you expected

Step Definitions

Each step is bound up to a method as shown in the “Steps” class below. When the feature file is “executed” the function bound to each step is executed. You can pass parameters to steps as shown here with the input string and the expected number of words. You can re-use steps in as many scenarios and features as you like. Note that the binding between steps and their corresponding functions is done with regular expressions.

class SparkSteps extends ScalaDsl with EN with Matchers {
  When("""^I count the words in "([^"]*)"$"""){ (input:String) =>
    Context.result = Spark.sc.parallelize(input.split(' ')).count().toInt
  }

  Then("""^the number of words is '(\d+)'$"""){ (expected:Int) =>
    Context.result shouldEqual expected
  }
}

The Context

The Context object here is used to store things… any variables needed by the steps. You could use private fields on the step classes to achieve this, but you’d quickly encounter problems when you began to define steps over multiple classes.

object Context {
  var result = 0
}

I don’t particularly like using a Context object like this, as it relies on having vars, which isn’t nice. If you know a better way, please do let me know via the comments box below!

Data Tables

So the word counting example above shows how we can do BDD with spark – we pass in some data and check the result. Great! But it’s not very real. The following example uses Spark DataFrames and Cucumber DataTables to do something a bit more realistic:

  Scenario: Joining data from two data frames to create a new data frame of results
    Given a table of data in a temp table called "housePrices"
      | Price:Int  | Postcode:String | HouseType:String |
      | 318000     | NN9 6LS         | D                |
      | 137000     | NN3 8HJ         | T                |
      | 180000     | NN14 6TN        | S                |
      | 249000     | NN14 6TN        | D                |
    And a table of data in a temp table called "postcodes"
      | Postcode:String | Latitude:Double | Longitude:Double |
      | NN9 6LS         | 51.1            | -1.2             |
      | NN3 8HJ         | 51.2            | -1.1             |
      | NN14 6TN        | 51.3            | -1.0             |
    When I join the data
    Then the data in temp table "results" is
      | Price:Int  | Postcode:String | HouseType:String | Latitude:Double | Longitude:Double |
      | 318000     | NN9 6LS         | D                | 51.1            | -1.2             |
      | 137000     | NN3 8HJ         | T                | 51.2            | -1.1             |
      | 180000     | NN14 6TN        | S                | 51.3            | -1.0             |
      | 249000     | NN14 6TN        | D                | 51.3            | -1.0             |

You only need to write the code to translate the data tables defined in your cucumber to data frames once. Here’s my version:

class ComplexSparkSteps extends ScalaDsl with EN with Matchers {
  def dataTableToDataFrame(data: DataTable): DataFrame = {
    val fieldSpec = data
      .topCells()
      .map(_.split(':'))
      .map(splits => (splits(0), splits(1).toLowerCase))
      .map {
        case (name, "string") => (name, DataTypes.StringType)
        case (name, "double") => (name, DataTypes.DoubleType)
        case (name, "int") => (name, DataTypes.IntegerType)
        case (name, "integer") => (name, DataTypes.IntegerType)
        case (name, "long") => (name, DataTypes.LongType)
        case (name, "boolean") => (name, DataTypes.BooleanType)
        case (name, "bool") => (name, DataTypes.BooleanType)
        case (name, _) => (name, DataTypes.StringType)
      }

    val schema = StructType(
      fieldSpec
        .map { case (name, dataType) =>
          StructField(name, dataType, nullable = false)
        }
    )

    val rows = data
      .asMaps(classOf[String], classOf[String])
      .map { row =>
        val values = row
          .values()
          .zip(fieldSpec)
          .map { case (v, (fn, dt)) => (v, dt) }
          .map {
            case (v, DataTypes.IntegerType) => v.toInt
            case (v, DataTypes.DoubleType) => v.toDouble
            case (v, DataTypes.LongType) => v.toLong
            case (v, DataTypes.BooleanType) => v.toBoolean
            case (v, DataTypes.StringType) => v
          }
          .toSeq

        Row.fromSeq(values)
      }
      .toList

    val df = Spark.sqlContext.createDataFrame(Spark.sc.parallelize(rows), schema)
    df
  }

  Given("""^a table of data in a temp table called "([^"]*)"$""") { (tableName: String, data: DataTable) =>
    val df = dataTableToDataFrame(data)
    df.registerTempTable(tableName)

    df.printSchema()
    df.show()
  }
}

Likewise, you can define a function to compare the output data frame with the “expected” data from the cucumber table. This is a simple implementation, I have seen some much classier versions which report the row and column of the mismatch etc.

  Then("""^the data in temp table "([^"]*)" is$"""){ (tableName: String, expectedData: DataTable) =>
    val expectedDf = dataTableToDataFrame(expectedData)
    val actualDf = Spark.sqlContext.sql(s"select * from $tableName")

    val cols = expectedDf.schema.map(_.name).sorted

    val expected = expectedDf.select(cols.head, cols.tail: _*)
    val actual = actualDf.select(cols.head, cols.tail: _*)

    println("Comparing DFs (expected, actual):")
    expected.show()
    actual.show()

    actual.count() shouldEqual expected.count()
    expected.intersect(actual).count() shouldEqual expected.count()
  }

Coverage Reporting

There’s a great coverage plugin for Scala which can easily be added to the project by adding a single line to plugins.sbt:

logLevel := Level.Warn

addSbtPlugin("com.waioeka.sbt" % "cucumber-plugin" % "0.0.3")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.3.5")

The report is generated with the following SBT command and saved to HTML and XML formats for viewing or ingest by a tool (like SonarQube).

$ sbt clean coverage cucumber coverageReport

...

[info] Written Cobertura report [/Users/DTAYLOR/Development/bdd-spark/target/scala-2.10/coverage-report/cobertura.xml]
[info] Written XML coverage report [/Users/DTAYLOR/Development/bdd-spark/target/scala-2.10/scoverage-report/scoverage.xml]
[info] Written HTML coverage report [/Users/DTAYLOR/Development/bdd-spark/target/scala-2.10/scoverage-report/index.html]
[info] Statement coverage.: 94.69%
[info] Branch coverage....: 100.00%
[info] Coverage reports completed
[info] All done. Coverage was [94.69%]
[success] Total time: 1 s, completed 08-Aug-2016 14:27:17

Screenshot 2016-
08-08 14.29.12

Conclusion

So, hopefully this long and rambling article has made one key point: You can use BDD to develop Spark apps. The fact that you should isn’t something anyone can prove, it’s just something you’ll have to take on faith!

 

Predicting MOT Pass Rates with Spark MLlib

Every car in the UK, once its’s three years old, need to have an MOT test annually to prove it’s safe to drive. The good people at the DVLA have made a large chunk of the data available as part of the government’s push to make more data “open”.  You can download the MOT data here.  You can also get hold of all the code for this article in my GitHub.

Visualising the data

Before I started doing any machine learning, I did some basic visualisation of the data in a series of charts, just to get an idea of the “shape” of things.  I used Spark to process the data (there’s lots of it) and D3js to create some charts. I haven’t been able to make the charts work in WordPress yet but you can see them below as screenshots of elsewhere as a live document.

The data arrives in CSV format, which is very easy to digest but pretty slow when you’re dealing with tens of millions of rows. So the first thing I did was to transform the data to Parquet using Spark’s built in Parquet capabilities. This improved query performance massively.

Test counts over time

First thing to look at: how many tests are carried out on vehicles of a given age? Basically, how many 3-year-old, 4-year-old, 20-year-old… cars are on the road.  The dataset contains records for MOTs on cars well over 100 years old, but there aren’t many of them.

mot1

As you can see from the histogram, most tests are carried out on cars between 3 and 15ish years old.

mot2

The accompanying CDF shows that the 95% percentile is roughly around the 15 year mark.  Let’s zoom in a bit…

mot3

The zoomed-in histogram makes the 10-15 year shelf life of most cars pretty apparent.

Pass rates by age

Are people throwing away their older cars because they’re uncool or because they are broken?

mot5

A look at the pass rate over time shows that it’s probably because they’re broken.  The pass rate starts off pretty high – well over 85%, but dips top an all time low at 14 years of age.

mot6

Once cars get past the 14 year “death zone” their prospects get better though. As cars get older and older the first-test pass rate heads back up towards 100%. At around 60 years old, cars have a better chance of passing their MOT than when they’re brand new!

I guess it’s safe to assume that cars over 30 years of age are treated with a little more respect.  They’re “classics” after all. Once a car is 80+ years old it probably lives in a museum or private collection and drives very little throughout the year. The MOT test is much “easier” for older cars too – a 100 year old car does not have to pass emissions!

Manufacturers

The pass rate changes differently as cars from different manufacturers get older. Some manufacturers make “disposable” cars, some make cars designed to be classics the day they leave the showroom (Aston Martin, Lotus, Rolls Royce). Some make cheap cars that people care less about (Vauxhall, Ford), some make posh cars people take care of (Audi, BMW). Japanese manufacturers seem to be able to build cars with very steady pass rates over time.

mot7

It might not be a shock that Bentley, Porche are at the top here, with TVR close behind. For me the biggest surprise was that Ford takes the deepest dip at the 14 year mark. Fords are clearly not built to last… or maybe people don’t care for them.  Renault and Alpha Romeo join Ford at the bottom of the table here.

Numbers of cars

It’s all very well to be mean to Ford about their poor longevity, but they do have more cars on the road that pretty much anyone else.  Check out the heatmap: mot8

While we’re counting cars, it looks like silver is the most popular colour. The MOT test data “runs out” in 2013, so I’d expect to see a lot more white cars these days.

mot9

Some code

OK, so we’ve looked at some charts not let’s look at some code.  All the charts about were generated by simple Spark dataframe apps, wrapped up in a unit test harness for ease of use.  Here’s an example:

  it should "calculate pass rate by age band and make" in {
    val motTests = Spark.sqlContext.read.parquet(parquetData).toDF()
    motTests.registerTempTable("mot_tests")

    val results =
      motTests
        .filter("testClass like '4%'") // Cars, not buses, bikes etc
        .filter("testType = 'N'") // only interested in the first test
        .filter("firstUseDate <> 'NULL' and date <> 'NULL'")
        .withColumn("passCount", passCodeToInt(col("testResult")))
        .withColumn("age", testDateAndVehicleFirstRegDateToAge(col("date"), col("firstUseDate")))
        .groupBy("age", "make")
        .agg(count("*") as "cnt", sum("passCount") as "passCount")
        .selectExpr("make", "age", "cnt", "passCount * 100 / cnt as rate")
        .filter("cnt >= 1000")
        .rdd

    val resultMap =
      results
        .map({
          x => (
            x.getString(0),
            x.getInt(1),
            x.getLong(2),
            x.getDouble(3)
            )
        })

    val mappedResults =
      resultMap
        .groupBy { case (make, age, cnt, rate) => make }
        .map { case (make, stuff) =>
          AgeAndMakeResults(make,
            stuff
              .map { case (_, age, cnt, rate) => new RateByAge(age, cnt, rate) }
              .filter(x => x.age >= 3 && x.age <= 20) .toSeq ) } .filter(_.series.length >= 18)
        .collect()

    JsonWriter.writeToFile(mappedResults, resultsPath + "passRateByAgeBandAndMake.json")
  }

Not sure what else there is to say about the code. Have a read or hit my github if you want to play around with it!

Machine Learning:  Predicting pass rate

Spark’s MLlib codes with all sorts of machine learning algorithms for predicting and classifying (mainly the latter) data.  I looked at decision trees, random forests and neural networks for this.  The idea was to turn some properties of a vehicle such as age, mileage, manufacturer, model, fuel type and so on into a pass/fail prediction.

It didn’t work! Yes, sorry, that’s right, it’s not really possible to predict a straight pass or fail.  Even in the worst case, the first-test pass rate for all different classes of car is over 50%. Machine learning techniques being as they are, this means that the simplest solution for any predictive model is simply to predict a pass every time.

This happened with all three techniques – neural nets, decision trees and random forests all “learned” to predict a pass every time, giving them a 50-60-ish% accuracy.  Darn it!

Predicting Pass Probability Classes

So, if you can’t predict two classes (“PASS” and “FAIL”) maybe its easier to predict Pass Probability Classes (50-60%, 60-70%, 70-80% etc).  Well, yes it was slightly more successful, but not exactly stunning!

The best results I got were predicting 10 pass rate classes for each decile of probability. This gave me these rather lame results:

Mean Error: 1.1532896239958372
Precision: 0.3961880457753499

So the mean error is greater than 1 – i.e. most test data entries are classified over one class away from their true class.  The precision shows only 40% of samples being predicted correctly.  Pants.

The confusion matrix tells a slightly more positive story though – here it is rendered as a colour map:

mot10

The confusion matrix shows the class predicted by the model (column) versus the actual class of the sample (row). A perfect predictor would give a diagonal green line from top left to bottom right, showing every class predicted correctly.

In this case, the random forest is attempting to predict the banded pass rate (0: 0% – 10%, 1: 10 – 20%, 2: 20 – 30%, … 9: 90% – 100%). Since virtually no classes of vehicle exist where the pass rate is less than 40% it doesn’t do very well at those levels, however, from 40% to 80% it does pretty well.

Some More Code

The code is complex – Spark makes it easy to run machine learning algorithms, but there’s a lot of bits and bobs round the edges like UDFs and utility functions. The following listing is the algorithm which gave me the results above (my best attempt).  Hit the githib link at the top of this page if you want to dig further into the code.

it should "use a decision tree to classify probability classes" in {
    val motTests = Spark.sqlContext.read.parquet(parquetData).toDF()
    motTests.registerTempTable("mot_tests")

    val keyFields = Seq("make", "colour", "mileageBand", "cylinderCapacity", "age", "isPetrol", "isDiesel")

    // Get the distinct values for category fields
    val distinctCategoryValues = Seq("make", "colour")
      .map(fieldName => (fieldName, motTests.select(col(fieldName)).distinct().map(_.getString(0)).collect().toList)).toMap

    // A UDF to convert a text field into an integer index
    // Should probably do this before the Parquet file is written
    val indexInValues = udf((key : String, item : String) => distinctCategoryValues(key).indexOf(item))

    val data =
      motTests
        .filter("testClass like '4%'") // Cars, not buses, bikes etc
        .filter("firstUseDate <> 'NULL' and date <> 'NULL'") // Must be able to calculate age
        .filter("testMileage > 0") // ignore tests where no mileage reported
        .filter("testType = 'N'") // only interested in the first test
        .withColumn("testPassed", passCodeToInt(col("testResult")))
        .withColumn("age", testDateAndVehicleFirstRegDateToAge(col("date"), col("firstUseDate")))
        .withColumn("isPetrol", valueToOneOrZero(lit("P"), col("fuelType")))
        .withColumn("isDiesel", valueToOneOrZero(lit("D"), col("fuelType")))
        .withColumn("mileageBand", mileageToBand(col("testMileage")))
        .groupBy(keyFields.map(col): _*)
        .agg(count("*") as "cnt", sum("testPassed") as "passCount")
        .filter("cnt > 10")
        .withColumn("passRateCategory", passRateToCategory(col("cnt"), col("passCount")))
        .withColumn("make", indexInValues(lit("make"), col("make")))
        .withColumn("colour", indexInValues(lit("colour"), col("colour")))
        .selectExpr((keyFields :+ "passRateCategory").map(x => s"cast($x as double) $x"):_*)
        .cache()

    data.printSchema()

    val labeledPoints = toFeatures(data, "passRateCategory", keyFields)

    labeledPoints.take(10).foreach(println)

    val Array(trainingData, testData, validationData) = labeledPoints.randomSplit(Array(0.8, 0.1, 0.1))
    trainingData.cache()
    testData.cache()
    validationData.cache()

    trainingData.take(10).foreach(println)

    val categoryMap = Seq("make", "colour").map(field => {
      ( data.columns.indexOf(field), distinctCategoryValues(field).length )
    }).toMap

    val model = RandomForest.trainClassifier(trainingData, 11, categoryMap, 20, "auto", "gini", 8, 500)

    val predictionsAndLabels = validationData.map(row => (model.predict(row.features), row.label))
    predictionsAndLabels.take(10).foreach(println)
    val metrics = new MulticlassMetrics(predictionsAndLabels)

    val error = math.sqrt(predictionsAndLabels.map({ case (v, p) => math.pow(v - p, 2)}).sum() / predictionsAndLabels.count())
    println(s"Mean Error: $error")
    println(s"Precision: ${metrics.precision}")

    println("Confusion Matrix")
    println(metrics.confusionMatrix)

    CsvWriter.writeMatrixToFile(metrics.confusionMatrix, resultsPath + "decision-tree-probability-classes-confusion-matrix.csv")

    for(x <- 0 to 10) {
      println(s"Class: $x, Precision: ${metrics.precision(x)}, Recall: ${metrics.recall(x)}")
    }
  }

Conclusions

I think I proved that MLlib and Spark are a great choice for writing machine learning algorithms very quickly and with very little knowledge.

I think I also proved that Data Scientists need to know a hell of a lot more than how to fire up a random forest. I know a little bit about data and machine learning (thus the name of this website!) but in order to make much sense of a dataset like this you need a whole arsenal of tricks up your sleeve.

As usual, D3.js and AngularJS are great.