Jamie Skipworth


Technology Generalist | Software & Data


Dask

If you’ve ever tried to do any serious data analytics, at some point you’ve probably hit some sort of hardware limitation. Usually it’s running out of memory, since most analytics tools try to load all your data at once. Either that, or your analysis requires more CPU resources than Intel can manufacture in an entire year.

Getting around these limitations generally involves sampling your data so that it fits in memory and can run on a single machine, or busting out the big guns and finding yourself a cluster to use.

Great, but if you’re a data scientist (and I’m definitely not) who uses Python with packages like pandas, NumPy and/or scikit-learn, you probably don’t want to rewrite your code to get it running on another platform or cluster. Wouldn’t it be nice if you could run the same code locally as well as in a distributed way, without a large rewrite?

Scalable Analytics with Dask

Dask supports various data constructs familiar to users of existing Python analytics frameworks, including:

  • Dask DataFrames, which mirror the pandas DataFrame API.
  • Dask Arrays, which mirror NumPy ndarrays.
  • Dask Bags, which behave a bit like parallel Python lists (or Spark RDDs).
  • Dask Delayed, for parallelising arbitrary Python functions.
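
As a quick taste (a throwaway sketch with made-up data, nothing to do with the rest of this post), a Dask DataFrame is driven almost exactly like a pandas one - the main difference being that nothing actually runs until you call .compute():

# Trivial illustration with made-up data: the Dask DataFrame API mirrors
# pandas, but the work is lazy until .compute() is called.
import pandas as pd
import dask.dataframe as ddf

pdf = pd.DataFrame({"Location": ["India", "Germany", "India", None]})
dask_df = ddf.from_pandas(pdf, npartitions=2)

print(dask_df.Location.value_counts().compute())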

The Data

In this post I’m going to play around a bit with some of these, and demo a few ways they can be used on some real, big-ish data. That ‘big-ish’ data is a 3.2GB XML dump of the StackOverflow user data. It contains one <row> element on a single line for each user. When pretty-printed it looks like this:

<row
    Id="-1"
    Reputation="1"
    CreationDate="2008-07-31T00:00:00.000"
    DisplayName="Community"
    LastAccessDate="2008-08-26T00:16:53.810"
    WebsiteUrl="http://meta.stackexchange.com/"
    Location="on the server farm"
    AboutMe="&lt;p&gt;Hi, I'm not really a person.&lt;/p&gt;&#10;&#10;&lt;p&gt;I'm a background process that helps keep this site clean!"
    Views="649"
    UpVotes="342842"
    DownVotes="1123154"
    AccountId="-1"
/>

Normally I’m quite lazy, but I had some time so decided to write a dirty little Python script to extract these elements from the file without eating up memory. You can also ask it to convert XML attributes to Python dictionaries, which can in turn be converted to JSON, a bit like this:

python3 ./extract.py ../data/Users.xml row --json | head -1 | jq

Which yields the following sort of thing:

{
  "Id": "-1",
  "Reputation": "1",
  "CreationDate": "2008-07-31T00:00:00.000",
  "DisplayName": "Community",
  "LastAccessDate": "2008-08-26T00:16:53.810",
  "WebsiteUrl": "http://meta.stackexchange.com/",
  "Location": "on the server farm",
  "AboutMe": "<p>Hi, I'm not really a person.</p>&#xA;&#xA;<p>I'm a background process that helps keep this site clean!",
  "Views": "649",
  "UpVotes": "342842",
  "DownVotes": "1123154",
  "AccountId": "-1"
}
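
The script itself isn’t included here, but for the curious, a minimal streaming extractor along the same lines might look something like this. It’s a hypothetical sketch built on xml.etree.ElementTree.iterparse, not the actual extract.py:

# extract_sketch.py - a hypothetical, minimal stand-in for extract.py.
# iterparse streams the document, and clearing each element as it is
# emitted keeps memory usage flat no matter how big the file gets.
import json
import sys
import xml.etree.ElementTree as ET

def extract_rows(path, tag):
    """Yield the attribute dict of every <tag> element, one at a time."""
    for _, elem in ET.iterparse(path, events=("end",)):
        if elem.tag == tag:
            yield dict(elem.attrib)
        elem.clear()

if __name__ == "__main__":
    path, tag = sys.argv[1], sys.argv[2]
    for attrs in extract_rows(path, tag):
        print(json.dumps(attrs))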

So, what I want to do is this:

  • Extract all <row> elements from the XML file, excluding child elements and data (there are neither).
  • Convert each XML <row> element into JSON.
  • Evenly distribute the output into 8 separate files so we can better process them concurrently.

With the hacky extract.py script, we can achieve all of that with this mangled chain of commands:

$ time split -d -n l/8 --verbose  ../data/Users.xml ../data/split/users_  --additional-suffix .xml | cut -d \' -f2 | xargs -P 8 -n 1 -t -I{} bash -c 'python3 extract.py $1 row --json > ${1%.xml}.json' -- {}
bash -c python3 extract.py $1 row --json > ${1%.xml}.json -- ../data/split/users_00.xml
bash -c python3 extract.py $1 row --json > ${1%.xml}.json -- ../data/split/users_01.xml
bash -c python3 extract.py $1 row --json > ${1%.xml}.json -- ../data/split/users_02.xml
bash -c python3 extract.py $1 row --json > ${1%.xml}.json -- ../data/split/users_03.xml
bash -c python3 extract.py $1 row --json > ${1%.xml}.json -- ../data/split/users_04.xml
bash -c python3 extract.py $1 row --json > ${1%.xml}.json -- ../data/split/users_05.xml
bash -c python3 extract.py $1 row --json > ${1%.xml}.json -- ../data/split/users_06.xml
bash -c python3 extract.py $1 row --json > ${1%.xml}.json -- ../data/split/users_07.xml

real    10m7.136s
user    71m13.689s
sys     0m25.372s

That splits the XML file into 8 roughly equal files, extracts the file names from split’s verbose output, and then uses xargs to run extract.py on each of the 8 files concurrently.

Into Daskness

With all that data wrangling now out of the way, I can get to experimenting with Dask. I’ve not used pandas much before so this is all relatively new to me; be warned, I’m sure there are a few problems in my code. Cut me some slack.

I’m going to do some really simple stuff: basic analytics on each user’s location, on my local machine, getting the user count for each country. Here’s some code that does that:

import dask.dataframe as ddf
import pycountry
import us
from distributed import Client, LocalCluster
import argparse


def process_users(filepath):
    """
        Reads JSON files into a Dask DataFrame and persists it
    """
    users = ddf.read_json(filepath)
    # persist() returns a new, persisted collection, so keep that reference
    users = users.persist()
    return users

def count_country_users(df):
    """
        Performs the count of users
    """
    return df.Location.value_counts().compute().sort_values(ascending=False)

def get_args():
    parser = argparse.ArgumentParser(description="Perform some user counts for each country")
    parser.add_argument("input_files", 
                        type=str, 
                        metavar="INPUT_JSON_FILES", 
                        help="Input file(s). Only JSON files supported")
    parser.add_argument("output_file", 
                        type=str, 
                        metavar="OUTPUT_FILE", 
                        help="Output file to write results to")
    return parser.parse_args()

if __name__ == '__main__':

    # Parse command line args
    args = get_args()

    # Define a LocalCluster
    cluster = LocalCluster(n_workers=8, memory_limit="8G")
    client = Client(cluster)

    # Read-in the data and perform the counts. These tasks
    # are submitted to the LocalCluster
    df_users = client.submit(process_users, args.input_files)
    df_country = client.submit(count_country_users, df_users)

    # Fetch the result and write the per-location counts to the output file
    dfr = df_country.result()

    with open(args.output_file, 'wt') as fd:
        for i, v in dfr.iteritems():
            fd.write("{}: {}\n".format(i, v))

That code defines two main functions: one that reads in the input files (process_users), and another that performs the counting (count_country_users). Those functions are submitted to a local cluster, which performs the work in a distributed way. When I run this I get the answer in a little under 7 minutes.

$ time python3 ./dask_country_count.py "../data/split/*.json" ./output.txt

real    6m45.892s
user    0m29.606s
sys     0m4.916s

$ head -13 ./output.txt
: 69086
India: 60654
Bangalore, Karnataka, India: 44587
Germany: 29741
Hyderabad, Telangana, India: 26159
Pune, Maharashtra, India: 24957
Chennai, Tamil Nadu, India: 22862
China: 22094
London, United Kingdom: 19744
United States: 17934
France: 17741
Mumbai, Maharashtra, India: 16229
Paris, France: 15874
USA: 15146

It’s a bit gnarly, isn’t it? I’ve got a bunch of duplicates because the location is user-generated and inconsistent, misspelled, or not a location at all. Here are a few helpful examples:

/dev/null: 71
The Moon: 62
Somewhere in Middle Earth: 1
USA: 15146
United States: 17934

Clearly I don’t really care about hobbits, astronauts or special unix devices, so next I’m going to try to clean this up a bit with some mild fuzzy-matching, using a Python library called pycountry, which contains country names and codes.

I’m going to use pycountry in an attempt to make these counts a bit more accurate, so things like all those different Indian locations just become India, and all the other strange ones like The Moon become Unknown. I’m also going to use the us library to look up US state names and abbreviations, since a lot of US users only put their state.
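
To give a feel for what those two libraries expose, here’s a quick poke at them (the printed values are what I’d expect them to return, so treat them as illustrative):

# pycountry gives ISO country records, the us package gives US state records.
import pycountry
import us

germany = pycountry.countries.get(alpha_2="DE")
print(germany.name, germany.alpha_2, germany.alpha_3)   # Germany DE DEU

first_state = us.states.STATES[0]
print(first_state.name, first_state.abbr)               # Alabama AL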

I’ll replace the count_country_users function with the one below, and add a new lookup_country function. All the other code remains the same:

import pycountry
import us

def count_country_users(df):
    """
        Looks up the country and performs the count of users
    """
    df['country'] = df['Location'].apply(lookup_country, meta=str)
    return df.country.value_counts().compute().sort_values(ascending=False)

def lookup_country(loc):
    """
        Look-up a given location string
    """
    if not isinstance(loc, str):
        return "Unknown"

    for country in pycountry.countries:
        if country.name.lower() in loc.lower() or \
           " {}".format(country.alpha_2.lower()) in loc.lower() or \
           " {}".format(country.alpha_3.lower()) in loc.lower():
            return country.name

    for state in us.states.STATES:
        if state.name.lower() in loc.lower() or state.abbr in loc:
            return "United States"

    # Nothing matched - treat it as unknown rather than falling off the end
    return "Unknown"
If I run the modified program, it outputs the much less messy-looking data below. Another cool thing about Dask is that while your job is running, if you go to localhost:8787 you’re presented with a ton of data about your Dask pipeline, which is a dead handy way to find bottlenecks.

Dask status page
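
As an aside, you don’t have to guess that port - the Client will tell you where its dashboard lives. A tiny sketch, using the same LocalCluster setup as the script above:

# The dashboard URL is available straight from the Client.
from distributed import Client, LocalCluster

client = Client(LocalCluster(n_workers=8, memory_limit="8G"))
print(client.dashboard_link)   # e.g. http://127.0.0.1:8787/status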

Anyway - the output:

$ time python3 dask_country_count.py "../data/split/*.json" output.txt

real    13m58.287s
user    1m6.666s
sys     0m9.106s

$ cat output.txt
Unknown: 7807876
India: 376941
Canada: 152178
United States: 141462
Germany: 108374
Sao Tome and Principe: 105437
United Kingdom: 65850
Brazil: 56341
France: 56056
Switzerland: 50828
Australia: 46636
Niger: 46070
Pakistan: 36581
Poland: 34647
Belgium: 33891
Côte d'Ivoire: 32958
Spain: 32215
China: 32070
Morocco: 31353
Colombia: 28901
Israel: 28799
Bangladesh: 28276
Indonesia: 25988
Italy: 24315
Saint Helena, Ascension and Tristan da Cunha: 20315
Guam: 20186

The list of countries is now much more consistent, although not perfect. The processing time has doubled, but I’m sure there are ways to make this more efficient.
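
One obvious candidate (just a sketch, I haven’t benchmarked it) is to build the lowercased lookup lists once, instead of re-walking pycountry.countries for every single user. Same matching rules as lookup_country above:

# Hypothetical optimisation, not benchmarked: precompute the lowercased
# lookup lists once so each row only does cheap substring checks.
import pycountry
import us

COUNTRIES = [(c.name, c.name.lower(),
              " " + c.alpha_2.lower(), " " + c.alpha_3.lower())
             for c in pycountry.countries]
STATES = [(s.name.lower(), s.abbr) for s in us.states.STATES]

def lookup_country(loc):
    if not isinstance(loc, str):
        return "Unknown"
    low = loc.lower()
    for name, lowered_name, alpha_2, alpha_3 in COUNTRIES:
        if lowered_name in low or alpha_2 in low or alpha_3 in low:
            return name
    for state_name, abbr in STATES:
        if state_name in low or abbr in loc:
            return "United States"
    return "Unknown"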

Now I’m not a data scientist - I don’t have a background in maths or stats, and I only know enough to be dangerous (in the bad way!), but the cool thing about Dask is that this same code can be used either on my local machine or on a huge cluster if I wanted. There shouldn’t be any large code changes required!
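
For what it’s worth, pointing the script at a real cluster should mostly come down to how the Client is constructed - give it a scheduler address instead of a LocalCluster (the address below is made up):

# Hypothetical: connect to an existing remote scheduler instead of spinning
# up a LocalCluster. Everything else in the script stays the same.
from distributed import Client

# Local, as in the script above:
# client = Client(LocalCluster(n_workers=8, memory_limit="8G"))

# Remote - made-up scheduler address:
client = Client("tcp://dask-scheduler.example.com:8786")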