22 AUG 2014
Sleepy Programs
When we think of real multiprocessing, our thoughts probably drift more towards languages like Erlang, Go, Clojure, or Rust. Such languages really focus on getting separate "processes" to communicate via messages. This makes it a lot easier to know when one process is waiting on another, because calls to receive messages typically block until one is available.
But what about Ruby? Can we do intelligent process coordination in Ruby?
Yes, we can. The tools for it are more awkward though. It's easy to run into tricky edge cases and hard to code your way out of them correctly.
Let's play with an example to see how good we can make things. Here's what we will do:
- We will start one parent process that will fork() a single child process
- The child will push three messages onto a RabbitMQ queue and exit()
- The parent will listen for three messages to arrive, then exit()
Here's a somewhat sloppy first attempt at solving this:
#!/usr/bin/env ruby

require "benchmark"
require "bunny"

QUEUE_NAME = "example"
MESSAGES   = %w[first second third]

def send_messages(*messages)
  connection = Bunny.new.tap(&:start)
  exchange   = connection.create_channel.default_exchange
  messages.each do |message|
    exchange.publish(message, routing_key: QUEUE_NAME)
  end
  connection.close
end

def listen_for_messages(received_messages)
  connection = Bunny.new.tap(&:start)
  queue      = connection.create_channel.queue(QUEUE_NAME, auto_delete: true)
  queue.subscribe do |delivery_info, metadata, payload|
    received_messages << payload
  end
  time_it("Received #{MESSAGES.size} messages") do
    yield
  end
  connection.close
end

def time_it(name)
  elapsed = Benchmark.realtime do
    yield
  end
  puts "%s: %.2fs" % [name, elapsed]
end

def wait_for_messages(received_messages)
  until received_messages == MESSAGES
    sleep 0.1 # don't peg the CPU while we wait
  end
end

def send_and_receive
  pid = fork do
    sleep 3 # make sure we're receiving before they are sent
    send_messages(*MESSAGES)
  end
  Process.detach(pid)

  received_messages = [ ]
  listen_for_messages(received_messages) do
    wait_for_messages(received_messages)
  end
end

send_and_receive
Let's talk about each piece of this code real quick. You can mostly ignore the first two methods, send_messages() and listen_for_messages(). These are just wrappers over RabbitMQ's publish and subscribe process. The only tricky bit is that listen_for_messages() does a yield after subscribing to the queue. The reason for this is that subscribing just spins up a separate Thread which will call the passed block as messages arrive. That's happening in the background, which means the main Thread needs to find some way to wait until we have received the expected messages. The yield gives us a place to insert this waiting code.
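If it helps to see that shape without RabbitMQ involved, here's a tiny sketch of the same pattern with a plain Thread standing in for the subscription. The listen() helper and the counts here are made up purely for illustration:

def listen(received)
  worker = Thread.new do      # plays the role of queue.subscribe
    3.times { |i| received << i }
  end
  yield                       # the caller decides how to wait
  worker.join
end

received = [ ]
listen(received) do
  sleep 0.1 until received.size == 3
end
p received  # => [0, 1, 2]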
The next two methods, time_it() and wait_for_messages(), are simple helpers. I added the first mainly to give us some noticeable output. The latter performs the waiting and checking discussed above.
The real action happens in send_and_receive(). This method should look a lot like the steps we defined earlier: fork() off a child, send_messages(), then listen_for_messages().
Now this code has a couple of problems. One way to see them is to run it:
$ ruby sleepy.rb
Received 3 messages: 3.03s
Doesn't three seconds sound a little slow for modern hardware communicating via a super efficient queuing system? Yeah, it is.
I actually put the sleeps in the code manually. Look for these two lines:
# ...
sleep 0.1 # don't peg the CPU while we wait
# ...
sleep 3 # make sure we're receiving before they are sent
# ...
Now it's obvious where the three second delay is coming from, eh? Let's talk about why I added that second sleep().
The issue is that once we fork() that child process, it's off to the races. The parent process will continue running too, but we don't know who will get to what first. If the child fires off messages before the parent is listening for them, they will be missed. Instead we need the child to wait until the parent is ready to begin the experiment.
My three second sleep is one crude way to sort of handle this. I just delay the child for a significant period of time in computerland. Odds are that the parent will be set up by the time the child starts sending. It could still fail, though, if my machine were under heavy load at the time and didn't give the parent process enough attention before the child woke up. Plus, it's slowing our experiment way down. In other words, this is a bad idea all around.
The good news is that we can fix it by making some semi-cryptic changes to just one method:
def send_and_receive
  reader, writer = IO.pipe
  pid = fork do
    writer.close  # child only reads from the pipe
    reader.read   # block until the parent says it's ready
    reader.close
    send_messages(*MESSAGES)
  end
  Process.detach(pid)
  reader.close    # parent only writes to the pipe

  received_messages = [ ]
  listen_for_messages(received_messages) do
    writer.puts "ready"  # now subscribed: unblock the child
    writer.close
    wait_for_messages(received_messages)
  end
end
As you can see, I've introduced a pipe. A pipe is a one-way communication channel between processes. You get an endpoint to write to and another to read from. After you fork(), it's good practice to have each side close() the end they're not using. Then I just have the child call read() on the pipe. This will block until the parent sends some content that can be read. The parent completes its setup, including subscribing to the queue, and then it pushes a simple "ready" message down the pipe. That will get the child unblocked and sending messages.
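If pipes are new to you, here's the handshake on its own, with the RabbitMQ work swapped out for a puts() so the blocking read() is easier to see. This is a toy example, not part of the article's program:

reader, writer = IO.pipe
pid = fork do
  writer.close             # child only reads
  reader.read              # blocks here until the parent writes and closes
  reader.close
  puts "child: got the go-ahead"
end
reader.close               # parent only writes
sleep 1                    # stand-in for the parent's setup work
writer.puts "ready"
writer.close
Process.wait(pid)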
Does this change help? Yes, a lot:
$ ruby sleepy.rb
Received 3 messages: 0.10s
We're three seconds faster.
Unfortunately, the remaining delay looks suspiciously like my other call to sleep(). Here's that code to refresh your memory:
def wait_for_messages(received_messages)
  until received_messages == MESSAGES
    sleep 0.1 # don't peg the CPU while we wait
  end
end
This loop just periodically checks to see if we have our three messages yet. We could technically remove the call to sleep() here and it would run. However, it would waste a lot of CPU time just checking these messages over and over again as fast as possible. Ironically, that bid for speed might starve the child process of resources and slow things down. So we kind of need the sleep(), or something like it.
But the problem remains that we're likely getting our messages very quickly and then just waiting for a sleep() call to run out so we notice they have arrived. We can do better with one simple change:
def listen_for_messages(received_messages)
  connection  = Bunny.new.tap(&:start)
  queue       = connection.create_channel.queue(QUEUE_NAME, auto_delete: true)
  main_thread = Thread.current
  queue.subscribe do |delivery_info, metadata, payload|
    received_messages << payload
    main_thread.wakeup  # interrupt any sleep() in the main thread
  end
  time_it("Received #{MESSAGES.size} messages") do
    yield
  end
  connection.close
end
The difference here is that I capture the main_thread before I set up my subscription. Remember, that block will be called in a different Thread. Then, each time I receive a message, I cancel any sleep() the main_thread is currently doing with a call to wakeup(). This means it will recheck, when it should, as new messages arrive.
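In isolation, the wakeup() trick looks something like this. Again, it's a toy example; the two-second sleep just simulates a message arriving:

main_thread = Thread.current

helper = Thread.new do
  sleep 2               # pretend a message arrived two seconds in
  main_thread.wakeup    # cut the main thread's sleep short
end

sleep 60                # would normally block for a full minute
puts "woken early"
helper.join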
That gives us another significant speed boost:
$ ruby sleepy.rb
Received 3 messages: 0.01s
I would probably stop here, but I should warn you that my solution isn't perfect. Some might be tempted to take this final step:
def wait_for_messages(received_messages)
  until received_messages == MESSAGES
    sleep
  end
end
Here the short sleep() has been changed into an indefinite one. You would think this is OK, because the other Thread will wake us when the time comes. Sadly, it's not, because my last fix added a race condition. Consider what would happen if the Threads executed code in this order:
# ...
# first the main thread checks, but finds only two of the three messages:
until received_messages == MESSAGES
# ...
# then the listening thread queues the final message and wakes the main
# thread (this has no effect since it isn't currently sleeping):
received_messages << payload
main_thread.wakeup
# ...
# finally the main thread goes back to sleep, forever:
sleep
As long as you leave my short sleep, you'll only pay a small penalty if this edge case does kick in.
Could we ensure it didn't happen though? Yes, with more message passing! Here's the final code:
#!/usr/bin/env ruby

require "benchmark"
require "thread"
require "bunny"

QUEUE_NAME = "example"
MESSAGES   = %w[first second third]

def send_messages(*messages)
  connection = Bunny.new.tap(&:start)
  exchange   = connection.create_channel.default_exchange
  messages.each do |message|
    exchange.publish(message, routing_key: QUEUE_NAME)
  end
  connection.close
end

def listen_for_messages(received_messages, check_queue, listen_queue)
  connection = Bunny.new.tap(&:start)
  queue      = connection.create_channel.queue(QUEUE_NAME, auto_delete: true)
  queue.subscribe do |delivery_info, metadata, payload|
    received_messages << payload
    check_queue << :check  # tell the main thread to check for completion
    listen_queue.pop       # wait for permission before handling more messages
  end
  time_it("Received #{MESSAGES.size} messages") do
    yield
  end
  connection.close
end

def time_it(name)
  elapsed = Benchmark.realtime do
    yield
  end
  puts "%s: %.2fs" % [name, elapsed]
end

def wait_for_messages(received_messages, check_queue, listen_queue)
  loop do
    check_queue.pop  # block until the listener says there's something to check
    break if received_messages == MESSAGES
    listen_queue << :listen  # not done yet: let the listener continue
  end
end

def send_and_receive
  reader, writer = IO.pipe
  pid = fork do
    writer.close
    reader.read
    reader.close
    send_messages(*MESSAGES)
  end
  Process.detach(pid)
  reader.close

  received_messages = [ ]
  check_queue       = Queue.new
  listen_queue      = Queue.new
  listen_for_messages(received_messages, check_queue, listen_queue) do
    writer.puts "ready"
    writer.close
    wait_for_messages(received_messages, check_queue, listen_queue)
  end
end

send_and_receive
Look Ma, no sleep()!
My changes here are very similar to the earlier pipe trick, only I used a Thread-safe Queue. The pop() method of a Queue will block waiting just like IO's read() did. I also had to introduce two Queues, because I needed two-way communication. The listening Thread now tells the main Thread when it's time to check, and it won't resume listening again until the main Thread gives approval.
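Stripped of RabbitMQ, that handshake between two Queues looks roughly like this. It's just a sketch, with an in-process Thread playing the listener, plus one extra :listen at the end so the sketch can shut down cleanly:

check_queue  = Queue.new
listen_queue = Queue.new
received     = [ ]

listener = Thread.new do
  %w[first second third].each do |message|  # stands in for queue.subscribe
    received << message
    check_queue << :check                   # tell the main thread to look
    listen_queue.pop                        # wait for permission to continue
  end
end

loop do
  check_queue.pop                           # block until told to check
  break if received == %w[first second third]
  listen_queue << :listen
end
listen_queue << :listen                     # unblock the listener so it can exit
listener.join
p received                                  # => ["first", "second", "third"]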
I think this version is safe from race conditions and it doesn't wake up periodically to check things that haven't changed. It's also still as fast as the unsafe version.
If you must do safe multiprocessing, in any language, just pass messages.
Comments (8)
-
Daniel Schierbeck, August 23rd, 2014
Couldn't you create a persistent queue in the writer process - then the parent doesn't need to be ready when the write happens and you can avoid synchronizing. I think the RabbitMQ way would be for both the reader and the writer to create the queue.
-
You could use persistent queues, yes.
That's a big change though, in my opinion. You'll want to think about things like message volume and what your storage needs will be. Also, persistence is going to slow things a bit. There are definitely cases where you want to make these tradeoffs, but I don't think this is a decision you make lightly.
-
The queue actually doesn't have to be persistent (i.e. stored on disk) it just needs to be a "shared" queue that's not tied to a single consumer. That way, the consumer can come and go. It'll still be in memory, and the messages will be retained by Rabbit until a consumer has grabbed them.
-
Good to know. Thanks for clarifying.
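For anyone reading along, the producer side Daniel describes might look roughly like this. It's only a sketch against the same local RabbitMQ setup the article assumes, and the option values are illustrative:

require "bunny"

connection = Bunny.new.tap(&:start)
channel    = connection.create_channel
channel.queue("example", auto_delete: false)  # shared queue, retained by Rabbit
                                              # even with no consumer attached
channel.default_exchange.publish("first", routing_key: "example")
connection.close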
-
Instead we need the child to wail unit the parent is ready to begin the experiment.
wait until :-)
-
Fixed. Thanks.
-
Question: can a timeout be easily added to this? I was working on a similar problem recently, but using Timeout.timeout doesn't seem safe since I'm waiting on a resource from a pool. If the timeout raises at the wrong time, the resource doesn't get checked back into the pool. Ideally, a blocking call with a built-in timeout would be what I want. Something like: q.pop(timeout)
-
You can use Queue in a non-blocking fashion to achieve what you are after. This requires a little bit of a busy loop, sleeping a bit and retrying until you hit your limit.
If you want to handle your scenario without a busy loop, see Thread::handle_interrupt(). There's even an example in that documentation using Timeout.
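The busy-loop version might be sketched like this; pop_with_timeout() is a made-up helper, not part of Queue's API:

# Illustrative helper, not part of Ruby's Queue API.
def pop_with_timeout(queue, timeout)
  deadline = Time.now + timeout
  begin
    queue.pop(true)          # non-blocking pop; raises ThreadError when empty
  rescue ThreadError
    raise "timed out waiting for the queue" if Time.now > deadline
    sleep 0.1                # don't peg the CPU while we wait
    retry
  end
end

q = Queue.new
Thread.new { sleep 1; q << :resource }
p pop_with_timeout(q, 5)     # => :resource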