As we move through the development cycle we now have many of the essential processing modules we need for Tribalytic, but we also have a few challenges we need to deal with as well:
There are lots of different ways that this could be done, but after some preliminary research, I'm going to set out to do it using two key pieces of technology:
The goal of this document is twofold: to record what I've learnt so I can replicate it in our production environment, and to lay out the "easy" steps, distilled from the various documentation, so you can quickly implement something that solves some common problems.
There are all sorts of reasons why I've preselected these technologies, which aren't the point of this post. This post documents the challenge: having selected this approach, how easy is it to implement? Firstly, a few notes about my setup:
Finally, there are a lot of great resources out there, but the few I relied on the most (and where some of the content here is adapted from) are:
first .
At this point the server should actually be running (this confused me first time, I think the Rabbit Controller starts the server to add the hosts etc.). You can check if there is a server running as follows:
without any problems at all. NB you could skip configuring security etc. on Laptop if you like as it will be reset in the next step, I think it's worth it anyway just as practice.
NB – If you see a dump like the following when starting the server then in my case this was because the server was already started – use the
Knowing I want this to work across at least two servers from day one, I decided to next cluster the RabbitMQ servers.
and DON'T miss the step I did on configuring the Erlang cookie. I had a couple of minor issues that I needed to read up on; my Linux knowledge is very sketchy, which slowed me down. Here are the steps I ended up following (you could use the guide linked, but I've just added in a couple of things relevant to what we are doing here).
Note – if you have trouble with the reset (because, like me, you tried to cluster the machine BEFORE the cookie was set in Erlang), you can try sudo rabbitmqctl force_reset, which should sort it out.
- You now have a clustered RabbitMQ setup. You can verify this by typing sudo rabbitmqctl status on either server and you'll see it list something like the following:
[{running_applications,[{rabbit,"RabbitMQ","1.7.2"},
{mnesia,"MNESIA CXC 138 12","4.4.10"},
{os_mon,"CPO CXC 138 46","2.2.2"},
{sasl,"SASL CXC 138 11","2.1.6"},
{stdlib,"ERTS CXC 138 10","1.16.2"},
{kernel,"ERTS CXC 138 10","2.13.2"}]},
{nodes,['rabbit@laptop','rabbit@server']},
{running_nodes,['rabbit@laptop','rabbit@server']}]
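If you'd rather check the cluster from a script than by eye, the status text can be parsed. This is only a minimal sketch, assuming the output has the shape shown above. The names missing_nodes and SAMPLE are made up for illustration, and the function works on text you have already captured rather than calling rabbitmqctl itself.

```python
import re


def _node_list(status_text, key):
    """Return the node names listed under `key` in rabbitmqctl status output."""
    match = re.search(r"\{%s,\[(.*?)\]\}" % re.escape(key), status_text, re.S)
    if match is None:
        return []
    # Node names appear as quoted atoms, e.g. 'rabbit@laptop'.
    return re.findall(r"'([^']+)'", match.group(1))


def missing_nodes(status_text):
    """Return cluster members that are not currently running."""
    nodes = _node_list(status_text, "nodes")
    running = set(_node_list(status_text, "running_nodes"))
    return [n for n in nodes if n not in running]


# Hypothetical sample: Server is a cluster member but is not running.
SAMPLE = """[{running_applications,[{rabbit,"RabbitMQ","1.7.2"}]},
 {nodes,['rabbit@laptop','rabbit@server']},
 {running_nodes,['rabbit@laptop']}]"""

print(missing_nodes(SAMPLE))
```

An empty list means every node in the cluster is also listed as running.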
This is neat, but if I read the documentation correctly, what we have here is a RAM / RAM cluster. If one of the servers goes down, we will be fine because the message state is replicated across the cluster, but if the whole lot went out (because the data centre lost power or, more likely in my situation, because I just turned both PCs off overnight) we might really want a persistent DISK node.
To convert Server to being a disk node, simply execute the following on Server (while the cluster is running).
$ sudo rabbitmqctl stop_app
$ sudo rabbitmqctl cluster rabbit@laptop rabbit@server
Or (FYI), to turn it back into a RAM node, run the following instead. Note that for our purposes in this doc it doesn't matter how the nodes are configured; read up and decide what you need.
$ sudo rabbitmqctl cluster rabbit@laptop
Finally, start it up again
$ sudo rabbitmqctl start_app
To remove a server from a cluster at any time, simply do this (this is not a required step, it's just "FYI").
$ sudo rabbitmqctl stop_app
$ sudo rabbitmqctl reset
$ sudo rabbitmqctl start_app
NB – You'll need to use force_reset for the LAST node in the cluster to be removed (if you're separating them all out again). REMEMBER: if you do a full cluster reset (like I did in testing this), you'll need to redo the security steps from the first section again. This is because you've reset both nodes, so they no longer hold the security information. Whenever I've run into problems with the configuration, I've consistently resolved them by de-clustering, forcing a reset on both nodes, then re-clustering and redoing the security.
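If you find yourself repeating the de-clustering steps, they are easy to script. The sketch below only builds the rabbitmqctl argument lists described above and does not run anything. The function name decluster_commands and its last_node flag are my own invention for illustration.

```python
def decluster_commands(last_node=False):
    """Build the rabbitmqctl calls that take one node out of the cluster.

    Only the argument lists are built; nothing is executed here.
    """
    # The last node left in the cluster needs force_reset instead of reset.
    reset = "force_reset" if last_node else "reset"
    return [
        ["sudo", "rabbitmqctl", "stop_app"],
        ["sudo", "rabbitmqctl", reset],
        ["sudo", "rabbitmqctl", "start_app"],
    ]


for cmd in decluster_commands(last_node=True):
    print(" ".join(cmd))
```

Each list could then be handed to subprocess.check_call on the node in question.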
Elapsed Time: Research, reading, implementing and trouble shooting cluster ~2 hr. Following the above steps, prob. 30 minutes.
Configure Celery with Django
OK, now we are ready to get Celery set up with Django. Create a new Django project. Mine's called "clifton" (we use local train stations as our milestone names), and within that I have an app called fetcher. All of these steps need to be done on both Server and Laptop (I'm using SVN, so I simply make the change on Server, commit and then update from Laptop).
- In the INSTALLED_APPS section of settings.py add celery as follows.
INSTALLED_APPS = (
    'django.contrib.sessions',
    'django.contrib.sites',
    'celery',
    'clifton.fetcher',
)
- Sync the DB as celery adds a couple of task tracking tables.
$ python manage.py syncdb
- Add the following settings into settings.py as well. These are just to get us going, we'll come back to this and improve them soon once we've got this section working.
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "myuser"
BROKER_PASSWORD = "mypassword"
BROKER_VHOST = "myvhost"
- Finally, in your application directory (e.g. fetcher in my example) create a tasks.py module with the following code. This is literally just for a test to show it's working. Note it MUST be called tasks.py. It uses the decorator to wrap a class definition around a simple add function that adds two numbers and returns a result. Note that the decorator causes the class to have the name of the function (e.g. we effectively end up with a class called add in the tasks.py).
from celery.decorators import task

@task()
def add(x, y):
    return x + y
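To get a feel for how a decorator can turn a function into a class with the same name, here is a toy version. It is a sketch of the idea only, not Celery's implementation: ToyTask, toy_task and call_now are hypothetical names, and nothing here talks to a broker.

```python
class ToyTask(object):
    """Stand-in base class; Celery's real Task does far more than this."""

    def run(self, *args, **kwargs):
        raise NotImplementedError

    @classmethod
    def call_now(cls, *args, **kwargs):
        # Run synchronously; a real task would be sent to the broker instead.
        return cls().run(*args, **kwargs)


def toy_task():
    """Decorator factory that turns a plain function into a ToyTask subclass."""
    def decorator(func):
        def run(self, *args, **kwargs):
            return func(*args, **kwargs)
        # type() builds a new class named after the wrapped function.
        return type(func.__name__, (ToyTask,), {"run": run})
    return decorator


@toy_task()
def add(x, y):
    return x + y


print(add.__name__, issubclass(add, ToyTask), add.call_now(4, 4))
```

After decoration, the name add refers to a class rather than a function, which is why methods can be called on it.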
Having completed these steps on Server and Laptop, we now need to actually test and run our AMQP workers. Just on Server for now, do the following steps.
- Open a terminal window in the clifton directory (or wherever you put your Django app) and start a Celery daemon using Django. Again, there are more options here, but this is just a starting point to show it all working.
$ python manage.py celeryd
- Now open a second terminal window and execute the following from the clifton directory. We use the Django interactive shell to be sure all the settings are loaded nicely for us.
$ python manage.py shell
>>> from clifton.fetcher import tasks
>>> result = tasks.add.delay(4, 4)
>>> result.ready()
True
>>> result.result
8
So what just happened? Well, Celery wrapped up the add function (which simply adds x and y and returns the sum) in a class through the decorator. We then, through the Django interactive shell, imported the class and called the delay method, passing the parameters 4 and 4. Celery handled all the work of pushing that out onto the RabbitMQ server, then the celeryd worker picked up the values, executed the task and provided the result. The result.ready() call told us that this particular task had been processed, and result.result returned what the result actually was. If result.ready() is False, you may find that the AMQP server is running but you don't have the worker process running.
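Rather than retyping result.ready() until it flips to True, you can poll for it. The helper below is a minimal sketch that relies only on the ready() method and result attribute used above. wait_for and FakeResult are hypothetical names, and FakeResult exists purely so the example runs without a broker.

```python
import time


def wait_for(result, timeout=5.0, interval=0.1):
    """Poll result.ready() until it is True or `timeout` seconds pass.

    Returns result.result on success, or None if the timeout expires.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        if result.ready():
            return result.result
        time.sleep(interval)
    return None


class FakeResult(object):
    """Hypothetical stand-in that becomes ready after a number of polls."""

    def __init__(self, value, ready_after=3):
        self.result = value
        self._ready_after = ready_after
        self._polls = 0

    def ready(self):
        self._polls += 1
        return self._polls >= self._ready_after


print(wait_for(FakeResult(8)))
```

If this returns None against a real task, the first thing I'd check is whether a worker is actually running.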
We've just passed our first message. There is a LOT going on under the hood here, but this example is fairly uninspiring. Shut down the celeryd terminal (send it a TERM signal: from the command line run ps aux | grep celeryd, find the first process ID and run kill <pid>, e.g. kill 1234) and see what happens if you execute the code again (perhaps with different parameters, e.g. tasks.add.delay(2, 3), to see the difference). Yup, result.ready() returns False. It will keep returning False until you start the worker process up again. The message has been queued and, at least for this configuration, will persist until it gets processed by a worker. Start the celeryd process again ($ python manage.py celeryd) and now try result.ready() in the shell: the message was processed and a result is able to be returned.
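The behaviour is easier to picture with a toy model. The sketch below is an in-memory stand-in, not how RabbitMQ or Celery work internally. ToyBroker and its methods are invented for illustration: a call is queued straight away, and its result only appears once a worker drains the queue.

```python
from collections import deque


class ToyBroker(object):
    """In-memory stand-in for the queue; purely illustrative."""

    def __init__(self):
        self.queue = deque()   # messages waiting for a worker
        self.results = {}      # task id -> return value
        self._next_id = 0

    def delay(self, func, *args):
        """Queue a call and return its task id straight away."""
        self._next_id += 1
        self.queue.append((self._next_id, func, args))
        return self._next_id

    def ready(self, task_id):
        return task_id in self.results

    def run_worker(self):
        """Drain the queue, as a freshly started worker would."""
        while self.queue:
            task_id, func, args = self.queue.popleft()
            self.results[task_id] = func(*args)


def add(x, y):
    return x + y


broker = ToyBroker()
task_id = broker.delay(add, 2, 3)
print(broker.ready(task_id))   # the worker has not been started yet
broker.run_worker()
print(broker.ready(task_id), broker.results[task_id])
```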
If you're still unsure about how cool this is, then you probably don't need queuing

But for the final trick in this section until we get this thing rocking and rolling properly, let's demonstrate it really is a cluster being used to process this.
Of course by default, the cluster is up and running already (presuming you haven't stopped the Laptop), but to be sure, just check with rabbitmqctl status. Make sure you've also run through the steps in this current section (Configure Celery with Django) on both Laptop and Server. Now, on Server, make sure that any running celeryd terminal is shut down. Start the celeryd terminal on Laptop. From the Python / Django interactive shell on Server, repeat the same steps.
Assuming you got a result successfully returned, you've just used the message queue cluster to have a worker on Laptop pick up and process the message from Server. The subtlety here is that the underlying RabbitMQ cluster handled passing the message between the two machines: both Server and Laptop are talking to RabbitMQ locally, and it was the cluster that handled making the message available in both places.
Elapsed Time: Research, reading, implementing, testing, trouble shooting and documenting ~4 hr. Following the above steps, prob. 1 hr.
Re-Factor the Task and Demo to make life easier!
Note that the previous example is of course very basic and can be significantly optimised. In particular there is no connection pooling, so every new call to add.delay() opens its own connection to the broker, hence the delay in posting the message. You can speed it up by passing a previously opened connection – see here for more information. We'll implement this below.
Additionally, the task decorator, while useful for basic tasks, masks a lot of what's really happening, and I prefer a more verbose approach when I'm learning. So here is the new fetcher/tasks.py. Note that we've changed the name of the class to MyTest, and that it MUST have a "run" method, which is what will be called by apply_async.
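import logging

from celery.task import Task

# Module-level logger used by run() below.
logger = logging.getLogger('fetcher.tasks')

class MyTest(Task):
    def run(self, x, y):
        logger.info('Received %s and %s' % (x, y))
        return int(x) + int(y)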
We'll also make our life a bit easier by adding a bin directory into the clifton project (this is where I like to store all my cron jobs etc.) and create a simple python file to call our task. This will let us just execute the file to generate the tasks – the sort of thing you'll need to do anyway if you want to call the tasks from a view etc. I'll call it demo.py (for clarity, in my case this is in ~/clifton/bin/demo.py). Copy the following code in (make any local adjustments needed). Note the change to use apply_async instead of delay(). It is a lower level call to do the same thing as delay(), but allows us to add arguments etc. I've also implemented the connection pooling.
#!/usr/bin/env python
from __future__ import with_statement
import os, sys
#os.environ['PYTHONPATH'] = "/home/tim/projects" # Uncomment and point this to your root django project
                                                 # directory if PYTHONPATH not set properly already.
os.environ['DJANGO_SETTINGS_MODULE'] = 'clifton.settings'

from clifton.fetcher.tasks import MyTest
from celery.messaging import establish_connection

if __name__ == "__main__":
    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
    results = []
    with establish_connection() as connection:
        for args in numbers:
            res = MyTest.apply_async(args=args, connection=connection)
            results.append(res)
    print([res.get() for res in results])
If you run this from the bin directory, you should see the following result:
$ python demo.py
[4, 8, 16, 32]
We now have a better structure to take our experiments forwards and further optimise.
Elapsed Time: Research, reading, implementing, trouble shooting and documenting ~2 hr. Following the above steps, prob. 30 minutes.
Directing tasks to different servers
From here on in, this will become a little less generic and begin to deal with the problems that we are having and trying to resolve. I'll explain why we've made our decisions and you can make your own choices. While you can make each of these settings in settings.py on the two different machines, I suggest you create a local_settings.py and add them in there (especially if you're using SVN to sync changes – ie. running the same Django project on both servers). Where I say settings.py below I've actually created these changes in a local_settings.py. Note that most of these are default settings and can generally be over-ridden at different levels.
- Run tasks on different machines. The actual design of these queues, exchanges and bindings is a reasonably complicated topic that gets very design-specific and, frankly, confusing! It's also (somewhat) abstracted by Celery, which implements only Direct and Topic exchanges, for example. It's worth reading this post on Rabbits and Warrens, which gives some good background on the various options provided by AMQP. If all you want to do is post a message and pick it up with a worker, then Celery handles that for you, but in our case we'd like to make some processing choices up front and be able to use different machines to process different messages. There is a good guide to this in the FAQ for Celery. Add the following to settings.py on Server:
CELERY_DEFAULT_QUEUE = "server"
CELERY_QUEUES = {
    "server": {
        "binding_key": "server_task",
    },
}
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "direct"
CELERY_DEFAULT_ROUTING_KEY = "server_task"
We've just told celeryd that when it starts it should bind to a queue called "server" and listen for messages routed to server_task. Do the same on Laptop, but specify laptop instead.
CELERY_DEFAULT_QUEUE = "laptop"
CELERY_QUEUES = {
    "laptop": {
        "binding_key": "laptop_task",
    },
}
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "direct"
CELERY_DEFAULT_ROUTING_KEY = "laptop_task"
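Since the two blocks differ only in the machine name, they can be generated from it. This is just a sketch: routing_settings is a hypothetical helper of mine, not part of Celery, and it returns the same setting names and values shown above.

```python
def routing_settings(machine):
    """Return the routing settings for one machine, keyed by setting name."""
    routing_key = "%s_task" % machine
    return {
        "CELERY_DEFAULT_QUEUE": machine,
        "CELERY_QUEUES": {machine: {"binding_key": routing_key}},
        "CELERY_DEFAULT_EXCHANGE": "tasks",
        "CELERY_DEFAULT_EXCHANGE_TYPE": "direct",
        "CELERY_DEFAULT_ROUTING_KEY": routing_key,
    }


# Print the settings as they would appear in the settings file on Laptop.
for name, value in sorted(routing_settings("laptop").items()):
    print("%s = %r" % (name, value))
```

In a local_settings.py, something like globals().update(routing_settings("laptop")) should turn these into module-level settings, though I'd treat that as an assumption to test rather than a recommendation.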
There are several different ways of implementing this (telling tasks where to execute). For example, in this configuration, if we do nothing and simply run the demo.py we created earlier, tasks started on Server will execute on Server and vice versa, because the defaults will be applied (which are different on each machine). Most likely we want to make some decision at execution time about where we want to route this task. In this case, simply modify the apply_async call in demo.py to add the routing_key parameter as follows:
res = MyTest.apply_async(args=args, connection=connection, routing_key="laptop_task")
Changing it to routing_key="server_task" will force the task to execute on Server, no matter where it originated (strictly speaking, it forces it to execute on the worker bound to the server queue, listening for server_task; we just happen to configure this queue on Server), and vice versa. Test this out and make sure it's all working. You might like to add some logging into tasks.py so you can see what is executing where (the next section shows how if you need help with this). NB – It looks like the task files are cached in the celeryd worker, so if you modify a task it's a good idea to restart the worker, at least for the moment when testing.
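If the relationship between routing keys, binding keys and queues is still fuzzy, a toy model may help. The class below is a simplified sketch of how a direct exchange matches keys. ToyDirectExchange is a made-up name, and real AMQP brokers do a great deal more than this.

```python
class ToyDirectExchange(object):
    """Toy model of a direct exchange: exact-match routing key -> queues."""

    def __init__(self):
        self.bindings = {}   # binding key -> list of queue names
        self.queues = {}     # queue name -> list of delivered messages

    def bind(self, queue, binding_key):
        self.queues.setdefault(queue, [])
        self.bindings.setdefault(binding_key, []).append(queue)

    def publish(self, message, routing_key):
        """Copy the message to every queue bound with exactly this key."""
        targets = list(self.bindings.get(routing_key, []))
        for queue in targets:
            self.queues[queue].append(message)
        return targets


exchange = ToyDirectExchange()
exchange.bind("server", "server_task")
exchange.bind("laptop", "laptop_task")

exchange.publish((2, 2), routing_key="laptop_task")
exchange.publish((4, 4), routing_key="server_task")
print(exchange.queues)
```

A message published with a key that nothing is bound to simply lands in no queue in this model.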
As a final note, let's say you want Server to ALSO be able to pick up Laptop tasks (to use its spare capacity, or just as a more "realistic" example). Because both Laptop and Server will then be bound to the same queue (which happens to be called laptop in our example), Rabbit automatically round-robins messages between the two. Change the CELERY_QUEUES setting in settings.py as follows, on Server only, to take advantage of this.
CELERY_QUEUES = {
    "server": {
        "binding_key": "server_task",
    },
    "laptop": {
        "binding_key": "laptop_task",
    },
}
If you extend the demo.py so it passes a lot more numbers, you should be able to use demo.py to send messages to server_task and laptop_task and see the difference by checking the logs. Both Server and Laptop process tasks for laptop_task, while only Server processes tasks for server_task.
numbers = [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8),
           (9, 9), (10, 10), (11, 11), (12, 12), (13, 13), (14, 14),
           (15, 15), (16, 16), (17, 17), (18, 18)]
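As a rough picture of what to expect in the logs, here is a sketch of an idealised round robin. The round_robin function is hypothetical, and a real broker's split also depends on things like prefetch and acknowledgements, so the actual distribution may be less even.

```python
from itertools import cycle


def round_robin(messages, consumers):
    """Hand messages out to consumers in turn; returns {consumer: [messages]}."""
    delivered = dict((name, []) for name in consumers)
    turn = cycle(consumers)
    for message in messages:
        delivered[next(turn)].append(message)
    return delivered


numbers = [(n, n) for n in range(1, 19)]
# Both workers consume from the "laptop" queue; only Server consumes "server".
laptop_queue = round_robin(numbers, ["Laptop", "Server"])
server_queue = round_robin(numbers, ["Server"])
print(len(laptop_queue["Laptop"]), len(laptop_queue["Server"]))
print(len(server_queue["Server"]))
```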
Elapsed Time: Research, reading, implementing, trouble shooting and documenting ~4.5 hr. Following the above steps, prob. 2 hrs max.
Get a (heart) beat
We now have a (basic) distributed architecture with the ability to route jobs to a specific server, and we also know how to distribute work across two servers (or more). The next trick is to create an automated task that runs on a regular basis. Now, we could use a cron job or something similar, but it would be nice if there was a way of building this into Django and Celery so we can route the messages straight on to a queue. It turns out that there is: a PeriodicTask, which does more or less what it says on the box. It runs periodically. I modified our earlier tasks.py as shown below. As well as the basic MyTest task already defined, we now have a PeriodicTask which executes every 30 seconds. It doesn't do much; in this example I simply had it queue a batch of MyTest tasks again.
from __future__ import with_statement  # only needed on Python 2.5

import logging
from datetime import timedelta  # used by run_every below

from celery.task import PeriodicTask, Task
from celery.messaging import establish_connection

logger = logging.getLogger('fetcher.tasks')

class MyTest(Task):
    def run(self, x, y):
        logger.info('Received %s and %s' % (x, y))
        return int(x) + int(y)

class MyPeriodicTask(PeriodicTask):
    run_every = timedelta(seconds=30)

    def run(self, **kwargs):
        numbers = [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7),
                   (9, 9), (10, 10), (11, 11), (12, 12), (13, 13), (14, 14),
                   (15, 15), (16, 16), (17, 17), (18, 18)]
        # Reuse one broker connection for the whole batch.
        with establish_connection() as connection:
            for n in numbers:
                MyTest.apply_async(args=[n[0], n[1]], connection=connection, routing_key="laptop_task")
        logger.info('Ran periodic task')
You might need to remove the logging options unless you have a proper logger setup in your settings.py (FYI, you can add the following into settings.py and this should all work nicely for you – just change paths as appropriate and also make sure you have a log directory).
import logging

LOG_FILENAME = '/home/tim/clifton/log/debug.log'
logging.basicConfig(
    filename=LOG_FILENAME,
    format='[%(levelname)-5s] %(asctime)-8s %(name)-10s %(message)s',
    #datefmt='%a, %d %b %Y %H:%M:%S',
    datefmt='%H:%M:%S',
    level=logging.DEBUG)
But how does it execute? Well in this instance we don't actually need a demo.py, instead we can use the beat feature of the celeryd to execute the periodic task for us. To do this, simply start your celeryd with the -B option. e.g.
$ python manage.py celeryd -B
Alternatively you can run a dedicated celerybeat server.
$ python manage.py celerybeat
Just be aware that if you run a dedicated celerybeat server, you'll also need to start a worker (celeryd) yourself, otherwise you'll have your tasks sent to the queue, but not processed. I have a personal preference for running the dedicated celerybeat server as it makes it easier to isolate and shutdown just the beat server process from the workers. (ps aux | grep celerybeat)
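Conceptually, a beat process just keeps asking whether each periodic task is due yet. The sketch below shows that check using only the standard library. The is_due function here is my own illustration of the idea and should not be read as celerybeat's actual code.

```python
from datetime import datetime, timedelta


def is_due(last_run, run_every, now):
    """Return (due, seconds_until_next_run) for a periodic task."""
    next_run = last_run + run_every
    if now >= next_run:
        return True, 0.0
    return False, (next_run - now).total_seconds()


run_every = timedelta(seconds=30)
started = datetime(2010, 1, 1, 12, 0, 0)  # arbitrary example timestamp

print(is_due(started, run_every, started + timedelta(seconds=10)))
print(is_due(started, run_every, started + timedelta(seconds=45)))
```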
Elapsed Time: Implementing, limited trouble shooting and documenting ~1 hr. Following the above steps, prob. 30 minutes.
Set Static Execution Limits
I suspect for many people this is now approaching the point where you have enough of a grip to be able to use this in anger in a lot of different situations. We have one additional problem however that we need to resolve. In some instances we need to limit the amount of messages that a queue processes. This is because while we might have the physical capacity to process 500,000 calls an hour, we could very well have other service API limits imposed on us by the services we are calling.
In fact this is exactly the situation when we are talking to the Twitter API. Twitter allocates us a limit and we can't exceed this. To make it more complicated, this limit can actually differ by server. We need to be able to limit execution, by server, to not exceed our API limit.
Actually, there is a simple enough partial solution for this. Celery defines a rate_limit, which controls how many times a task can execute in a given time period. To use it, pass it in to the @task() decorator, or specify it at the top of the class definition, e.g.
@task(rate_limit="100/m")
def add(x, y):
    logger.info('Adding %s and %s together' % (x, y))
    return x + y

class MyTest(Task):
    rate_limit = "100/m"

    def run(self, x, y):
        logger.info('Received %s and %s' % (x, y))
        return int(x) + int(y)

class MyPeriodicTask(PeriodicTask):
    run_every = timedelta(seconds=30)
    rate_limit = "1000/h"

    def run(self, **kwargs):
        r = add(1, 2)
        logger.info("Running periodic task! Result of add was %s" % r)
The rate_limit is a combination of "how many times" and a time unit: per second (s), per minute (m) or per hour (h). For example, "1000/h" means the task is executed no more than 1000 times in an hour.
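To sanity-check a limit before committing to it, it helps to convert the string into numbers. This is a small sketch under the format described above. parse_rate_limit and min_interval are hypothetical helpers, not Celery's own parser, and Celery may enforce the limit differently (for example by allowing bursts).

```python
def parse_rate_limit(rate):
    """Convert a string such as "100/m" into allowed executions per second."""
    seconds_per_unit = {"s": 1, "m": 60, "h": 3600}
    count, _, unit = rate.partition("/")
    if unit not in seconds_per_unit:
        raise ValueError("unknown rate unit in %r" % rate)
    return float(count) / seconds_per_unit[unit]


def min_interval(rate):
    """Smallest average gap in seconds between runs that stays within the limit."""
    return 1.0 / parse_rate_limit(rate)


print(min_interval("100/m"))
print(min_interval("1000/h"))
```

So "100/m" works out at roughly one execution every 0.6 seconds, and "1000/h" at roughly one every 3.6 seconds.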
Because each worker executes on a different server, and therefore gets different settings from the settings.py (or the local_settings.py) file, we can use this to change execution by server if we use a full class definition (and not the decorator) by adding ou