6 SEP 2014
Taking Rust to Task
Now that I've reached the point where I can get some Rust code running without asking questions in IRC every five minutes, I really wanted to play with some tasks. Tasks are the way Rust handles multiprocessing code. Under the hood they can map one-to-one with operating system threads or you can use a many-to-one mapping that I'm not ready to go into yet.
Probably one of the most exciting aspects of tasks in Rust, in my opinion, is that unsafe use of shared memory is rejected outright as a compile error. That led me to want to figure out how you communicate correctly. (Spoiler: the same way you do in Ruby: just pass messages.)
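If you're reading this on a current Rust toolchain, the task API described below has changed names, but the spirit survives. Here's a minimal sketch of the same spawn-a-task-and-pass-a-message idea using today's std::thread and std::sync::mpsc; this is my modern restatement, not code from the project:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // One channel: the spawned thread owns the Sender, we keep the Receiver.
    let (tx, rx) = mpsc::channel();

    let handle = thread::spawn(move || {
        // No shared memory is touched; the String is moved through the channel.
        tx.send("found a path".to_string()).unwrap();
    });

    // Block until the message arrives.
    println!("{}", rx.recv().unwrap());
    handle.join().unwrap();
}
```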
Ready to dive in, I grossly simplified a recent challenge from work and coded it up in Rust. You can get the idea with a glance at main():
    use std::collections::HashMap;

    // ...

    fn string_vec(strs: &[&'static str]) -> Vec<String> {
        let mut v = Vec::new();
        for s in strs.iter() {
            v.push(s.to_string());
        }
        v
    }

    fn main() {
        let mut services = HashMap::new();
        services.insert("S1".to_string(), string_vec(["A", "B"]));
        services.insert("S2".to_string(), string_vec(["A", "C"]));
        services.insert("S3".to_string(), string_vec(["C", "D", "E", "F"]));
        services.insert("S4".to_string(), string_vec(["D", "B"]));
        services.insert("S5".to_string(), string_vec(["A", "Z"]));

        let work = Work(Search::new("A".to_string(), "B".to_string()));
        let mut task_manager = TaskManager::new(services);
        task_manager.run(work);
    }
The HashMap sets up the story. Let's assume we have various travel services: S1, S2, etc. Each of those services can reach various stops. For example, S1 can reach A and B. Now the work variable tells you what we want to do, which is to search for ways to get from A to B.
The twist is handled by TaskManager. It's going to run each service in an isolated separate process (a task, in Rust speak) that only knows about its own stops. This means finding non-direct paths must involve inter-process communication.
The string_vec() helper just builds a vector of String objects for me. I'm not using String for its mutability; it's more of an attempt to clarify ownership without scattering lifetime specifications everywhere. (This may not have been ideal. Please remember that I'm still a pretty green Rust programmer.)
If the code I write works, it will find two possible paths for our search, and indeed it does:
$ ./pathfinder
Path: A--S1-->B
Path: A--S2-->C--S3-->D--S4-->B
The rest of this blog post will examine just how it accomplishes this.
Let's begin with a pretty trivial data structure:
    #[deriving(Clone)]
    struct Path {
        from: String,
        to: String,
        service: String
    }

    impl Path {
        fn new(from: String, to: String, service: String) -> Path {
            Path{from: from, to: to, service: service}
        }

        fn to_string(&self) -> String {
            self.from
                .clone()
                .append("--")
                .append(self.service.as_slice())
                .append("-->")
                .append(self.to.as_slice())
        }
    }
This just defines a Path as two endpoints and the service that connects them. You can see that the to_string() method gives us the simple ASCII arrow output from the solution.
Search builds on Path:
    #[deriving(Clone)]
    struct Search {
        from: String,
        to: String,
        paths: Vec<Path>
    }

    impl Search {
        fn new(from: String, to: String) -> Search {
            Search{from: from, to: to, paths: vec![]}
        }

        fn services(&self) -> Vec<String> {
            self.paths.iter().map(|path| path.service.clone()).collect()
        }

        fn stops(&self) -> Vec<String> {
            let mut all = vec![];
            all.push(self.from.clone());
            all.push(self.to.clone());
            for path in self.paths.iter() {
                all.push(path.from.clone());
                all.push(path.to.clone());
            }
            all
        }

        fn add_path(&self, path: Path) -> Search {
            Search{ from: path.to.clone(),
                    to: self.to.clone(),
                    paths: self.paths.clone().append([path]) }
        }
    }
A Search holds the endpoints we're currently trying to traverse and any paths collected so far in an attempt to reach our goal. The add_path() method is used with partial matches. It returns a new Search for the remaining distance with the partial Path added to the list.
The other methods, services() and stops(), just return lists of what has been used so far in the accumulated paths. These are helpful in avoiding building circular paths.
Note that both data structures so far are cloneable. This is so that copies can be made to send across channels to other tasks.
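As a modern-Rust illustration of why that matters (today's #[derive(Clone)] spelling and std::sync::mpsc channels; the Search stand-in here is mine, not the project's full struct): sending moves the value into the channel, so fanning one value out to several tasks means handing each one its own clone.

```rust
use std::sync::mpsc;
use std::thread;

// Stand-in for the post's Search; #[derive(Clone)] is the modern
// spelling of the old #[deriving(Clone)].
#[derive(Clone)]
struct Search {
    from: String,
    to: String,
}

fn main() {
    let search = Search { from: "A".to_string(), to: "B".to_string() };
    let mut handles = Vec::new();

    for id in 0..3u32 {
        let (tx, rx) = mpsc::channel();
        // send() takes ownership, so each task receives its own clone.
        tx.send(search.clone()).unwrap();
        handles.push(thread::spawn(move || {
            let s = rx.recv().unwrap();
            format!("task {} searching {} to {}", id, s.from, s.to)
        }));
    }

    for handle in handles {
        println!("{}", handle.join().unwrap());
    }
}
```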
We need two more aggregate data structures to be the messages that we will shuttle between tasks:
    #[deriving(Clone)]
    enum Job {
        Work(Search),
        Finish
    }

    enum Event {
        Match(Vec<Path>),
        Partial(Vec<Search>),
        Done(String)
    }
A Job can be either a wrapped Search we want to perform or the special Finish flag. The latter just tells a task to bust out of its infinite listening-for-searches loop and exit cleanly. This enum lists the messages we can send to the service tasks.
Messages from a service task (back to our not-yet-examined TaskManager) are called an Event. There are three possible types. Match means we have a full solution, using the included paths. Partial means that the task matched part of the path and is sending back a list of searches to try for locating the rest of it. Again, Done is a special flag that pretty much means, "I've got nothing." This flag includes the name of the service for a reason that will become obvious after we view this next helper object:
    struct SearchTracker {
        counts: HashMap<String, uint>
    }

    impl SearchTracker {
        fn new<'a, I: Iterator<&'a String>>(mut service_names: I) -> SearchTracker {
            let mut tracker = SearchTracker{counts: HashMap::new()};
            for name in service_names {
                tracker.counts.insert(name.clone(), 0);
            }
            tracker
        }

        fn add_search(&mut self) {
            for (_, count) in self.counts.mut_iter() {
                *count += 1;
            }
        }

        fn mark_done(&mut self, name: String) {
            let count = self.counts.get_mut(&name);
            *count -= 1;
        }

        fn is_done(&self) -> bool {
            self.counts.values().all(|n| *n == 0)
        }
    }
The idea behind SearchTracker is that we want to know when we're done seeing results from the various tasks. We can do that by expecting exactly one response (an Event) from each task for each Search sent.
This object just makes a counter for each service name passed into the constructor. You can then add_search() to bump all counters (because each Search is sent to all service tasks) and mark_done() to reduce a named counter when you receive a response from that service. The is_done() method will return true when all the counts balance out.
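A rough modern-Rust port of that bookkeeping may make the balancing act clearer (usize replaces the old uint, values_mut() replaces mut_iter(); the usage in main() is a hypothetical two-service scenario of my own):

```rust
use std::collections::HashMap;

struct SearchTracker {
    counts: HashMap<String, usize>,
}

impl SearchTracker {
    fn new<'a, I: Iterator<Item = &'a String>>(service_names: I) -> SearchTracker {
        // Start every service's counter at zero.
        SearchTracker { counts: service_names.map(|n| (n.clone(), 0)).collect() }
    }

    fn add_search(&mut self) {
        // Every search goes to every service, so bump all counters.
        for count in self.counts.values_mut() {
            *count += 1;
        }
    }

    fn mark_done(&mut self, name: &str) {
        // One response from a service settles one of its expected replies.
        if let Some(count) = self.counts.get_mut(name) {
            *count -= 1;
        }
    }

    fn is_done(&self) -> bool {
        self.counts.values().all(|n| *n == 0)
    }
}

fn main() {
    let names: Vec<String> = vec!["S1".to_string(), "S2".to_string()];
    let mut tracker = SearchTracker::new(names.iter());

    tracker.add_search();           // one Search broadcast to both services
    assert!(!tracker.is_done());    // still waiting on two responses

    tracker.mark_done("S1");
    tracker.mark_done("S2");
    assert!(tracker.is_done());     // all counts balanced
    println!("all services reported in");
}
```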
We need one last helper before we get into the actual process:
    struct MultiTaskSender<T> {
        senders: Vec<Sender<T>>
    }

    impl<T: Clone + Send> MultiTaskSender<T> {
        fn new() -> MultiTaskSender<T> {
            MultiTaskSender{senders: vec![]}
        }

        fn add_sender(&mut self, sender: Sender<T>) {
            self.senders.push(sender);
        }

        fn send(&self, t: T) {
            for sender in self.senders.iter() {
                sender.send(t.clone());
            }
        }
    }
As I said, each Search will be sent to all tasks. This simple multicaster just allows you to add_sender() as each task is built and later send() to all those channels. The actual object sent is handled as a generic type that we just need to be able to Clone and Send.
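For the curious, roughly the same multicaster works in modern Rust with std::sync::mpsc; this sketch and its three-receiver demo in main() are my restatement (no threads are involved here, so only Clone is required):

```rust
use std::sync::mpsc::{channel, Sender};

struct MultiTaskSender<T> {
    senders: Vec<Sender<T>>,
}

impl<T: Clone> MultiTaskSender<T> {
    fn new() -> MultiTaskSender<T> {
        MultiTaskSender { senders: Vec::new() }
    }

    fn add_sender(&mut self, sender: Sender<T>) {
        self.senders.push(sender);
    }

    fn send(&self, t: T) {
        // Each channel receives its own clone of the message.
        for sender in self.senders.iter() {
            sender.send(t.clone()).unwrap();
        }
    }
}

fn main() {
    let mut multi = MultiTaskSender::new();
    let mut receivers = Vec::new();

    for _ in 0..3 {
        let (tx, rx) = channel();
        multi.add_sender(tx);
        receivers.push(rx);
    }

    multi.send("search A to B".to_string());
    for rx in receivers.iter() {
        // Every receiver sees the same broadcast message.
        assert_eq!(rx.recv().unwrap(), "search A to B");
    }
    println!("broadcast delivered to all 3 receivers");
}
```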
We're finally ready for the main workhorse of this code:
    struct TaskManager {
        services: HashMap<String, Vec<String>>,
        tracker: SearchTracker,
        multi_sender: MultiTaskSender<Job>,
        event_sender: Sender<Event>,
        event_receiver: Receiver<Event>
    }

    impl TaskManager {
        fn new(services: HashMap<String, Vec<String>>) -> TaskManager {
            let (sender, receiver) = channel();
            TaskManager{ services: services.clone(),
                         tracker: SearchTracker::new(services.keys()),
                         multi_sender: MultiTaskSender::new(),
                         event_sender: sender,
                         event_receiver: receiver }
        }

        fn run(&mut self, work: Job) {
            self.launch_services();
            self.send_job(work);
            self.wait_for_services();
            self.send_job(Finish);
        }

        // ...
    }
This shows you what a TaskManager keeps track of, which is just the data for our challenge, the helper objects, and various channels (the pipes of communication between Rust tasks). You can see how this gets set up in new().
Once we have everything we need to track, run() actually does the Job. It will:
- Launch a task for each service
- Send the full search we want to perform
- Wait for and respond to reported work from the tasks
- Signal all tasks to shut down when the work is done
Two pieces of this process are the beating heart of the system. Here's the first of those:
    impl TaskManager {
        // ...

        fn launch_services(&mut self) {
            for (name, stops) in self.services.clone().move_iter() {
                let task_event_sender = self.event_sender.clone();
                let (search_sender, search_receiver) = channel();
                self.multi_sender.add_sender(search_sender.clone());
                spawn( proc() {
                    loop {
                        let job = search_receiver.recv();
                        match job {
                            Work(search) => {
                                if stops.contains(&search.from) &&
                                   stops.contains(&search.to) {
                                    let path = Path::new(
                                        search.from,
                                        search.to,
                                        name.clone()
                                    );
                                    let paths = search.paths.append([path]);
                                    task_event_sender.send(Match(paths))
                                } else {
                                    let mut tos = stops.clone();
                                    let previous = search.stops();
                                    tos.retain(|stop| !previous.contains(stop));
                                    if !search.services().contains(&name) &&
                                       stops.contains(&search.from) &&
                                       !tos.is_empty() {
                                        let searches = tos.iter().map( |to| {
                                            let path = Path::new(
                                                search.from.clone(),
                                                to.clone(),
                                                name.clone()
                                            );
                                            search.add_path(path)
                                        } ).collect();
                                        task_event_sender.send(Partial(searches));
                                    } else {
                                        task_event_sender.send(Done(name.clone()));
                                    }
                                }
                            }
                            Finish => { break; }
                        }
                    }
                } );
            }
        }

        // ...
    }
You're pretty much looking at a service task here (the whole part inside the call to spawn()). Inside, each one is just an endless loop calling recv() to get new Work-wrapped Search objects from the channel. The first if branch inside the match of Work(search) handles the simple case of the service matching the Search exactly.
When the else branch is selected, because we don't have a direct match, some work is done to see if a partial match is possible. (This is a foolish algorithm, by the way. It does partial matches only if it can directly match the from endpoint and not the to. This rules out some viable scenarios, but it helped to keep this already large example smaller.)
If a partial match is found, it's transformed into a list of new searches to try that may later find direct matches or more partial matches. When no direct or partial match is found, the Done flag is sent back so TaskManager knows to stop waiting on this task.
The Finish match clause just breaks out of the loop as described previously. Outside of spawn() is a simple loop that creates each task and some code that prepares the variables for the task to capture.
    impl TaskManager {
        // ...

        fn wait_for_services(&mut self) {
            loop {
                match self.event_receiver.recv() {
                    Match(paths) => {
                        let name = paths.last().expect("No path").service.clone();
                        self.tracker.mark_done(name);
                        let path_string = paths.iter().skip(1).fold(
                            paths[0].to_string(),
                            |s, p| s.append(p.to_string().as_slice().slice_from(1))
                        );
                        println!("Path: {}", path_string);
                    }
                    Partial(searches) => {
                        let name = searches.last()
                                           .expect("No search")
                                           .paths
                                           .last()
                                           .expect("No path")
                                           .service
                                           .clone();
                        self.tracker.mark_done(name);
                        for search in searches.iter() {
                            self.send_job(Work(search.clone()));
                        }
                    }
                    Done(name) => { self.tracker.mark_done(name) }
                }
                if self.tracker.is_done() { break; }
            }
        }

        // ...
    }
This chunk of code is the other half of the puzzle. It's another infinite loop listening on the Event channel. A full Match is pretty-printed, and a Partial is sent back out to the services in a wave of new searches. Regardless of the Event type, we record the response for the sending service, though where we find the service name varies by case. This allows us to exit this loop, and the program, when our tracker says we're done.
There's only one final method on TaskManager, and it's what actually sends the messages to the service tasks, tracking each new Search as it goes out:
    impl TaskManager {
        // ...

        fn send_job(&mut self, job: Job) {
            match job {
                Work(_) => { self.tracker.add_search(); }
                Finish => { /* do nothing */ }
            }
            self.multi_sender.send(job);
        }
    }
You can find the full code on GitHub.
Comments (1)
James Edward Gray II, September 9th, 2014
Concurrency in Rust is a handy talk for learning what's available in Rust for multiprocessing.