Rusting

The section where I examine what I am learning when compiling Rust code.

6

SEP
2014

Taking Rust to Task

Now that I've reached the point where I can get some Rust code running without asking questions in IRC every five minutes, I really wanted to play with some tasks. Tasks are the way Rust handles multiprocessing code. Under the hood they can map one-to-one with operating system threads or you can use a many-to-one mapping that I'm not ready to go into yet.

Probably one of the most exciting aspect of tasks in Rust, in my opinion, is that unsafe use of shared memory is rejected outright as a compile error. That lead me to want to figure out how you communicate correctly. (Spoiler: the same was you do in Ruby: just pass messages.)

Ready to dive in, I grossly simplified a recent challenge from work and coded it up in Rust. You can get the idea with a glance at main():

use std::collections::HashMap;

// ...

fn string_vec(strs: &[&'static str]) -> Vec<String> {
    let mut v = Vec::new();
    for s in strs.iter() {
        v.push(s.to_string());
    }
    v
}

fn main() {
    let mut services = HashMap::new();
    services.insert("S1".to_string(), string_vec(["A", "B"]));
    services.insert("S2".to_string(), string_vec(["A", "C"]));
    services.insert("S3".to_string(), string_vec(["C", "D", "E", "F"]));
    services.insert("S4".to_string(), string_vec(["D", "B"]));
    services.insert("S5".to_string(), string_vec(["A", "Z"]));

    let work = Work(Search::new("A".to_string(), "B".to_string()));

    let mut task_manager = TaskManager::new(services);
    task_manager.run(work);
}

The HashMap sets up the story. Let's assume we have various travel services: S1, S2, etc. Each of those services can reach various stops. For example, S1 can reach A and B. Now the work variable tells you what we want to do, which is to search for ways to get from A to B.

The twist is handled by TaskManager. It's going to run each service in an isolated separate process (a task in Rust speak) that only knows about its own stops. This means finding non-direct paths must involve inter-process communication.

The string_vec() helper just builds a vector of String objects for me. I'm not using String for its mutability, but it's more of an attempt to clarify ownership without scattering lifetime specifications everywhere. (This may not been ideal. Please remember that I'm still a pretty green Rust programmer.)

If the code I write works, it will find two possible paths for our search and indeed it does:

$ ./pathfinder 
Path: A--S1-->B
Path: A--S2-->C--S3-->D--S4-->B

The rest of this blog post will examine just how it accomplishes this.

Let's begin with a pretty trivial data structure:

#[deriving(Clone)]
struct Path {
    from:    String,
    to:      String,
    service: String
}
impl Path {
    fn new(from: String, to: String, service: String) -> Path {
        Path{from: from, to: to, service: service}
    }

    fn to_string(&self) -> String {
        self.from
            .clone()
            .append("--")
            .append(self.service.as_slice())
            .append("-->")
            .append(self.to.as_slice())
    }
}

This just defines a Path as two endpoints and the service that connects them. You can see that the to_string() method gives us the simple ASCII arrow output from the solution.

Search builds on Path:

#[deriving(Clone)]
struct Search {
    from:  String,
    to:    String,
    paths: Vec<Path>
}
impl Search {
    fn new(from: String, to: String) -> Search {
        Search{from: from, to: to, paths: vec![]}
    }

    fn services(&self) -> Vec<String> {
        self.paths.iter().map(|path| path.service.clone()).collect()
    }

    fn stops(&self) -> Vec<String> {
        let mut all = vec![];
        all.push(self.from.clone());
        all.push(self.to.clone());
        for path in self.paths.iter() {
            all.push(path.from.clone());
            all.push(path.to.clone());
        }
        all
    }

    fn add_path(&self, path: Path) -> Search {
        Search{ from:  path.to.clone(),
                to:    self.to.clone(),
                paths: self.paths.clone().append([path]) }
    }
}

A Search holds the endpoints we're currently trying to traverse and any paths collected so far in an attempt to reach our goal. The add_path() method is used with partial matches. It returns a new Search for the remaining distance with the partial Path added to the list.

The other methods, services() and stops(), just return lists of what has been used so far in the accumulated paths. These are helpful in avoiding building circular paths.

Note that both data structures so far are cloneable. This is so that copies can be made to sent across channels to other tasks.

We need two more aggregate data structures to be the messages that we will shuttle between tasks:

#[deriving(Clone)]
enum Job {
    Work(Search),
    Finish
}

enum Event {
    Match(Vec<Path>),
    Partial(Vec<Search>),
    Done(String)
}

A Job can be either a wrapped Search we want to perform or the special Finish flag. The latter just tells a task to bust out of its infinite listening-for-searches loop and exit cleanly. This enum lists the messages we can send to the service tasks.

Messages from a service task (back to our not yet examined TaskManager) are called an Event. There are three possible types. Match means we have a full solution, using the included paths. Partial means that the task matched part of the path and is sending back a list of searches to try for locating the rest of it. Again Done is a special flag that pretty much means, "I've got nothing." This flag includes the name of the service for a reason that will become obvious after we view this next helper object:

struct SearchTracker {
    counts: HashMap<String, uint>
}
impl SearchTracker {
    fn new<'a, I: Iterator<&'a String>>(mut service_names: I) -> SearchTracker {
        let mut tracker = SearchTracker{counts: HashMap::new()};
        for name in service_names {
            tracker.counts.insert(name.clone(), 0);
        }
        tracker
    }

    fn add_search(&mut self) {
        for (_, count) in self.counts.mut_iter() {
            *count += 1;
        }
    }

    fn mark_done(&mut self, name: String) {
        let count = self.counts.get_mut(&name);
        *count -= 1;
    }

    fn is_done(&self) -> bool {
        self.counts.values().all(|n| *n == 0)
    }
}

The idea behind SearchTracker is that we want to know when we're done seeing results from the various tasks. We can do that by expecting exactly one response (an Event) from each task for each Search sent.

This object just makes a counter for each service name passed into the constructor. You can then add_search() to bump all counters (because each Search is sent to all service tasks) and mark_done() to reduce a named counter when you receive a response from that service. The is_done() method will return true when all the counts balance out.

We need one last helper before we get into the actual process:

struct MultiTaskSender<T> {
    senders: Vec<Sender<T>>
}
impl<T: Clone + Send> MultiTaskSender<T> {
    fn new() -> MultiTaskSender<T> {
        MultiTaskSender{senders: vec![]}
    }

    fn add_sender(&mut self, sender: Sender<T>) {
        self.senders.push(sender);
    }

    fn send(&self, t: T) {
        for sender in self.senders.iter() {
            sender.send(t.clone());
        }
    }
}

As I said, each Search will be sent to all tasks. This simple multicaster just allows you to add_sender() as each task is built and later send() to all those channels. The actual object sent is handled as a generic type that we just need to be able to Clone and Send.

We're finally ready for the main workhorse of this code:

struct TaskManager {
    services:       HashMap<String, Vec<String>>,
    tracker:        SearchTracker,
    multi_sender:   MultiTaskSender<Job>,
    event_sender:   Sender<Event>,
    event_receiver: Receiver<Event>
}
impl TaskManager {
    fn new(services: HashMap<String, Vec<String>>) -> TaskManager {
        let (sender, receiver) = channel();
        TaskManager{ services:       services.clone(),
                     tracker:        SearchTracker::new(services.keys()),
                     multi_sender:   MultiTaskSender::new(),
                     event_sender:   sender,
                     event_receiver: receiver }
    }

    fn run(&mut self, work: Job) {
        self.launch_services();
        self.send_job(work);
        self.wait_for_services();
        self.send_job(Finish);
    }

    // ...
}

This shows you what a TaskManager keeps track of, which is just the data for our challenge, the helper objects, and various channels (the pipes of communication between Rust tasks). You can see how this gets setup in new().

Once we have everything we need to track, run() actually does the Job. It will:

  1. Launch a task for each service
  2. Send the full search we want to perform
  3. Wait for and respond to reported work from the tasks
  4. Signal all tasks to shutdown when the work is done

Two pieces of this process are beating heart of the system. Here's the first of those:

impl TaskManager {
    // ...

    fn launch_services(&mut self) {
        for (name, stops) in self.services.clone().move_iter() {
            let task_event_sender                = self.event_sender.clone();
            let (search_sender, search_receiver) = channel();
            self.multi_sender.add_sender(search_sender.clone());
            spawn( proc() {
                loop {
                    let job = search_receiver.recv();
                    match job {
                        Work(search) => {
                            if stops.contains(&search.from) &&
                               stops.contains(&search.to) {
                                let path  = Path::new(
                                    search.from,
                                    search.to,
                                    name.clone()
                                );
                                let paths = search.paths.append([path]);
                                task_event_sender.send(Match(paths))
                            } else {
                                let mut tos      = stops.clone();
                                let     previous = search.stops();
                                tos.retain(|stop| !previous.contains(stop));
                                if !search.services().contains(&name) &&
                                   stops.contains(&search.from)       &&
                                   !tos.is_empty() {
                                    let searches = tos.iter().map( |to| {
                                        let path = Path::new(
                                            search.from.clone(),
                                            to.clone(),
                                            name.clone()
                                        );
                                        search.add_path(path)
                                    } ).collect();
                                    task_event_sender.send(Partial(searches));
                                } else {
                                    task_event_sender.send(Done(name.clone()));
                                }
                            }
                        }
                        Finish       => { break; }
                    }
                }
            } );
        }
    }

    // ...
}

You're pretty much looking at a service task here (the whole part inside the call to spawn()). Inside, they are just an endless loop calling recv() to get new Work wrapped Search objects from the channel. The first if branch inside the match of Work(search) handles the simple case of the service matching the Search exactly.

When the else branch is selected, because we don't have a direct match, some work is done to see if a partial match is possible. (This is a foolish algorithm, by the way. It does partial matches if it can directly match the from endpoint and not the to. This rules out some viable scenarios, but it helped to keep this already large example smaller.)

If a partial match is found, it's transformed into a list of new searches to try that may later find direct matches or more partial matches. When no direct or partial match is found, the Done flag is sent back so TaskManager knows to stop waiting on this task.

The Finish match clause just breaks out of the loop as described previously. Outside of spawn() is a simple loop that creates each task and some code that prepares the variables for the task to capture.

impl TaskManager {
    // ...

    fn wait_for_services(&mut self) {
        loop {
            match self.event_receiver.recv() {
                Match(paths)      => {
                    let name = paths.last().expect("No path").service.clone();
                    self.tracker.mark_done(name);

                    let path_string = paths.iter().skip(1).fold(
                        paths[0].to_string(),
                        |s, p| s.append(p.to_string().as_slice().slice_from(1))
                    );
                    println!("Path: {}", path_string);
                }
                Partial(searches) => {
                    let name = searches.last()
                                       .expect("No search")
                                       .paths
                                       .last()
                                       .expect("No path")
                                       .service
                                       .clone();
                    self.tracker.mark_done(name);

                    for search in searches.iter() {
                        self.send_job(Work(search.clone()));
                    }
                }
                Done(name)        => { self.tracker.mark_done(name) }
            }
            if self.tracker.is_done() { break; }
        }
    }

    // ...
}

This chunk of code is the other half of the puzzle. It's another infinite loop listening on the Event channel. A full Match is pretty printed and a Partial is sent back out to the services in a wave of new searches. Regardless of the Event type, we record the response for the sending service, though where we find the service name varies by case. This allows us to exit this loop and the program when our tracker says we're done.

There's only one final method on TaskManager and it's what actually sends the messages to the service tasks, tracking each new Search as it goes out:

impl TaskManager {
    // ...

    fn send_job(&mut self, job: Job) {
        match job {
            Work(_) => { self.tracker.add_search(); }
            Finish  => { /* do nothing */ }
        }
        self.multi_sender.send(job);
    }
}

You can find the full code on GitHub.

In: Rusting | Tags: Concurrency & Rust | 1 Comment
Comments (1)
  1. James Edward Gray II
    James Edward Gray II September 9th, 2014 Reply Link

    Concurrency in Rust is a handy talk for learning what's available in Rust for multiprocessing.

    1. Reply (using GitHub Flavored Markdown)

      Comments on this blog are moderated. Spam is removed, formatting is fixed, and there's a zero tolerance policy on intolerance.

      Ajax loader
Leave a Comment (using GitHub Flavored Markdown)

Comments on this blog are moderated. Spam is removed, formatting is fixed, and there's a zero tolerance policy on intolerance.

Ajax loader