There is a fantastic restaurant chain in the DC area called Great American Restaurants. They have received numerous awards for their food, their service and their work environment. However, their service is one of the primary reasons I love going to their restaurants. They have an innovative team method of taking care of tables. Basically, if a server is walking by, they will check your table to see if you need anything. Which means empty glasses or plates don’t sit on your table. When you put the credit card down, the next server that walks by picks it up and brings it back immediately. Unlike restaurants where a single server is assigned to your table, at a GAR restaurant tasks are accomplished quickly because the restaurant makes all of the workers available to every table. This is parallel processing which we need to do with Ruby in the situation we found ourselves in.
This week, we are migrating a competitor’s database into our database. The competitor’s database will add millions of records to our database. The plan is to migrate all of their data and then index the whole combined database into Solr. But when we figured out that it was going to take 7 days to index all of our new data into Solr, we knew something had to change.
Our application is a Rails application. We use Sunspot for our ActiveRecord and Solr integration. Sunspot enables full-text and faceted searching of ActiveRecord models. When you need to reindex your models, Sunspot provides a nice rake task to handle reindexing, unfortunately, this is performed by a single threaded, non-parallel worker. This is where change needed to happen. We needed a full team of workers to get this data indexed in a timely manner.
The proper way to get parallel processing in your Ruby app is to use the Parallel gem. But I didn’t find that gem until after I had already created a working solution. However, after implementing this solution, I think knowing how to implement threading and forks is something that every Rubyist should know, so I will share with you my solution. This solution brings together one idea that is not covered in any of my searches. The idea is simply:
You need both Threads and Forks.
I found great articles on Threads. And I found great articles on Forks. But there were no articles that said, “but what you really need are Threads and Forks.” Let’s start by defining Threads and Forks.
Threads share the same CPU and the same Ruby interpreter. This means that all of your variables are available to all of the other threads. This has consequences, both good and bad, so be careful with how you are using variables with Threads.
Forks spawn into a separate CPU and Ruby interpreter. They get a copy of the variables that are not linked back to the variables of the parent process. If you change variables in a Fork, the parent process and the other Fork processes will not see those changes. Again, this has consequences, caution should be used here. One of my earliest attempts had me reindexing everything 5 times because I was copying the entire queue into every Fork. That did not speed things up!
My first attempt followed this example of Ruby threads. This gave me multiple workers but didn’t give me the multi-CPU solution that I needed.
To get multi-CPUs working I found this article on Ruby forks. But, this wasn’t the full answer either.
The goal was to keep each processor busy with work, we have 6 processors so I wanted to keep 5 of those processors busy at all times. My first strategy was to use Rails’ find_in_batches feature to loop over all of the records 1000 at a time and send each batch to Solr. My second attempt took the 1000 records, split them into 5 groups using Rails’ in_groups and then send those groups to Solr. Here’s the second attempt:
|cpu_available = 5|
|Order.includes(:products).find_in_batches do |batch||
|batch.in_groups(cpu_available, false) do |group||
This looks good, but has one small problem. If one of the groups is larger than the others, you could have idle CPU’s. You don’t want this when you have 1.7m Order records that need to be indexed. Imagine a large warehouse that handles Orders that are big and small. Orders that have 4 Products and Orders that have 1000 Products. Your CPU’s may be waiting a while if one of your Forks is indexing a couple of those Orders with 1000 products.
This is where you need Threads and Forks to keep all of the processors busy. Here is my final solution:
|batches = Queue.new|
|Order.select(:id).find_in_batches do |batch||
|batches << batch.map(&:id)|
|while(ids = batches.pop)|
|pid = Process.fork do|
|sleep 1 until batches.size.zero?|
I know that looks a little crazy, but it rocks, so let me explain how it works. We start off grabbing all of the ids of the Orders and putting them into a Queue. Each item in the Queue is a batch of 1000 Orders. We just want the ids and not the whole Order object so we call #map(&:id). Then we create new threads for each of our CPUs. Inside of each Thread we loop while there are batches still available in the Queue. For each batch we get, we create a new Fork. Inside of that Fork we execute the find that brings back all of the Order objects and its Products and throw the results into Solr. As soon as the Fork completes, you get the next batch and start again. The last block, watches the Queue and when the Queue is empty it cleans up all of the threads. The last line waits for all of the Forks to complete before allowing the script to finish.
Without the Thread the Fork gets a full copy of the Queue and tries to work the whole Queue. Without the Fork, the Thread stays on a single CPU. However, the Threads all share the same memory so the Queue is shared across all of the Threads. The Forks are then given a single batch to work and then they are gone until the next Fork is created. Just like my favorite restaurant, this is efficient and wonderful. All of the CPUs are kept busy until there is no more work left on the Queue.