Rubies in the Rough

This is where I try to teach how I think about programming.

22 AUG 2014

Sleepy Programs

When we think of real multiprocessing, our thoughts probably drift more towards languages like Erlang, Go, Clojure, or Rust. Such languages really focus on getting separate "processes" to communicate via messages. This makes it a lot easier to know when one process is waiting on another, because calls to receive messages typically block until one is available.

But what about Ruby? Can we do intelligent process coordination in Ruby?

Yes, we can. The tools for it are more awkward though. It's easy to run into tricky edge cases and hard to code your way out of them correctly.

Let's play with an example to see how good we can make things. Here's what we will do:

  1. We will start one parent process that will fork() a single child process
  2. The child will push three messages onto a RabbitMQ queue and exit()
  3. The parent will listen for three messages to arrive, then exit()

Here's a somewhat sloppy first attempt at solving this:

#!/usr/bin/env ruby

require "benchmark"

require "bunny"

QUEUE_NAME = "example"
MESSAGES   = %w[first second third]

def send_messages(*messages)
  connection = Bunny.new.tap(&:start)
  exchange   = connection.create_channel.default_exchange

  messages.each do |message|
    exchange.publish(message, routing_key: QUEUE_NAME)
  end

  connection.close
end

def listen_for_messages(received_messages)
  connection = Bunny.new.tap(&:start)
  queue      = connection.create_channel.queue(QUEUE_NAME, auto_delete: true)

  queue.subscribe do |delivery_info, metadata, payload|
    received_messages << payload
  end

  time_it("Received #{MESSAGES.size} messages") do
    yield
  end

  connection.close
end

def time_it(name)
  elapsed = Benchmark.realtime do
    yield
  end
  puts "%s: %.2fs" % [name, elapsed]
end

def wait_for_messages(received_messages)
  until received_messages == MESSAGES
    sleep 0.1  # don't peg the CPU while we wait
  end
end

def send_and_receive
  pid = fork do
    sleep 3  # make sure we're receiving before they are sent
    send_messages(*MESSAGES)
  end
  Process.detach(pid)

  received_messages = [ ]
  listen_for_messages(received_messages) do
    wait_for_messages(received_messages)
  end
end

send_and_receive

Let's talk about each piece of this code real quick. You can mostly ignore the first two methods, send_messages() and listen_for_messages(). These are just wrappers over RabbitMQ's publish and subscribe operations. The only tricky bit is that listen_for_messages() does a yield after subscribing to the queue. The reason for this is that subscribing just spins up a separate Thread, which will call the passed block as messages arrive. That's happening in the background, which means the main Thread needs to find some way to wait until we have received the expected messages. The yield gives us a place to insert this waiting code.

The next two methods, time_it() and wait_for_messages(), are simple helpers. I added the first mainly to give us some noticeable output. The latter performs the waiting and checking discussed above.

The real action happens in send_and_receive(). This method should look a lot like the steps we defined earlier: fork() off a child, send_messages(), then listen_for_messages().

Now this code has a couple of problems. One way to see them is to run it:

$ ruby sleepy.rb
Received 3 messages: 3.03s

Doesn't three seconds sound a little slow for modern hardware communicating via a super efficient queuing system? Yeah, it is.

I actually put those sleep() calls in there on purpose. Look for these two lines:

# ...

    sleep 0.1  # don't peg the CPU while we wait

# ...

    sleep 3  # make sure we're receiving before they are sent

# ...

Now it's obvious where the three second delay is coming from, eh? Let's talk about why I added that second sleep().

The issue is that once we fork() that child process, it's off to the races. The parent process will continue running too, but we don't know who will get to what first. If the child fires off messages before the parent is listening for them, they will be missed. Instead, we need the child to wait until the parent is ready to begin the experiment.

My three second sleep is one crude way to sort of handle this. I just delay the child for a significant period of time in computerland. Odds are that the parent will be set up by the time the child starts sending. It could still fail, though, if my machine were under heavy load and didn't give the parent process enough attention before the child woke up. Plus, it's slowing our experiment way down. In other words, this is a bad idea all around.

The good news is that we can fix it by making some semi-cryptic changes to just one method:

def send_and_receive
  reader, writer = IO.pipe
  pid            = fork do
    writer.close

    reader.read
    reader.close

    send_messages(*MESSAGES)
  end
  Process.detach(pid)
  reader.close

  received_messages = [ ]
  listen_for_messages(received_messages) do
    writer.puts "ready"
    writer.close

    wait_for_messages(received_messages)
  end
end

As you can see, I've introduced a pipe. A pipe is a one-way communication channel between processes: you get one endpoint to write to and another to read from. After you fork(), it's good practice to have each side close() the end it isn't using. Then I just have the child call read() on the pipe. This will block until the parent writes something and closes its end. The parent completes its setup, including subscribing to the queue, then pushes a simple "ready" message down the pipe and closes its writer. That gets the child unblocked and sending messages.
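
If you haven't used IO.pipe before, here's a minimal sketch of just the handshake on its own, with no RabbitMQ involved. The one second sleep() stands in for the parent's setup work:

reader, writer = IO.pipe

pid = fork do
  writer.close       # the child only reads
  reader.read        # blocks until the parent writes and closes its end
  reader.close

  puts "child: parent is ready, sending now"
end
reader.close         # the parent only writes

sleep 1              # stand-in for the parent's setup work
writer.puts "ready"  # unblocks the child's read()
writer.close

Process.wait(pid)    # reap the child before exiting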

Does this change help? Yes, a lot:

$ ruby sleepy.rb
Received 3 messages: 0.10s

We're three seconds faster.

Unfortunately, the remaining delay looks suspiciously like my other call to sleep(). Here's that code to refresh your memory:

def wait_for_messages(received_messages)
  until received_messages == MESSAGES
    sleep 0.1  # don't peg the CPU while we wait
  end
end

This loop just periodically checks to see if we have our three messages yet. We could technically remove the call to sleep() here and it would run. However, it would waste a lot of CPU time just checking these messages over and over again as fast as possible. Ironically, that bid for speed might starve the child process of resources and slow things down. So we kind of need the sleep(), or something like it.

But the problem remains that we're likely getting our messages very quickly and then just waiting for a sleep() call to run out so we notice they have arrived. We can do better with one simple change:

def listen_for_messages(received_messages)
  connection = Bunny.new.tap(&:start)
  queue      = connection.create_channel.queue(QUEUE_NAME, auto_delete: true)

  main_thread = Thread.current
  queue.subscribe do |delivery_info, metadata, payload|
    received_messages << payload
    main_thread.wakeup
  end

  time_it("Received #{MESSAGES.size} messages") do
    yield
  end

  connection.close
end

The difference here is that I capture the main_thread before I set up my subscription. Remember, that block will be called in a different Thread. Then, each time I receive a message, I cut short any sleep() the main_thread is currently doing with a call to wakeup(). This means it rechecks exactly when it should: as new messages arrive.
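
If sleep() and wakeup() are new to you, here's a tiny sketch of just that interaction, apart from Bunny. The two second delay stands in for a message arriving later:

require "benchmark"

main_thread = Thread.current

helper = Thread.new do
  sleep 2             # stand-in for a message arriving later
  main_thread.wakeup  # cut the main thread's sleep() short
end

elapsed = Benchmark.realtime do
  sleep 60            # would last a full minute without the wakeup()
end
puts "Woke after %.2fs" % elapsed  # roughly 2.00s

helper.join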

That gives us another significant speed boost:

$ ruby sleepy.rb 
Received 3 messages: 0.01s

I would probably stop here, but I should warn you that my solution isn't perfect. Some might be tempted to take this final step:

def wait_for_messages(received_messages)
  until received_messages == MESSAGES
    sleep
  end
end

Here the short sleep() has been changed into an indefinite one. You would think this is OK, because the other Thread will wake us when the time comes. Sadly, it's not, because my last fix added a race condition. Consider what would happen if the Threads executed code in this order:

# ...

  # first the main thread checks, but finds only two of the three messages:
  until received_messages == MESSAGES

# ...

    # then the listening thread queues the final message and wakes the main
    # thread (this has no effect since it isn't currently sleeping):
    received_messages << payload
    main_thread.wakeup

# ...

    # finally the main thread goes back to sleep, forever:
    sleep

As long as you leave my short sleep() in place, you'll only pay a small penalty if this edge case does kick in.

Could we ensure it didn't happen though? Yes, with more message passing! Here's the final code:

#!/usr/bin/env ruby

require "benchmark"
require "thread"

require "bunny"

QUEUE_NAME = "example"
MESSAGES   = %w[first second third]

def send_messages(*messages)
  connection = Bunny.new.tap(&:start)
  exchange   = connection.create_channel.default_exchange

  messages.each do |message|
    exchange.publish(message, routing_key: QUEUE_NAME)
  end

  connection.close
end

def listen_for_messages(received_messages, check_queue, listen_queue)
  connection = Bunny.new.tap(&:start)
  queue      = connection.create_channel.queue(QUEUE_NAME, auto_delete: true)

  queue.subscribe do |delivery_info, metadata, payload|
    received_messages << payload

    check_queue << :check
    listen_queue.pop
  end

  time_it("Received #{MESSAGES.size} messages") do
    yield
  end

  connection.close
end

def time_it(name)
  elapsed = Benchmark.realtime do
    yield
  end
  puts "%s: %.2fs" % [name, elapsed]
end

def wait_for_messages(received_messages, check_queue, listen_queue)
  loop do
    check_queue.pop

    break if received_messages == MESSAGES

    listen_queue << :listen
  end
end

def send_and_receive
  reader, writer = IO.pipe
  pid            = fork do
    writer.close

    reader.read
    reader.close

    send_messages(*MESSAGES)
  end
  Process.detach(pid)
  reader.close

  received_messages = [ ]
  check_queue       = Queue.new
  listen_queue      = Queue.new
  listen_for_messages(received_messages, check_queue, listen_queue) do
    writer.puts "ready"
    writer.close

    wait_for_messages(received_messages, check_queue, listen_queue)
  end
end

send_and_receive

Look Ma, no sleep()!

My changes here are very similar to the earlier pipe trick, only this time I used a Thread-safe Queue. The pop() method of a Queue blocks while waiting, just like IO's read() did. I also had to introduce two Queues, because I needed two-way communication. The listening Thread now tells the main Thread when it's time to check, and it won't resume listening until the main Thread gives its approval.
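
Stripped of the RabbitMQ details, that two-Queue handshake looks something like this sketch, where a plain Thread stands in for Bunny's subscriber Thread:

require "thread"

wanted       = %w[first second third]
received     = [ ]
check_queue  = Queue.new
listen_queue = Queue.new

Thread.new do               # stands in for Bunny's subscriber Thread
  wanted.each do |message|
    received << message

    check_queue << :check   # tell the main Thread it's time to check
    listen_queue.pop        # block until it approves more listening
  end
end

loop do
  check_queue.pop           # block until the listener says to check

  break if received == wanted

  listen_queue << :listen   # approve more listening
end

puts "Received: #{received.inspect}"
# the listening Thread is still blocked on its final pop(), just like the
# Bunny subscriber; it's cleaned up when the process exits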

I think this version is safe from race conditions and it doesn't wake up periodically to check things that haven't changed. It's also still as fast as the unsafe version.

If you must do safe multiprocessing, in any language, just pass messages.

Comments (8)
  1. Daniel Schierbeck, August 23rd, 2014

    Couldn't you create a persistent queue in the writer process - then the parent doesn't need to be ready when the write happens and you can avoid synchronizing. I think the RabbitMQ way would be for both the reader and the writer to create the queue.

    James Edward Gray II, August 23rd, 2014

      You could use persistent queues, yes.

      That's a big change though, in my opinion. You'll want to think about things like message volume and what your storage needs will be. Also, persistence is going to slow things a bit. There are definitely cases where you want to make these tradeoffs, but I don't think this is a decision you make lightly.

      Daniel Schierbeck, August 25th, 2014

        The queue actually doesn't have to be persistent (i.e. stored on disk); it just needs to be a "shared" queue that's not tied to a single consumer. That way, the consumer can come and go. It'll still be in memory, and the messages will be retained by Rabbit until a consumer has grabbed them.

        James Edward Gray II, August 25th, 2014

          Good to know. Thanks for clarifying.

  2. Maurizio De Magnis, August 29th, 2014

    Instead we need the child to wail unit the parent is ready to begin the experiment.

    wait until :-)

    James Edward Gray II, August 29th, 2014

      Fixed. Thanks.

  3. Andrew Ryan Lazarus, September 5th, 2014

    Question: can a timeout be easily added to this? I was working on a similar problem recently, but using Timeout.timeout doesn't seem safe since I'm waiting on a resource from a pool. If the timeout raises at the wrong time, the resource doesn't get checked back into the pool. Ideally, a blocking call with a built in timeout would be what I want.

    Something like: q.pop(timeout)

    James Edward Gray II, September 5th, 2014

      You can use Queue in a non-blocking fashion to achieve what you are after. This requires a little bit of a busy loop, sleeping a bit and retrying until you hit your limit.

      If you want to handle your scenario without a busy loop, see Thread::handle_interrupt(). There's even an example in that documentation using Timeout.
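
      A minimal sketch of that busy loop, using a made-up pop_with_timeout() helper (not part of Ruby's Queue API), might look like:

      require "thread"

      # pop_with_timeout() is a hypothetical helper, not a real Queue method
      def pop_with_timeout(queue, timeout, interval = 0.1)
        deadline = Time.now + timeout
        begin
          queue.pop(true)      # non-blocking pop
        rescue ThreadError     # raised when the queue is empty
          raise "timed out" if Time.now >= deadline
          sleep interval       # wait a bit, then retry
          retry
        end
      end

      pool = Queue.new
      Thread.new { sleep 1; pool << :resource }
      p pop_with_timeout(pool, 5)  # => :resource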
