Part 2: Build a multi-threaded processing queue spread over multiple servers in less than a day using RabbitMQ and Celery.

This is part two of my post on how to build a multi-threaded processing queue.

Set Static Execution Limits

I suspect that for many people this is now approaching the point where you have enough of a grip to be able to use this in anger in a lot of different situations. However, we have one additional problem to resolve. In some instances we need to limit the number of messages that a queue processes. This is because while we might have the physical capacity to process 500,000 calls an hour, we could very well have other API limits imposed on us by the services we are calling.

In fact this is exactly the situation when we are talking to the Twitter API. Twitter allocates us a limit and we can’t exceed this. To make it more complicated, this limit can actually differ by server. We need to be able to limit execution, by server, to not exceed our API limit.

Actually, there is a simple enough partial solution for this. Celery defines a rate_limit option which controls how many times a task can execute in a given time period. To use it, pass it to the @task() decorator, or specify it at the top of the class definition, e.g.:

from datetime import timedelta
import logging

from celery.decorators import task
from celery.task import Task, PeriodicTask

logger = logging.getLogger(__name__)

@task(rate_limit="100/m")
def add(x, y):
    logger.info('Adding %s and %s together' % (x, y))
    return x + y

class MyTest(Task):
    rate_limit = "100/m"

    def run(self, x, y):
        logger.info('Received %s and %s' % (x, y))
        return int(x) + int(y)

class MyPeriodicTask(PeriodicTask):
    run_every = timedelta(seconds=30)
    rate_limit = "1000/h"

    def run(self, **kwargs):
        r = add(1, 2)
        logger.info("Running periodic task! Result of add was %s" % r)
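To push tasks through the queue rather than run them inline, you dispatch them with delay(), which is standard Celery:

# Send the task to the broker; a worker bound to the queue picks it up.
result = add.delay(3, 4)

# Block until a worker has executed it (this requires a result
# backend to be configured).
print result.get()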


The rate_limit is a combination of "how many times" and a per-second (s), per-minute (m) or per-hour (h) value, e.g. "1000/h" means the task is executed no more than 1,000 times in an hour.

Because each worker executes on a different server, and therefore gets different settings from the settings.py (or local_settings.py) file on that server, we can use this to vary execution limits by server. Using the full class definition (not the decorator), we add our own custom parameter to settings.py, e.g. for the periodic class:

from clifton.settings import SERVER_TWITTER_LIMIT

class MyPeriodicTask(PeriodicTask):
    rate_limit = SERVER_TWITTER_LIMIT
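Each server's local_settings.py can then carry its own value. A minimal sketch (SERVER_TWITTER_LIMIT is our own setting name, not something Celery defines; the numbers match the example limits used below):

# local_settings.py on the smaller machine
SERVER_TWITTER_LIMIT = "20000/h"

# local_settings.py on the larger machine
SERVER_TWITTER_LIMIT = "100000/h"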

Now when I execute the task on different servers, they get different limits. Let's say I have two different limits, 20,000 an hour on one machine and 100,000 an hour on the other, and I submit 100,000 tasks. Using this approach, the first 40,000 will be (more or less) evenly distributed between the two servers, but once the 20,000 machine hits its limit, its worker will stop accepting tasks, meaning that the remaining ~60,000 tasks go to the machine with limit remaining. Of course you could specify this split exactly, but we want to leave it to configuration and use the same code across both (or more) servers.

Elapsed Time: Implementing, limited troubleshooting and documenting, ~2 hr. Following the above steps, prob. 30 minutes.

Failed Tasks

I mentioned above that the solution is really only a partial one. Why? Using Twitter as an example, depending on your tasks, one task may make 1, 2 or more calls to Twitter (each using up API limit). You might also have different tasks with different numbers of calls executing on the same machines, and so forth. It gets complicated very quickly. The very specific problem we have is that different servers have different rate limits, so one server can process 20,000 calls while the other can process 100,000.

After a lot of research to get this far, I've realised that there isn't a very elegant solution (within the limits of Celery). A lot of this has been learning what I CAN'T do with Celery and AMQP, which has taken some time. For example, the immediately "obvious" solution to me (turn off one of the workers when the server it is on runs out of limit) turns out to be very difficult for a range of reasons, one of which is that there isn't a way to tell an individual worker to shut down: there is a way of stopping all the workers on a server, but not the workers bound to a specific queue.

Another possibility, making the scheduler aware of the limits on each server and using routing keys to direct tasks to each, also wasn't very appealing, because it requires the task scheduler to know about the queues, which isn't really the point: state information has to be transferred back and forth, and the scheduler (a periodic task in this case) kicking off the tasks also needs to know which queues are available and where they are. Messy.

So a compromise is needed.

My compromise is going to be this: I will reduce the granularity of my tasks so that there is only one Twitter call per message. This means that I can now match my queue limits to the API limit on each server, so that the workers on one server only get 100,000 messages per hour, while the workers on the other server get 20,000. (You might wonder why this works. Although all the workers are bound to the same RabbitMQ queue, it's actually the celeryd process that polls the queue, so it's really what's enforcing the limit; it's not a function of AMQP or RabbitMQ. If you remember, the early section on Directing Tasks to Server showed how to have different settings on each server.)
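As a sketch of what that reduced granularity looks like (FetchTwitterPage and the URL it fetches are hypothetical stand-ins for whatever single call your application makes), each message now corresponds to exactly one call:

import urllib2

from celery.task import Task
from clifton.settings import SERVER_TWITTER_LIMIT

class FetchTwitterPage(Task):
    # One API call per message, so this rate limit maps directly
    # onto this server's Twitter allocation.
    rate_limit = SERVER_TWITTER_LIMIT

    def run(self, url):
        # Exactly one call to Twitter per task execution.
        return urllib2.urlopen(url).read()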

Having reduced the granularity of the tasks I now have one relatively simple problem to solve:

  1. I have to deal with tasks that will still fail because of the API limit, despite my best intentions and the queue limits in place.

Fortunately the Celery Task class has a retry() method which allows you to resubmit a task, as in the following example:

class MyTest(Task):

    def run(self, x, y, **kwargs):
        logger.info('Received %s and %s' % (x, y))
        try:
            return int(x) + int(y)
        except (TypeError, ValueError) as exc:
            # Resubmit the task with the same arguments.
            self.retry(args=[x, y], kwargs=kwargs, exc=exc)

That's not a very good example (if only because if it fails once, it will always fail; resubmitting the same non-integer values won't help!), but it does illustrate how to do it.
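For something closer to the Twitter case, here's a sketch extending the hypothetical FetchTwitterPage from above, using Celery's countdown and max_retries options (the exception caught here is an illustrative assumption):

import urllib2

from celery.task import Task

class FetchTwitterPage(Task):
    max_retries = 3  # give up after three retries

    def run(self, url, **kwargs):
        try:
            return urllib2.urlopen(url).read()
        except urllib2.HTTPError as exc:
            # If we hit the API limit anyway, resubmit the task and
            # have it retried roughly ten minutes later.
            self.retry(args=[url], kwargs=kwargs, exc=exc,
                       countdown=10 * 60)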

Elapsed Time: Implementing, LOTS of research into some things I couldn't do, limited troubleshooting and documenting, ~4 hr. Following the above steps, prob. 15 minutes.

In Summary

There is of course still a lot to do. You need to think about your own application architecture: what tasks and queues you need, how you'll split them up, where the processing will occur, and so on. You might also like to think about a memory-only implementation, or any number of other considerations.

Still, hopefully this has gone some way to helping you design your first RabbitMQ / Celery implementation in a near real-world way. For well under 8 hours' effort, it's a day well spent if you've got this kind of processing problem and want to use queuing to distribute your processing load easily.

I’d love to get your comments and feedback on this and welcome any corrections (I’ve refactored this a few times as I went through the documentation and learnt things later on, so I haven’t completely reproduced every step or code example any more, although it’s all working in my environment).

Thanks to Ask Solem Hoel for the awesome Celery and Carrot libraries and the active support on the forums. I'm really excited about turning this on in production for our application (http://tribalytic.com and its baby brother http://twendly.com).