Practical Multi-threading in Rust

Ages ago I wrote a rudimentary word counter in Rust, and then demoed how to spawn threads in Rust here. In this post I’m going to mash the two together so that my word counter can process multiple files concurrently instead of serially.
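As a quick refresher, spawning raw threads in Rust looks something like this. This is a minimal sketch rather than the earlier post’s exact code: each spawn returns a JoinHandle that we join on to wait for the thread.

use std::thread;

fn main() {
    // Spawn a handful of threads; each spawn() returns a JoinHandle.
    let handles: Vec<_> = (0..4)
        .map(|i| thread::spawn(move || println!("Hello from thread {}", i)))
        .collect();

    // Wait for every thread to finish.
    for handle in handles {
        handle.join().unwrap();
    }
}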

The Rust documentation here has some great tips on how to create and manage threads, and this section explains how to build your own thread pool library.

But I’m actually going to use the scoped_threadpool crate, because I’m lazy and the people who wrote it are far more clever than I am. This gives you all the thread-management goodies for free.
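To pull it in (along with the glob crate used below), Cargo.toml needs something along these lines. The version numbers here are assumptions; check crates.io for current ones.

[dependencies]
# Versions are assumptions; check crates.io for the latest.
glob = "0.3"
scoped_threadpool = "0.1"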

File Globbing

The previous version of the code took explicit file names as command line arguments. This is tedious if you have a number of files you want to process, so the first thing I’m going to do (again, because I’m lazy) is to support file globbing using the glob crate.

The updated main using glob doesn’t really look much different. All we’re doing is adding an extra loop that takes each command line argument and interprets it as a file globbing pattern, which can yield one or more file names. For more deets, read the glob crate documentation.

extern crate glob;
use glob::glob;
use std::env;
use std::path::Path;
use std::result::Result; // I'm using this to filter bad glob patterns

// <SNIP!>

fn main() {

    // Get command line arguments
    let args: Vec<String> = env::args().collect();

    // Determine if we have any arguments.
    if args.len() < 2 {

        panic!( "Program arguments missing. Please provide a file name" );            

    } 

    // Get arguments from the command line, skipping the program name
    let files: Vec<String> = Vec::from( &args[1..] );

    // Iterate through file names
    for file_arg in files.iter() {

        // HERE!: This is the new file globbing loop.
        // unwrap() panics on an invalid pattern; filter_map() skips unreadable paths
        for file_name in glob(file_arg).unwrap().filter_map(Result::ok) {  

            // Turn into a Path
            let path = Path::new( &file_name ); 

            // Execute count_file() on it, parsing the response.
            match count_file( path ) {

                Ok( ( lines, words ) ) => {
                    println!("{}\t{} lines\t{} words.", path.display(), lines, words );
                },
                Err( err ) => {
                    panic!("Error - {}", err );
                }

            };
        }
    }
}
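By the way, count_file is snipped above because it comes from the earlier word-counter post. For completeness, here’s a minimal sketch consistent with how it’s called; this is an illustration, not the original implementation.

use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;

// Sketch only: read the file line by line, counting lines and
// whitespace-separated words.
fn count_file(path: &Path) -> Result<(u64, u64), std::io::Error> {
    let reader = BufReader::new(File::open(path)?);
    let (mut lines, mut words) = (0u64, 0u64);
    for line in reader.lines() {
        let line = line?;
        lines += 1;
        words += line.split_whitespace().count() as u64;
    }
    Ok((lines, words))
}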

Nice, so now we can pass in any number of explicit file names or glob patterns. Next, I’ll add the actual threading code.

Scoped Threadpools

As mentioned above, the scoped_threadpool crate is what I’m using to provide a pool of worker threads to do my bidding. All I do is define a new pool with the desired number of worker threads, then call the scoped method to borrow the pool so I can give it things to do.

extern crate glob;
extern crate scoped_threadpool;
use glob::glob;
use scoped_threadpool::Pool;
use std::env;
use std::path::Path;
use std::result::Result;

// <SNIP!>

fn main() {

    // Get command line arguments
    let args: Vec<String> = env::args().collect();

    // Set the max number of workers
    let max_workers = 4;

    // Determine if we have any arguments. 
    if args.len() < 2 {
        panic!( "Program arguments missing. Please provide a file name" );            
    }

    // Get arguments from the command line, skipping the program name
    let files: Vec<String> = Vec::from( &args[1..] );

    // Create a pool of worker threads
    let mut pool = Pool::new( max_workers );

    // This borrows the pool, and blocks until all threads are done.
    pool.scoped( |scoped| {            

            // Iterate through file names. 
            for file_arg in files.iter() {     

                // Match the glob pattern, filtering out bad paths
                for file_name in glob(file_arg).unwrap().filter_map(Result::ok) {                    

                    // Fire-off a worker thread. Does not block.
                    scoped.execute( move || {
                            let path = Path::new( &file_name ); 
                            
                            // Execute count_file() on it, parsing the response.
                            match count_file( path ) {
                                // Parse the result
                                Ok( ( lines, words ) ) => {
                                    println!("{}\t{} lines\t{} words.", path.display(), lines, words );
                                },
                                Err( err ) => {
                                    panic!("Error - {}", err );
                                }

                            };
                        } 
                    ); 
                }
            } 
        }
    ); 
}

The pool.scoped call sounds a little cryptic, but really all it does is borrow the pool so I can give it things to execute. Once I’m running within the scope of the pool, I can call execute to run closures on separate worker threads.

Calls to execute use a move closure to transfer ownership of captured variables into the thread, and they return immediately. It’s the scoped call itself that blocks until all worker threads have completed.

I’d recommend reading the tutorial on how to create your own thread pool because it goes into detail about all the nitty-gritty stuff you need to think about - mutexes, Arcs, channels, joins, etc!
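As a taste of that, here’s a rough sketch of how I could collect per-file counts over a channel and sum them, instead of printing from inside each worker. It assumes the hypothetical count_file sketch from earlier; the function name sum_counts is mine.

use scoped_threadpool::Pool;
use std::path::Path;
use std::sync::mpsc::channel;

// Sketch: sum line/word counts across files, sending each worker's
// result back over a channel.
fn sum_counts(files: &[String]) -> (u64, u64) {
    let mut pool = Pool::new(4);
    let (tx, rx) = channel();

    pool.scoped(|scoped| {
        for file_name in files {
            let tx = tx.clone(); // one Sender per task
            scoped.execute(move || {
                if let Ok(counts) = count_file(Path::new(file_name)) {
                    tx.send(counts).unwrap();
                }
            });
        }
    }); // blocks here until every worker is done

    drop(tx); // close the channel so the receiver iterator ends
    rx.iter().fold((0, 0), |(l, w), (dl, dw)| (l + dl, w + dw))
}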

Data

So I guess I’d better run the thing and see how it performs, huh? Let’s grab some data to test it on first.

The data I’ve chosen is four plain-text extracts from IMDB which I downloaded from here, weighing in at around 2.5GB. I had to use iconv -c to convert some of the files into proper UTF-8; it seems there are a few ‘bad’ characters in there somewhere.
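The conversion was a one-liner per file, something like the below. The source encoding here is a guess; -c simply drops any characters that can’t be converted.

$ iconv -f ISO-8859-1 -t UTF-8 -c actors.list > actors-utf8.list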

Start Your Engines

Let’s run both the serial and threaded versions of the word counter to see how much faster the threaded one is. I’m using the --release option to tell the Rust compiler to optimise the code. Unoptimised code takes several minutes to execute, instead of only seconds when optimised.

Single-threaded

$ time cargo run --release ../data/*utf8*
    Finished release [optimized] target(s) in 0.55s
     Running `target/release/wc-serial ../data/actors-utf8.list ../data/actresses-utf8.list ../data/bio-utf8.list ../data/plot-utf8.list`
../data/actors-utf8.list    18490946 lines  167871802 words.
../data/actresses-utf8.list 11011667 lines  99232504 words.
../data/bio-utf8.list   10827262 lines  88712596 words.
../data/plot-utf8.list  6928509 lines   58776571 words.

real    0m13.272s
user    0m11.470s
sys 0m0.875s

Multi-threaded

$ time cargo run --release ../data/*utf8*
    Finished release [optimized] target(s) in 0.04s
     Running `target/release/wc-threaded ../data/actors-utf8.list ../data/actresses-utf8.list ../data/bio-utf8.list ../data/plot-utf8.list`
../data/plot-utf8.list  6928509 lines   58776571 words.
../data/bio-utf8.list   10827262 lines  88712596 words.
../data/actresses-utf8.list 11011667 lines  99232504 words.
../data/actors-utf8.list    18490946 lines  167871802 words.

real    0m7.101s
user    0m14.992s
sys 0m0.964s

Results

The multi-threaded code is clearly the winner here, with a total processing time of about 7s compared to 13s for the single-threaded version: roughly a 1.9x speedup with four worker threads. Notice that the user time actually went up a little (the overhead of coordinating threads), but the wall-clock time nearly halved.

2.5GB in 7 seconds isn’t bad, just quietly!

All code can be found on GitHub.