Build a processing queue with multi-threading, spread over multiple servers, in less than a day using RabbitMQ and Celery.

As we move through the development cycle we now have many of the essential processing modules we need for Tribalytic, but we also have a few challenges to deal with:

  1. We need to collect data faster than we can process it on a single processor. Requirement — we need to be able to collect the data and then spread it out for processing over multiple processors / servers.
  2. Some servers have a quota on how much they can process per hour for certain things (related to API limits set by Twitter etc.). Requirement — servers need to know their available limits and not take on more than they can process.
  3. We need to be able to schedule some tasks on a regular basis.

There are lots of different ways that this could be done, but after some preliminary research, I’m going to set out to do it using two key pieces of technology:

  1. RabbitMQ — An AMQP (Advanced Message Queuing Protocol) server. Robust message queues help a LOT with point one in particular.
  2. Celery — A pythonic / Django friendly task scheduler and queuing interface to Rabbit / AMQP. Celery provides the magic for points two and three.

The goal of this document is twofold: to record what I’ve learnt so I can replicate it in our production environment, and to lay out the “easy” steps, now that I’ve digested the various documentation, so that you can implement something that solves some common problems quickly.

There are all sorts of reasons I’ve preselected these technologies, but they aren’t the point of this post — this post documents the challenge of having selected this approach: how easy is it to implement? Firstly, a few notes about my setup:

  1. I have two machines I’ll be configuring this on, both running the Ubuntu 9.10 Karmic Koala release. I’ll call them Server and Laptop. For the record, Server is a PC with 4 cores and 6GB of RAM. Laptop is an MSI laptop with a single core (1.3GHz) and 2GB of RAM. When you see Server and Laptop, replace them with your own machine names.
  2. Python and Django are already pre-configured and working.

Finally, there are a lot of great resources out there, but the few I relied on the most (and where some of the content here is adapted from) are:

  1. http://www.rabbitmq.com
  2. http://ask.github.com/celery/index.html
  3. http://groups.google.com/group/celery-users

Installation Steps

I set up Server first.

  • Install Celery
  • To do this, I used easy_install for Celery as follows.
$ sudo easy_install celery
  • Next install RabbitMQ — Originally I simply used Synaptic Package Manager in Ubuntu, searched for Rabbit and installed it and its dependencies, but I noticed this is version 1.6.0 and the latest here is 1.7.2 at time of writing, so I downloaded and installed the latest package instead. I doubt it will make much difference which you use.
  • Test the installation — using the steps below, which I largely copied from here

Configure the security / vhost etc. Note this is an important step you’ll almost certainly come back to later if you stuff up :-)

$ sudo rabbitmqctl add_user myuser mypassword
$ sudo rabbitmqctl add_vhost myvhost
$ sudo rabbitmqctl set_permissions -p myvhost myuser "" ".*" ".*"

At this point the server should actually be running (this confused me the first time; the package install seems to start the server, which is why rabbitmqctl could add the user and vhost). You can check whether a server is running as follows:

$ sudo rabbitmqctl status

To start the service, use the following

$ sudo rabbitmq-server

or to run it in the background (recommended)

$ sudo rabbitmq-server -detached

Finally, to stop the server

$ sudo rabbitmqctl stop

Leave the server stopped for now and the basic Server install is complete.

I repeated these steps on Laptop without any problems at all. NB you could skip configuring security etc. on Laptop if you like, as it will be reset in the next step, but I think it’s worth doing anyway just as practice.

NB — If you see a dump like the following when starting the server, in my case it was because the server was already started — use rabbitmqctl status to check:

{error_logger,{{2010,2,23},{11,26,37}},"Protocol: ~p: register error: ~p~n",["inet_tcp",{{badmatch,{error,duplicate_name}},[{inet_tcp_dist,listen,1},{net_kernel,start_protos,4},{net_kernel,start_protos,3},{net_kernel,init_node,2},{net_kernel,init,1},{gen_server,init_it,6},{proc_lib,init_p_do_apply,3}]}]} etc. etc. etc.

Elapsed Time: Configure two PCs with Celery and Rabbit and document it? ~1 hr. For you? 30 minutes.

Cluster the RabbitMQ Servers

Knowing I want this to work across at least two servers from day one, I decided to next cluster the RabbitMQ servers.

This is remarkably straightforward if you follow the steps listed in the RabbitMQ Clustering guide and DON’T miss the step I did on configuring the Erlang cookie. I had a couple of minor issues I needed to read up on (my Linux knowledge being very sketchy), which slowed me down. Here are the steps I ended up following (you could use the guide linked, but I’ve added a couple of things relevant to what we are doing here).

  • Firstly, configure the Erlang cookie so that the Erlang installs on the two machines can talk to each other. Make sure the RabbitMQ server is STOPPED. When you edit the cookie, simply replace whatever is in there with your own string; it needs to be the same string on both Server and Laptop, and length doesn’t matter. The default permissions on the cookie file are very tight, so I needed to change them to be able to edit it, then I changed them back. Replace gedit with your editor of choice (e.g. vi).
$ sudo chmod 777 /var/lib/rabbitmq/.erlang.cookie
$ sudo gedit /var/lib/rabbitmq/.erlang.cookie
$ sudo chmod 400 /var/lib/rabbitmq/.erlang.cookie
  • Make sure that Laptop and Server are in each other’s local hosts file (by this I mean the machine name of your Server equivalent needs to be in the hosts file on Laptop and vice versa). If you can ping Server from Laptop and vice versa, you should be fine.
  • On Server, start RabbitMQ in detached mode, check status and make sure it’s running.
  • On Laptop, make sure the RabbitMQ Server is stopped, then make sure you’ve set the cookie as above. Start the RabbitMQ server in detached mode and now complete the following steps.
$ sudo rabbitmqctl stop_app
$ sudo rabbitmqctl reset
$ sudo rabbitmqctl cluster rabbit@server
$ sudo rabbitmqctl start_app

Note — if you have trouble with the reset (because like me you tried to actually cluster the machine BEFORE the cookie was set in Erlang) you can try sudo rabbitmqctl force_reset which should sort it out.

  • You now have a clustered RabbitMQ setup. You can verify this by typing sudo rabbitmqctl status on either server and you’ll see it list something like the following:
[{running_applications,[{rabbit,"RabbitMQ","1.7.2"},
{mnesia,"MNESIA CXC 138 12","4.4.10"},
{os_mon,"CPO CXC 138 46","2.2.2"},
{sasl,"SASL CXC 138 11","2.1.6"},
{stdlib,"ERTS CXC 138 10","1.16.2"},
{kernel,"ERTS CXC 138 10","2.13.2"}]},
{nodes,['rabbit@laptop','rabbit@server']},
{running_nodes,['rabbit@laptop','rabbit@server']}]

This is neat, but if I read the documentation correctly what we have here is a RAM / RAM cluster. If one of the servers goes down we will be fine because the message state is replicated across the cluster, but if the whole lot went down (because the data centre lost power, or more likely in my situation because I just turned both PCs off overnight) we might really want a persistent DISK node.

To convert Server to being a disk node, simply execute the following on Server (while the cluster is running).

$ sudo rabbitmqctl stop_app
$ sudo rabbitmqctl cluster rabbit@laptop rabbit@server

Or (FYI), to turn it back into a RAM node, run this instead. Note it doesn’t matter for our purposes in this doco how they are configured; read up and decide what you need.

$ sudo rabbitmqctl cluster rabbit@laptop

Finally, start it up again

$ sudo rabbitmqctl start_app

To remove a server from a cluster at any time, simply do this (this is not a required step, it’s just “FYI”).

$ sudo rabbitmqctl stop_app
$ sudo rabbitmqctl reset
$ sudo rabbitmqctl start_app

NB — You’ll need to use force_reset for the LAST node to be removed from the cluster (if you’re separating them all out again). REMEMBER, if you do a full cluster reset (like I did in testing this), you’ll need to redo the security setup from the first section :-) This is because you’ve reset both nodes, so they no longer hold the security information. If you ever get into problems with the configuration, I’ve consistently resolved them by de-clustering, forcing a node reset on both nodes and then re-clustering and redoing the security.

Elapsed Time: Research, reading, implementing and trouble shooting cluster ~2 hr. Following the above steps, prob. 30 minutes.

Configure Celery with Django

OK, now we are ready to get Celery set up with Django. Create a new Django project. Mine’s called “clifton” (we use local train stations as our milestone names) and within that I have an app called fetcher. All of these steps need to be done on both Server and Laptop (I’m using SVN, so I simply make the change on Server, commit and then update from Laptop).

  • In the INSTALLED_APPS section of settings.py add celery as follows.
INSTALLED_APPS = (
    'django.contrib.sessions',
    'django.contrib.sites',
    'celery',
    'clifton.fetcher',
)
  • Sync the DB as celery adds a couple of task tracking tables.
$ python manage.py syncdb
  • Add the following settings into settings.py as well. These are just to get us going; we’ll come back to this and improve them once we’ve got this section working.
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "myuser"
BROKER_PASSWORD = "mypassword"
BROKER_VHOST = "myvhost"
  • Finally, in your application directory (e.g. fetcher in my example) create a tasks.py module with the following code. This is literally just for a test to show it’s working. Note it MUST be called tasks.py. It uses the decorator to wrap a class definition around a simple add function that adds two numbers and returns a result. Note that the decorator causes the class to have the name of the function (e.g. we effectively end up with a class called add in the tasks.py).
from celery.decorators import task

@task()
def add(x, y):
    return x + y

Having completed these steps on Server and Laptop, we now need to actually test and run our AMQP workers. Just on Server for now, do the following steps.

  • Start a terminal window in the clifton directory (or wherever you put your Django project) and start a celery daemon using Django. Again, there are more options here, but this is just a starting point to show it all working.
$ python manage.py celeryd
  • Now open a second terminal window and execute the following from the clifton directory. We use the Django interactive shell to be sure all the settings are loaded nicely for us.
$ python manage.py shell
>>> from clifton.fetcher import tasks
>>> result = tasks.add.delay(4, 4)
>>> result.ready()
True
>>> result.result
8

So what just happened? Well, Celery wrapped up the add function (which simply adds x and y and returns the result) in a class through the decorator. We then, through the Django interactive shell, imported the class and called its delay method, passing the parameters 4 and 4. Celery handled all the work of pushing that out onto the RabbitMQ server; the celeryd worker then picked up the arguments, executed the task and stored the result. The result.ready() call told us that the result for this particular task had been processed, and result.result returned what that result actually was. If result.ready() is False, you may find that the AMQP server is running but the worker process is not.
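If you want to see the same flow in a single snippet, here’s a minimal sketch (using only the calls we just used, and assuming it’s run from python manage.py shell so the Django settings are loaded) that queues a task and then polls until a worker has processed it:

import time
from clifton.fetcher import tasks

result = tasks.add.delay(4, 4)   # push the task onto the queue
while not result.ready():        # stays False until a celeryd worker runs it
    time.sleep(1)
print(result.result)             # 8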

We’ve just passed our first message. There is a LOT going on under the hood here, but this example is fairly uninspiring. Shut down the celeryd terminal (send it a TERM signal: from the command line do ps aux | grep celeryd, find the process ID and do kill <pid>, e.g. kill 1234) and see what happens if you execute the code again (perhaps with different parameters, e.g. tasks.add.delay(2, 3), to see the difference). Yup, result.ready() returns False. It will keep returning False until you start the worker process up again. The message has been queued and, at least for this configuration, will persist until it gets processed by a worker. Start the celeryd process again ($ python manage.py celeryd) and now try result.ready() in the shell — the message was processed and a result can be returned.

If you’re still unsure about how cool this is, then you probably don’t need queuing :-) But for the final trick in this section, before we get this thing rocking and rolling properly, let’s demonstrate that it really is a cluster being used to process this.

Of course by default the cluster is up and running already (presuming you haven’t stopped the Laptop), but to be sure, just check with rabbitmqctl status. Make sure you’ve also run through the settings in this section (Configure Celery with Django) on both Laptop and Server. Now — on Server, make sure that any running celeryd terminal is shut down. Start the celeryd terminal on Laptop. From the Python / Django interactive shell on Server, repeat the same steps.

Assuming you got a result successfully returned, you’ve just used the message queue cluster to have a worker on Laptop pick up and process the message from Server. The subtlety here is that the underlying RabbitMQ cluster handled passing the message between the two machines — both Server and Laptop are talking to RabbitMQ locally, it was the cluster that handled making the message available in both places.

Elapsed Time: Research, reading, implementing, testing, trouble shooting and documenting ~4 hr. Following the above steps, prob. 1 hr.

Refactor the Task and Demo to make life easier!

Note that this previous example is of course very basic and can be significantly optimised — in particular there is no connection pooling, so every new call to add.delay() is opening its own connection to the broker, hence the delay in posting the message. You can speed it up by passing a previously opened connection — see here for more information. We’ll implement this below.

Additionally, the task decorator, while useful for basic tasks, masks a lot of what’s really happening — I prefer a more verbose approach when I’m learning. So here is the new fetcher/tasks.py — note we’ve changed the name of the class to MyTest, and that it MUST have a “run” method, which is what gets called by apply_async.

from celery.task import Task
import logging

logger = logging.getLogger('fetcher.tasks')

class MyTest(Task):
    def run(self, x, y):
        logger.info('Received %s and %s' % (x, y))
        return int(x) + int(y)

We’ll also make our life a bit easier by adding a bin directory to the clifton project (this is where I like to store all my cron jobs etc.) and creating a simple Python file to call our task. This will let us just execute the file to generate the tasks — the sort of thing you’ll need to do anyway if you want to call the tasks from a view etc. I’ll call it demo.py (for clarity, in my case this is in ~/clifton/bin/demo.py). Copy the following code in (make any local adjustments needed). Note the change to use apply_async instead of delay(). It is a lower level call that does the same thing as delay(), but allows us to pass extra arguments etc. I’ve also implemented the connection reuse.

#!/usr/bin/env python
from __future__ import with_statement
import os, sys

# os.environ['PYTHONPATH'] = "/home/tim/projects"  # Uncomment and point this to your root Django
# project directory if PYTHONPATH is not set properly already.
os.environ['DJANGO_SETTINGS_MODULE'] = 'clifton.settings'

from clifton.fetcher.tasks import MyTest
from celery.messaging import establish_connection

if __name__ == "__main__":
    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
    results = []
    with establish_connection() as connection:
        for args in numbers:
            res = MyTest.apply_async(args=args, connection=connection)
            results.append(res)
    print([res.get() for res in results])

If you run this from the bin directory, you should see the following result:

$ python demo.py
[4, 8, 16, 32]

We now have a better structure to take our experiments forwards and further optimise.

Elapsed Time: Research, reading, implementing, trouble shooting and documenting ~2 hr. Following the above steps, prob. 30 minutes.

Directing tasks to different servers

From here on in, this becomes a little less generic and begins to deal with the specific problems we are trying to resolve. I’ll explain why we’ve made our decisions and you can make your own choices. While you can make each of these settings in settings.py on the two different machines, I suggest you create a local_settings.py and add them there (especially if you’re using SVN to sync changes — i.e. running the same Django project on both servers), as sketched just below. Where I say settings.py below, I’ve actually made these changes in a local_settings.py. Note that most of these are default settings and can generally be overridden at different levels.
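For reference, here’s a minimal sketch of the local_settings pattern I’m assuming here (the name local_settings.py is just my convention, not something Django or Celery requires). At the very bottom of settings.py, pull in the per-machine overrides if they exist:

# settings.py (very last lines) -- a sketch, assuming each machine has its own
# local_settings.py next to settings.py, kept out of SVN.
try:
    from local_settings import *
except ImportError:
    pass   # no local overrides on this machine, just use the shared defaults

Anything you then put in local_settings.py (the CELERY_* values below, for example) overrides the shared settings on that machine only.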

  • Run tasks on different machines. The actual design of these queues, exchanges and bindings is a reasonably complicated topic that gets very design specific and frankly confusing! It’s also (somewhat) abstracted by Celery, which implements only Direct and Topic exchanges, for example. It’s worth reading the post on Rabbits and Warrens, which gives some good background on the various options provided by AMQP. If all you want to do is post a message and pick it up with a worker, then Celery handles that for you, but in our case we’d like to make some processing choices up front and be able to use different machines to process different messages. There is a good guide to this in the Celery FAQ. Add the following to settings.py on Server.
CELERY_DEFAULT_QUEUE = "server"
CELERY_QUEUES = {
    "server": {
        "binding_key": "server_task",
    },
}
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "direct"
CELERY_DEFAULT_ROUTING_KEY = "server_task"

We’ve just told celeryd that when it starts it should bind to a queue called “server” and listen for messages routed to server_task. Do the same on Laptop, but specify laptop instead.

CELERY_DEFAULT_QUEUE = "laptop"
CELERY_QUEUES = {
    "laptop": {
        "binding_key": "laptop_task",
    },
}
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "direct"
CELERY_DEFAULT_ROUTING_KEY = "laptop_task"

There are several different ways of implementing this (telling tasks where to execute). For example, in this configuration, if we do nothing and simply run the demo.py we created earlier, tasks started on Server will execute on Server and vice versa, because the defaults will be applied (which are different on each machine). Most likely we want to make some decision at execution time about where we want to route this task. In this case, simply modify the apply_async call in demo.py to add the routing_key parameter as follows:

res = MyTest.apply_async(args=args, connection=connection, routing_key="laptop_task")

Changing it to routing_key="server_task" will force the task to execute on Server (no matter where it originated — although strictly speaking it forces it to execute on the worker bound to the server queue, listening for server_task; we just happen to configure that queue on Server) and vice versa. Test this out and make sure it’s all working; you might like to add some logging into tasks.py so you can check what is executing where (the next section shows how if you need help with this). NB — It looks like the task modules are cached in the celeryd worker, so if you modify a task it’s a good idea to restart the worker, at least while testing.

As a final note, let’s say you want Server to ALSO be able to pick up Laptop tasks (to use its spare capacity — or just as a more “realistic” example). Simply add the laptop queue to CELERY_QUEUES in Server’s settings.py and Server will listen for Laptop tasks too. Because both Laptop and Server are then bound to the same queue (which happens to be called laptop in our example), Rabbit automatically round-robins messages between the two. Change the CELERY_QUEUES setting as follows on Server only to take advantage of this.

CELERY_QUEUES = {
    "server": {
        "binding_key": "server_task",
    },
    "laptop": {
        "binding_key": "laptop_task",
    },
}

If you extend the demo.py so it passes a lot more numbers, you should be able to use demo.py to send messages to server_task and laptop_task and see the difference by checking the logs (one way of doing this is sketched after the list of numbers below). Both Server and Laptop process tasks for laptop_task, while only Server processes tasks for server_task.

numbers = [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8),
           (9, 9), (10, 10), (11, 11), (12, 12), (13, 13), (14, 14),
           (15, 15), (16, 16), (17, 17), (18, 18)]
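Here’s a hedged sketch of one way to run that test; it reuses the names from demo.py above (clifton, fetcher, MyTest) and simply alternates the routing key per task, so roughly half the messages go to the server queue and half to the laptop queue:

#!/usr/bin/env python
# A sketch only -- alternates routing keys so you can compare the celeryd logs.
from __future__ import with_statement
import os
os.environ['DJANGO_SETTINGS_MODULE'] = 'clifton.settings'

from clifton.fetcher.tasks import MyTest
from celery.messaging import establish_connection

if __name__ == "__main__":
    numbers = [(n, n) for n in range(1, 19)]
    with establish_connection() as connection:
        for i, args in enumerate(numbers):
            # even tasks are routed to Server only, odd tasks to the laptop queue
            key = "server_task" if i % 2 == 0 else "laptop_task"
            MyTest.apply_async(args=args, connection=connection, routing_key=key)

Run it a couple of times and tail the worker logs on both machines: laptop_task messages should show up on either machine (round-robined), while server_task messages only ever show up on Server.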

Elapsed Time: Research, reading, implementing, trouble shooting and documenting ~4.5 hr. Following the above steps, prob. 2 hrs max.

Get a (heart) beat

We now have a (basic) distributed architecture with the ability to route jobs to a specific server, and we also know how to distribute across two servers (or more). The next trick is to create an automated task that runs on a regular basis. Now we could use a CRON job or something similar, but it would be nice if there was a way of building this into Django and Celery so we can route the messages straight onto a queue. It turns out there is, using a PeriodicTask, which does more or less what it says on the box: it runs periodically. I modified our earlier tasks.py as shown below. As well as the MyTest task already defined, we now have a PeriodicTask which executes every 30 seconds. It doesn’t do much — I simply had it queue up a batch of our MyTest additions in this example.

from datetime import timedelta

from celery.task import PeriodicTask, Task
from celery.messaging import establish_connection
import logging

logger = logging.getLogger('fetcher.tasks')

class MyTest(Task):
    def run(self, x, y):
        logger.info('Received %s and %s' % (x, y))
        return int(x) + int(y)

class MyPeriodicTask(PeriodicTask):
    run_every = timedelta(seconds=30)

    def run(self, **kwargs):
        numbers = [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7),
                   (9, 9), (10, 10), (11, 11), (12, 12), (13, 13), (14, 14),
                   (15, 15), (16, 16), (17, 17), (18, 18)]
        with establish_connection() as connection:
            for n in numbers:
                MyTest.apply_async(args=[n[0], n[1]], connection=connection,
                                   routing_key="laptop_task")
        logger.info('Ran periodic task')

You might need to remove the logging options unless you have a proper logger setup in your settings.py (FYI, you can add the following into settings.py and this should all work nicely for you — just change paths as appropriate and also make sure you have a log directory).

LOG_FILENAME = '/home/tim/clifton/log/debug.log'
import logging
logging.basicConfig(
    filename=LOG_FILENAME,
    format='[%(levelname)-5s] %(asctime)-8s %(name)-10s %(message)s',
    # datefmt='%a, %d %b %Y %H:%M:%S',
    datefmt='%H:%M:%S',
    level=logging.DEBUG)

But how does it execute? Well, in this instance we don’t actually need a demo.py; instead we can use the beat feature of celeryd to execute the periodic task for us. To do this, simply start celeryd with the -B option, e.g.

$ python manage.py celeryd -B

Alternatively you can run a dedicated celerybeat server.

$ python manage.py celerybeat

Just be aware that if you run a dedicated celerybeat server, you’ll also need to start a worker (celeryd) yourself, otherwise your tasks will be sent to the queue but not processed. I have a personal preference for running the dedicated celerybeat server as it makes it easier to isolate and shut down just the beat process, separately from the workers (ps aux | grep celerybeat).

Elapsed Time: Implementing, limited trouble shooting and documenting ~1 hr. Following the above steps, prob. 30 minutes.

Set Static Execution Limits

I suspect for many people this is now approaching the point where you have enough of a grip to be able to use this in anger in a lot of different situations. We have one additional problem, however, that we need to resolve. In some instances we need to limit the number of messages that a queue processes. This is because, while we might have the physical capacity to process 500,000 calls an hour, we could very well have API limits imposed on us by the services we are calling.

In fact this is exactly the situation when we are talking to the Twitter API. Twitter allocates us a limit and we can’t exceed this. To make it more complicated, this limit can actually differ by server. We need to be able to limit execution, by server, to not exceed our API limit.

Actually there is a simple enough partial solution for this. Celery defines a rate_limit which controls how many times a task can execute in a given time period. To use it, pass it to the @task() decorator, or specify it at the top of the class definition, e.g.

@task(rate_limit="100/m")
def add(x, y):
    logger.info('Adding %s and %s together' % (x, y))
    return x + y

class MyTest(Task):
    rate_limit = "100/m"

    def run(self, x, y):
        logger.info('Received %s and %s' % (x, y))
        return int(x) + int(y)

class MyPeriodicTask(PeriodicTask):
    run_every = timedelta(seconds=30)
    rate_limit = "1000/h"

    def run(self, **kwargs):
        r = add(1, 2)
        logger.info("Running periodic task! Result of add was %s" % r)


The rate_limit is a combination of “how many times” and a per hour (h), per second (s) or per minute (m) value, e.g. “1000/h” means the task is executed no more than 1000 times in an hour.

Because each worker executes on a different server, and therefore gets different settings from the settings.py (or the local_settings.py) file, we can use this to change execution by server if we use a full class definition (and not the decorator) by adding our own rate_limit value on each server.
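To make that concrete, here’s a hedged sketch of the direction I mean. TWITTER_RATE_LIMIT is a setting name I’ve invented for this example (and the numbers are made up); it would live in each server’s local_settings.py, and the (hypothetical) task class simply reads whatever limit that machine has been given:

# local_settings.py on Server (sketch only -- setting name and values are mine)
# TWITTER_RATE_LIMIT = "350/h"
# local_settings.py on Laptop
# TWITTER_RATE_LIMIT = "100/h"

# fetcher/tasks.py (sketch)
import logging

from django.conf import settings
from celery.task import Task

logger = logging.getLogger('fetcher.tasks')

class FetchFromTwitter(Task):   # hypothetical task name, for illustration only
    # Each server picks up its own limit from its local_settings.py,
    # falling back to a conservative default if none is set.
    rate_limit = getattr(settings, 'TWITTER_RATE_LIMIT', '100/h')

    def run(self, query):
        logger.info('Fetching "%s" within this server\'s quota' % query)
        # ... the actual Twitter API call would go here ...
        return query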