Tim Bull

The often random thoughts of an Enterprise Technologist, Coffee Addict, Social Media Junkie and Co-Founder of BinaryPlex

Part 2: Build a processing queue with multi-threading and spread over multiple servers in less than a day using RabbitMQ and Celery.

This is part two of my post on How to build a multi-threading processing queue.

Set Static Execution Limits

I suspect that for many people this is now approaching the point where you have enough of a grip to use this in anger in a lot of different situations.  We have one additional problem, however, that we need to resolve.  In some instances we need to limit the number of messages that a queue processes.  This is because while we might have the physical capacity to process 500,000 calls an hour, we could very well have API limits imposed on us by the services we are calling.

In fact this is exactly the situation when we are talking to the Twitter API.  Twitter allocates us a limit and we can't exceed this.  To make it more complicated, this limit can actually differ by server.  We need to be able to limit execution, by server, to not exceed our API limit.

Actually there is a simple enough partial solution for this.  Celery defines a rate_limit which controls how many times a task can execute in a given time period.  To use it, pass it in to the @task() decorator, or specify it at the top of the class definition. e.g.

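from datetime import timedelta
import logging

from celery.decorators import task
from celery.task import PeriodicTask, Task

logger = logging.getLogger('fetcher.tasks')

# Decorator form: the limit is passed as an argument.
@task(rate_limit="100/m")
def add(x, y):
    logger.info('Adding %s and %s together' % (x, y))
    return x + y

# Class form: the limit is a class attribute.
class MyTest(Task):
    rate_limit = "100/m"

    def run(self, x, y):
        logger.info('Received %s and %s' % (x, y))
        return int(x) + int(y)

class MyPeriodicTask(PeriodicTask):
    run_every = timedelta(seconds=30)
    rate_limit = "1000/h"

    def run(self, **kwargs):
        r = add(1, 2)
        logger.info("Running periodic task! Result of add was %s" % r)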


The rate_limit is a combination of "how many times" and a time unit: per second (s), per minute (m) or per hour (h).  e.g. "1000/h" means the task is executed no more than 1000 times in an hour.
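
To make the format concrete, here is a minimal sketch that converts such a string into executions per second.  It is an illustration only, not Celery's own parser, and the names SECONDS_PER_UNIT and executions_per_second are made up for this example.

```python
# Illustrative only: this is not how Celery parses rate_limit internally.
SECONDS_PER_UNIT = {"s": 1, "m": 60, "h": 3600}


def executions_per_second(rate_limit):
    """Convert a string such as "100/m" into allowed executions per second."""
    count, unit = rate_limit.split("/")
    return float(count) / SECONDS_PER_UNIT[unit]


print(executions_per_second("100/m"))
print(executions_per_second("1000/h"))
```

Seen this way, "1000/h" works out to well under one execution per second, which is a useful sanity check when comparing a limit against how fast tasks are being queued.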

Because each worker executes on a different server, and therefore gets different settings from the settings.py (or the local_settings.py) file, we can use this to change execution by server if we use a full class definition (and not the decorator) by adding our own custom parameter to the settings.py  e.g. for the periodic class:

from clifton.settings import SERVER_TWITTER_LIMIT

class MyPeriodicTask(PeriodicTask):
    rate_limit = SERVER_TWITTER_LIMIT
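
For completeness, the matching entry in settings.py (or local_settings.py) might look something like the fragment below.  The value shown is only an example based on the figures used in this post; use whatever limit applies to each machine.

```python
# local_settings.py on the machine with the smaller Twitter allowance.
# Example value only: set this to the limit that applies to this server.
SERVER_TWITTER_LIMIT = "20000/h"

# On the other machine the same line would instead read:
# SERVER_TWITTER_LIMIT = "100000/h"
```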


Now when I execute the task on different servers, they get different limits.  Let's say I have two different limits - 20,000 on one machine and 100,000 on the other.  Using this approach with, say, 100,000 tasks, the first 40,000 will be (more or less) evenly distributed between the two servers, but once the 20,000 machine hits its limit, the worker will stop accepting tasks, meaning that the remaining ~60,000 tasks go to the machine with the remaining limit.  Of course you could specify this exactly, but we want to leave it to configuration and use the same code across both (or more) servers.
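
The arithmetic above can be checked with a small simulation.  This is a rough model only: it assumes 100,000 tasks in total (the figure implied by 40,000 + ~60,000), strict alternation between the machines, and hypothetical machine names.  It does not use Celery or RabbitMQ at all.

```python
def distribute(total_tasks, limits):
    """Hand tasks out in turn to every server that still has limit left."""
    counts = dict.fromkeys(limits, 0)
    names = list(limits)
    handed_out = 0
    while handed_out < total_tasks:
        # Servers that have not yet reached their limit.
        active = [name for name in names if counts[name] < limits[name]]
        if not active:
            break  # every server is at its limit; the rest stay queued
        name = active[handed_out % len(active)]
        counts[name] += 1
        handed_out += 1
    return counts


# Hypothetical assignment of the two limits to the two machines.
split = distribute(100000, {"laptop": 20000, "server": 100000})
print(split)
```

In this model the machine with the 20,000 limit ends up with exactly 20,000 tasks and the other takes the remaining 80,000 (its 20,000 share of the first 40,000 plus the ~60,000 left over).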

Elapsed Time: Implementing, limited trouble shooting and documenting ~2 hr.  Following the above steps, prob. 30 minutes.

Failed tasks

I mentioned above that the solution is really only a partial one.  Why? Using Twitter as an example, depending on your tasks one task may make 1, 2 or more calls to Twitter (thus using API limit).  You might also have different tasks with different numbers of calls executing on the same machines and so forth.  It gets complicated very quickly.  The very specific problem we have is that different servers actually have different rate limits - so one server can process 20,000 calls while the other can process 100,000.

After a lot of research to get this far I've realised that there isn't a very elegant solution (within the limits of Celery).  A lot of this has been learning what I CAN'T do with Celery and AMQP, which has taken some time.  For example, the immediately "obvious" solution to me (turn off one of the workers when the server they are on runs out of limit) turns out to be very difficult for a range of reasons (one of which is that there isn't a way to tell a specific worker to shut down - there is a way of stopping all the workers on a server, but not just the workers bound to a specific queue).

Another possibility is to make the scheduler aware of the limits on each server and use routing keys to direct tasks to each.  This wasn't very appealing either, because it requires the task scheduler to know about the queues, which isn't really the point: state information has to be transferred back and forth, and the scheduler (a periodic task in this case) kicking off the tasks also needs to know which queues are available and where they are.  Messy.

So a compromise is needed.

My compromise is going to be this.  I will reduce the granularity of my tasks such that there is only one Twitter call per message.  This means that I can now match my queue limits to the API limit on the server, so that the workers on one server only get 100,000 messages per hour, while the workers on the other server get 20,000.  (You might wonder why this works.  Although all the workers are bound to the same RabbitMQ queue, it's actually the celeryd process that polls the queue, so celeryd is what enforces the limit; it's not a function of AMQP or RabbitMQ.  If you remember, the earlier section on Directing tasks to different servers showed how to have different settings on each server.)
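
As a sketch of what "one Twitter call per message" could mean in practice, the snippet below expands a batch of users into one message per API call.  The call names and the helper are hypothetical and only illustrate the idea; they are not taken from the real application.

```python
# Hypothetical API calls that used to be bundled into a single task per user.
CALLS_PER_USER = ("profile", "timeline")


def single_call_messages(usernames):
    """Expand a batch into one (call_name, username) message per API call."""
    return [(call, user) for user in usernames for call in CALLS_PER_USER]


messages = single_call_messages(["alice", "bob"])
print(len(messages))
```

With this shape, every message costs exactly one unit of API limit, so a rate_limit on the task maps directly onto the limit for that server.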

Having reduced the granularity of the tasks I now have one relatively simple problem to solve:

  1. I have to deal with tasks that will still fail because of the API limit, despite my best intentions and the queue limits in place.

Fortunately Celery has a method for the Task class which allows you to submit a retry as in the following example.

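from celery.task import Task

# logger is set up as in the earlier tasks.py
class MyTest(Task):

    def run(self, x, y, **kwargs):
        logger.info('Received %s and %s' % (x, y))
        try:
            return int(x) + int(y)
        except (TypeError, ValueError) as exc:
            # Resubmit the task with the same arguments.
            self.retry(args=[x, y], kwargs=kwargs, exc=exc)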
 
That's not a very good example (if only because if it fails, it will always fail - resubmitting the same non-integer values won't help!).  It does illustrate how to do it though.
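
A retry makes more sense when the failure is temporary, such as hitting an API limit.  The sketch below shows that pattern in plain Python under a few assumptions: RateLimitError and flaky_fetch are made up for the example, and the loop runs in-process, whereas Celery's retry puts the message back on the queue to be tried again later.

```python
class RateLimitError(Exception):
    """Hypothetical error meaning 'the remote API refused this call'."""


def run_with_retries(func, max_retries=3):
    """Call func(), trying again after a RateLimitError, up to max_retries times."""
    retries = 0
    while True:
        try:
            return func()
        except RateLimitError:
            if retries >= max_retries:
                raise  # give up and let the caller see the failure
            retries += 1


calls = []


def flaky_fetch():
    """Fails on the first two attempts, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise RateLimitError("limit reached")
    return "fetched"


outcome = run_with_retries(flaky_fetch)
print(outcome, len(calls))
```

Unlike the int() example, the same call can succeed on a later attempt here, which is the situation a retry is meant for.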

Elapsed Time: Implementing, LOTS of research into some things I couldn't do, limited trouble shooting and documenting ~4 hr.  Following the above steps, prob. 15 minutes.

In Summary

There is of course still a lot to do.  You need to think about your own application architecture, what tasks and queues you need, how you'll split them up, where the processing will occur, etc.  You might like to think about a memory only implementation, or any number of other considerations.

Still, hopefully this has gone some way to helping you design your first RabbitMQ / Celery implementation in a near real world way.  For well under 8 hours effort, it's a day that's worthwhile if you've got this kind of processing problem and want to use queuing to allow you to distribute your processing load easily.

I'd love to get your comments and feedback on this and welcome any corrections (I've refactored this a few times as I went through the documentation and learnt things later on, so I haven't completely reproduced every step or code example any more, although it's all working in my environment).

Thanks to Ask Solem Hoel for the awesome Celery and Carrot libraries and the active support on the forums.  I'm really excited about turning this on in production for our application (http://tribalytic.com and its baby brother http://twendly.com).

Posted March 8, 2010

Build a processing queue with multi-threading and spread over multiple servers in less than a day using RabbitMQ and Celery.

As we move through the development cycle we now have many of the essential processing modules we need for Tribalytic, but we also have a few challenges we need to deal with as well:

  1. We need to collect data faster than we can process it on a single processor.  Requirement - we need to be able to collect the data and then spread it out for processing over multiple processors / servers.
  2. Some servers have a quota on how much they can process per hour for certain things (related to API limits set by Twitter etc.) Requirement - servers need to know their available limits and not take on more than they can process.
  3. We need to be able to schedule some tasks on a regular basis.
There are lots of different ways that this could be done, but after some preliminary research, I'm going to set out to do it using two key pieces of technology:
  1. RabbitMQ - An AMQP (Advanced Message Queuing Protocol) server.  Robust message queues help a LOT with point one in particular.
  2. Celery - A pythonic / Django friendly task scheduler and queuing interface to Rabbit / AMQP. Celery provides the magic for points two and three.
The goal of this document is for me to both document what I've learnt so I can replicate it in our production environment and, having digested the various documentation, lay out the "easy" steps for you to be able to implement something that solves some common problems quickly.

There are all sorts of reasons that I've preselected these technologies, which aren't the point of this post.  This post documents the challenge: having selected this approach, how easy is it to implement?  Firstly a few notes about my setup:

  1. I have two machines I'll be configuring this on, both running the Ubuntu 9.10 (Karmic Koala) release.  I'll call them Server and Laptop.  For the record, Server is a PC with 4 cores and 6GB of RAM.  Laptop is an MSI laptop with a single core (1.3GHz) and 2GB RAM.  When you see Server and Laptop, replace them with your own machine names.
  2. Python and Django are already pre-configured and working.
Finally, there are a lot of great resources out there, but the few I relied on the most (and where some of the content here is adapted from) are:

  1. http://www.rabbitmq.com
  2. http://ask.github.com/celery/index.html
  3. http://groups.google.com/group/celery-users

Installation Steps

I set up Server first.

  • Install Celery
    • To do this, I used easy_install for Celery as follows.
$ sudo easy_install celery

  • Next install RabbitMQ - Originally I simply used Synaptic Package Manager in Ubuntu, searched for Rabbit and installed it and its dependencies, but I noticed this is version 1.6.0 and the latest here is 1.7.2 at time of writing, so I downloaded and installed the latest package instead.  I doubt it will make much difference which you use.
     
  • Test the installation - using the following steps below which I largely copied from here 
Configure the security / vhost etc.  Note this is an important step you'll almost certainly come back to later if you stuff up :-)

$ sudo rabbitmqctl add_user myuser mypassword
$ sudo rabbitmqctl add_vhost myvhost
$ sudo rabbitmqctl set_permissions -p myvhost myuser "" ".*" ".*"

At this point the server should actually be running (this confused me first time, I think the Rabbit Controller starts the server to add the hosts etc.).  You can check if there is a server running as follows:

$ sudo rabbitmqctl status

To start the server in the foreground, use the following

$ sudo rabbitmq-server

or to run it in the background (recommended)

$ sudo rabbitmq-server -detached

Finally, to stop the server

$ sudo rabbitmqctl stop

Leave the server stopped for now and the basic Server install is complete.

I repeated these steps on Laptop without any problems at all.  NB you could skip configuring security etc. on Laptop if you like as it will be reset in the next step, I think it's worth it anyway just as practice.

NB - If you see a dump like the following when starting the server, in my case this was because the server was already started - use rabbitmqctl status to check:

{error_logger,{{2010,2,23},{11,26,37}},"Protocol: ~p: register error: ~p~n",["inet_tcp",{{badmatch,{error,duplicate_name}},[{inet_tcp_dist,listen,1},{net_kernel,start_protos,4},{net_kernel,start_protos,3},{net_kernel,init_node,2},{net_kernel,init,1},{gen_server,init_it,6},{proc_lib,init_p_do_apply,3}]}]}etc. etc. etc.
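
As a complementary check from Python, the sketch below tests whether anything is listening on the AMQP port.  It assumes the default port 5672 on localhost and only shows that something accepts connections there, not that it is a healthy RabbitMQ node; rabbitmqctl status remains the authoritative check.  The function name is made up for this example.

```python
import socket


def broker_is_listening(host="localhost", port=5672, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        sock = socket.create_connection((host, port), timeout)
    except (socket.error, socket.timeout):
        return False
    sock.close()
    return True


print(broker_is_listening())
```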

Elapsed Time: Configure two PCs with Celery and Rabbit and document it? ~1 hr. For you? 30 minutes.

Cluster the RabbitMQ Servers

Knowing I want this to work across at least two servers from day one, I decided to next cluster the RabbitMQ servers.

This is remarkably straightforward if you follow the steps listed in the RabbitMQ Clustering guide and DON'T miss the step I did on configuring the Erlang cookie.  I had a couple of minor issues that I needed to read up on, and my very sketchy Linux knowledge slowed me up.  Here are the steps I ended up following (you could use the guide linked, but I've just added in a couple of things relevant to what we are doing here).

  • Firstly, configure the erlang cookie so that the erlang installs on the two machines can share processes.  This required changing permissions first.  Make sure the RabbitMQ server is STOPPED.  When you are editing the cookie, simply replace whatever is in there with your own string.  It needs to be the same string on both Server and Laptop.  Length doesn't matter.  The default security on the cookie file is very tight, I needed to change permissions to be able to edit it, then I changed them back.  Replace gedit with your editor of choice (e.g. vi).
$ sudo chmod 777 /var/lib/rabbitmq/.erlang.cookie
$ sudo gedit /var/lib/rabbitmq/.erlang.cookie
$ sudo chmod 400 /var/lib/rabbitmq/.erlang.cookie
  • Make sure that Laptop and Server are in each other's local hosts file (by this I mean that the machine name of your Server equivalent needs to be in the hosts file on Laptop and vice versa).  If you can ping Server from Laptop and vice versa, you should be fine.
  • On Server, start RabbitMQ in detached mode, check status and make sure it's running.
  • On Laptop, make sure the RabbitMQ Server is stopped, then make sure you've set the cookie as above. Start the RabbitMQ server in detached mode and now complete the following steps.
$ sudo rabbitmqctl stop_app
$ sudo rabbitmqctl reset
$ sudo rabbitmqctl cluster rabbit@server
$ sudo rabbitmqctl start_app

 Note - if you have trouble with the reset (because like me you tried to actually cluster the machine BEFORE the cookie was set in Erlang) you can try sudo rabbitmqctl force_reset which should sort it out.

  • You now have a clustered RabbitMQ setup.  You can verify this by typing sudo rabbitmqctl status on either server and you'll see it list something like the following:
[{running_applications,[{rabbit,"RabbitMQ","1.7.2"},
                        {mnesia,"MNESIA  CXC 138 12","4.4.10"},
                        {os_mon,"CPO  CXC 138 46","2.2.2"},
                        {sasl,"SASL  CXC 138 11","2.1.6"},
                        {stdlib,"ERTS  CXC 138 10","1.16.2"},
                        {kernel,"ERTS  CXC 138 10","2.13.2"}]},
 {nodes,['rabbit@laptop','rabbit@server']},
 {running_nodes,['rabbit@laptop','rabbit@server']}]

This is neat, but if I read the documentation correctly what we have here is a RAM / RAM cluster.  If one of the servers goes down, we will be fine because the message state is replicated across clusters, but if the whole lot went out (because the data centre lost power, or more likely in my situation that I just turned both PCs off over night) we might really want a persistent DISK node.

To convert Server to being a disk node, simply execute the following on Server (while the cluster is running).

$ sudo rabbitmqctl stop_app
$ sudo rabbitmqctl cluster rabbit@laptop rabbit@server

Or (FYI) to turn it back into a RAM node.  Note it doesn't matter for our purposes in this doco how they are configured, read up and decide what you need.

$ sudo rabbitmqctl cluster rabbit@laptop

Finally, start it up again

$ sudo rabbitmqctl start_app

To remove a server from a cluster at any time, simply do this (this is not a required step, it's just "FYI").

$ sudo rabbitmqctl stop_app
$ sudo rabbitmqctl reset
$ sudo rabbitmqctl start_app

NB - You'll need to use force_reset for the LAST node in the cluster to be removed (if you're separating them all out again).  REMEMBER if you do a full cluster reset (like I did in testing this), you'll need to redo the security section from the first section again :-)  This is because you've reset both nodes so they no longer hold the security information.  If you ever get into problems with the configuration, I've resolved them consistently by de-clustering, forcing a node reset on both nodes and then clustering and redoing the security.

Elapsed Time: Research, reading, implementing and trouble shooting cluster ~2 hr.  Following the above steps, prob. 30 minutes.

Configure Celery with Django

OK, now we are ready to get Celery set up with Django.  Create a new Django project.  Mine's called "clifton" (we use local train stations as our milestone names) and within that I have an app called fetcher.  All of these steps need to be done on both Server and Laptop (I'm using SVN so I simply make the change on Server, commit and then update from Laptop).

  • In the INSTALLED_APPS section of settings.py add celery as follows.
INSTALLED_APPS = (
    'django.contrib.sessions',
    'django.contrib.sites',
    'celery',
    'clifton.fetcher',
)

  • Sync the DB as celery adds a couple of task tracking tables.
$ python manage.py syncdb
  • Add the following settings into settings.py as well.  These are just to get us going, we'll come back to this and improve them soon once we've got this section working.
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "myuser"
BROKER_PASSWORD = "mypassword"
BROKER_VHOST = "myvhost"
  • Finally, in your application directory (e.g. fetcher in my example) create a tasks.py module with the following code.  This is literally just for a test to show it's working.  Note it MUST be called tasks.py.  It uses the decorator to wrap a class definition around a simple add function that adds two numbers and returns a result.  Note that the decorator causes the class to have the name of the function (e.g. we effectively end up with a class called add in the tasks.py).
from celery.decorators import task

@task()
def add(x, y):
    return x + y


Having completed these steps on Server and Laptop, we now need to actually test and run our AMQP workers.  Just on Server for now, do the following steps.

  • Start a terminal window from the clifton directory (or wherever you put your Django app) and start a celery daemon using Django.  Again, there are more options here, but this is just a starting point to show it all working.
$ python manage.py celeryd

  • Now open a second terminal window and execute the following from the clifton directory.   We use the Django interpretive shell to be sure all the settings are loaded nicely for us.
$ python manage.py shell
>>> from clifton.fetcher import tasks
>>> result = tasks.add.delay(4,4)
>>> result.ready()
True
>>> result.result
8

So what just happened? Well, Celery wrapped up the add function (which simply adds x and y and returns the sum) in a class through the decorator.  We then, through the Django interpreter shell, imported the class and called the delay method, passing the parameters 4 and 4.  Celery handled all the work of pushing that out onto the RabbitMQ server, then the celeryd worker picked up the values, executed the task and provided the result.  The result.ready() call told us that our result for this particular instance had been processed, and result.result returned what that result actually was.  If result.ready() is False, you may find that the AMQP server is running but you don't have the worker process running.

We've just passed our first message. There is a LOT going on under the hood here, but this example is fairly uninspiring.  Shut down the celeryd terminal (send it a TERM signal, from command line do ps aux | grep celeryd then find the first process ID and do kill <pid> (e.g. kill 1234)) and see what happens if you execute the code again (perhaps with different parameters e.g. tasks.add.delay(2, 3) to see the difference).  Yup, result.ready() returns False.  It will keep returning False until you start the worker process up again.  The message has been queued and at least for this configuration, will persist until it gets processed by a worker.  Start the celeryd process again ($ python manage.py celeryd) and now try result.ready() in the shell - the message was processed and a result is able to be returned.
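
The behaviour described above (a message waits in the queue and ready() stays False until a worker picks it up) can be modelled with nothing but the standard library.  This is a toy sketch written for Python 3: an in-memory queue stands in for RabbitMQ, a thread stands in for celeryd, and the Result class and delay function are simplified stand-ins rather than Celery's real classes.

```python
import queue
import threading


class Result(object):
    """Stand-in for the object returned by delay(); a worker fills it in."""

    def __init__(self):
        self._done = threading.Event()
        self.result = None

    def ready(self):
        return self._done.is_set()

    def _store(self, value):
        self.result = value
        self._done.set()


broker = queue.Queue()  # plays the role of the RabbitMQ queue


def delay(func, *args):
    """Queue a call and return immediately with a Result placeholder."""
    result = Result()
    broker.put((func, args, result))
    return result


def worker():
    """Process queued calls until the None sentinel arrives."""
    while True:
        item = broker.get()
        if item is None:
            break
        func, args, result = item
        result._store(func(*args))


def add(x, y):
    return x + y


res = delay(add, 4, 4)
queued_before_worker = res.ready()  # no worker is running yet

thread = threading.Thread(target=worker)
thread.start()
broker.put(None)  # tell the worker to stop once the queue is drained
thread.join()
print(queued_before_worker, res.ready(), res.result)
```

The message sits in the queue untouched until the worker thread starts, which mirrors what happens when celeryd is stopped and restarted.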

If you're still unsure about how cool this is, then you probably don't need queuing :-) But for the final trick in this section until we get this thing rocking and rolling properly, let's demonstrate it really is a cluster being used to process this.

Of course by default, the cluster is up and running already (presuming you haven't stopped the Laptop), but to be sure, just check with rabbitmqctl status.  Make sure you've also run through the settings in this current section (Configure Celery with Django) on both Laptop and Server.  Now - on Server, make sure that any running celeryd terminal is shut down.  Start the celeryd terminal on Laptop.  From the Python / Django interpretive shell on Server, repeat the same steps.

Assuming you got a result successfully returned, you've just used the message queue cluster to have a worker on Laptop pick up and process the message from Server.  The subtlety here is that the underlying RabbitMQ cluster handled passing the message between the two machines - both Server and Laptop are talking to RabbitMQ locally, it was the cluster that handled making the message available in both places.

Elapsed Time: Research, reading, implementing, testing, trouble shooting and documenting ~4 hr.  Following the above steps, prob. 1 hr.

Re-Factor the Task and Demo to make life easier!

Note that this previous example is of course very basic and can be significantly optimised - in particular there is no connection pooling, so every new call to add.delay() is opening its own connection to the broker, hence the delay in posting the message.  You can speed it up by passing a previously opened connection - see here for more information.  We'll implement this below.
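
To see why reusing a connection matters, here is a toy model that simply counts how many connections get opened.  FakeConnection and publish are hypothetical stand-ins, not part of Celery or Carrot; the only point is the difference between one connection per call and one connection per batch.

```python
class FakeConnection(object):
    """Hypothetical stand-in for a broker connection; counts how many are opened."""

    opened = 0

    def __init__(self):
        FakeConnection.opened += 1

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        return False


def publish(message, connection=None):
    """Send one message, opening a fresh connection only if none is passed in."""
    if connection is None:
        with FakeConnection():
            return message
    return message


# One connection per call.
for n in range(4):
    publish(n)
per_call = FakeConnection.opened

# One shared connection for the whole batch.
FakeConnection.opened = 0
with FakeConnection() as conn:
    for n in range(4):
        publish(n, connection=conn)
shared = FakeConnection.opened
print(per_call, shared)
```

Four messages cost four connections in the first loop and a single connection in the second, which is the saving that passing a connection along is meant to achieve.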

Additionally, the Task Decorator, while useful for basic tasks, masks a lot of what's really happening - I prefer a more verbose approach when I'm learning.  So here is the new fetcher/tasks.py - note we've changed the name of the class to MyTest.  Note that it MUST have a "run" method, which is what will be called by apply_async.

import logging

from celery.task import Task

logger = logging.getLogger('fetcher.tasks')

class MyTest(Task):

    def run(self, x, y):
        logger.info('Received %s and %s' % (x, y))
        return int(x) + int(y)


We'll also make our life a bit easier by adding a bin directory into the clifton project (this is where I like to store all my cron jobs etc.) and create a simple python file to call our task.  This will let us just execute the file to generate the tasks - the sort of thing you'll need to do anyway if you want to call the tasks from a view etc.  I'll call it demo.py (for clarity, in my case this is in ~/clifton/bin/demo.py).  Copy the following code in (make any local adjustments needed).  Note the change to use apply_async instead of delay().  It is a lower level call to do the same thing as delay(), but allows us to add arguments etc.  I've also implemented the connection pooling.

#! /usr/bin/env python
from __future__ import with_statement

import os, sys
#os.environ['PYTHONPATH'] = "/home/tim/projects" # Uncomment and point this to your root django project
                                                 # directory if PYTHONPATH not set properly already.
os.environ['DJANGO_SETTINGS_MODULE']='clifton.settings'

from clifton.fetcher.tasks import MyTest
from celery.messaging import establish_connection

if __name__ == "__main__":

    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]

    results = []
    with establish_connection() as connection:
        for args in numbers:
            res = MyTest.apply_async(args=args, connection=connection)
            results.append(res)

    print([res.get() for res in results])


If you run this from the bin directory, you should see the following result:

$ python demo.py
[4, 8, 16, 32]

We now have a better structure to take our experiments forwards and further optimise.

Elapsed Time: Research, reading, implementing, trouble shooting and documenting ~2 hr.  Following the above steps, prob. 30 minutes.

Directing tasks to different servers

From here on in, this will become a little less generic and begin to deal with the problems that we are having and trying to resolve.  I'll explain why we've made our decisions and you can make your own choices.  While you can make each of these settings in settings.py on the two different machines, I suggest you create a local_settings.py and add them in there (especially if you're using SVN to sync changes - ie. running the same Django project on both servers).  Where I say settings.py below I've actually created these changes in a local_settings.py.  Note that most of these are default settings and can generally be over-ridden at different levels.

  • Run tasks on different machines.  The actual design of these queues, exchanges and bindings is a reasonably complicated topic that gets very design specific and frankly confusing!  It's also (somewhat) abstracted by Celery, which implements only Direct and Topic exchanges for example.  It's worth reading this post on Rabbits and Warrens which gives some good background on the various options provided by AMQP.  If all you want to do is post a message and pick it up with a worker, then Celery handles that for you, but in our case we'd like to make some processing choices up front and be able to use different machines to process different messages.  There is a good guide to this in the FAQ for Celery.  Add the following to settings.py on Server:
CELERY_DEFAULT_QUEUE = "server"
CELERY_QUEUES = {
    "server": {
        "binding_key": "server_task",
    },
}
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "direct"
CELERY_DEFAULT_ROUTING_KEY = "server_task"


We've just told celeryd that when it starts it should bind to a queue called "server" and listen for messages routed to server_task.  Do the same on Laptop, but specify laptop instead.

CELERY_DEFAULT_QUEUE = "laptop"
CELERY_QUEUES = {
    "laptop": {
        "binding_key": "laptop_task",
    },
}
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "direct"
CELERY_DEFAULT_ROUTING_KEY = "laptop_task"

There are several different ways of implementing this (telling tasks where to execute).  For example, in this configuration, if we do nothing and simply run the demo.py we created earlier, tasks started on Server will execute on Server and vice versa, because the defaults will be applied (which are different on each machine).  Most likely we want to make some decision at execution time about where we want to route this task.  In this case, simply modify line 19 of the demo.py to add the routing_key parameter as follows:

res = MyTest.apply_async(args=args, connection=connection, routing_key="laptop_task")

Changing it to routing_key = 'server_task' will force the task to execute on Server (no matter where it originated, although strictly speaking it forces it to execute on the worker bound to the server queue, listening for server_task - we just happen to configure this queue on Server) and vice versa.  Test this out and make sure it's all working.  You might like to add some logging into the tasks.py so you can check and see what is executing where (the next section shows how if you need help with this).  NB - It looks like the task files are cached in the celeryd worker, so if you modify the task, it's a good idea to restart the worker, at least for the moment when testing.

As a final note, let's say you want the Server to ALSO be able to pick up Laptop tasks (to use its spare capacity - or just as a more "realistic" example).  Simply make the following change to CELERY_QUEUES in the Server settings.py and Server will listen for Laptop tasks too.  Because both Laptop and Server are then bound to the same queue (which happens to be called laptop in our example), Rabbit automatically round robins messages between the two.  Make this change on Server only.

CELERY_QUEUES = {
    "server": {
        "binding_key": "server_task",
    },
    "laptop": {
        "binding_key": "laptop_task",
    },
}

If you extend the demo.py so it passes a lot more numbers, you should be able to use demo.py to send messages to server_task and laptop_task and see the difference by checking the logs.  Both Server and Laptop process tasks for laptop_task, while only Server processes tasks for server_task.

numbers = [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8),
           (9, 9), (10, 10), (11, 11), (12, 12), (13, 13), (14, 14),
           (15, 15), (16, 16), (17, 17), (18, 18)]
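
What you should expect to see in the logs can be sketched with a small model of the routing.  This is a simplification under stated assumptions: a direct exchange is reduced to an exact match between routing key and binding key, delivery to multiple consumers is strict round robin, and the BINDINGS / CONSUMERS tables and the deliver function are made up for the example.

```python
from itertools import cycle

# Queue name -> binding key, mirroring the CELERY_QUEUES settings.
BINDINGS = {"server": "server_task", "laptop": "laptop_task"}
# Queue name -> workers consuming from it once Server also listens on "laptop".
CONSUMERS = {"server": ["Server"], "laptop": ["Laptop", "Server"]}


def deliver(routing_keys):
    """Return the worker that would handle each message, in order."""
    queue_for_key = dict((key, name) for name, key in BINDINGS.items())
    next_worker = dict((name, cycle(workers)) for name, workers in CONSUMERS.items())
    return [next(next_worker[queue_for_key[key]]) for key in routing_keys]


print(deliver(["laptop_task"] * 4))
print(deliver(["server_task"] * 2))
```

In this model messages sent with laptop_task alternate between Laptop and Server, while messages sent with server_task only ever reach Server.  Real delivery order can differ depending on how busy each worker is.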

Elapsed Time: Research, reading, implementing, trouble shooting and documenting ~4.5 hr.  Following the above steps, prob. 2 hrs max.

Get a (heart) beat

We now have a (basic) distributed architecture with the ability to route jobs to a specific server and we also know how to distribute across two servers (or more).  The next trick is to create an automated task that runs on a regular basis.  Now we could use a CRON job or something similar, but it would be nice if there was a way of building this into Django and Celery so we can route the messages straight on to a queue.  It turns out that there is: a PeriodicTask, which does more or less what it says on the box.  It runs periodically.  I modified our earlier tasks.py as follows.  As well as the basic MyTest task already defined, we now have a PeriodicTask which executes every 30 seconds.  It doesn't do much - in this example it simply queues a batch of MyTest tasks.

from __future__ import with_statement

from datetime import timedelta
import logging

from celery.task import PeriodicTask, Task
from celery.messaging import establish_connection

logger = logging.getLogger('fetcher.tasks')

class MyTest(Task):

    def run(self, x, y):
        logger.info('Received %s and %s' % (x, y))
        return int(x) + int(y)

class MyPeriodicTask(PeriodicTask):
    run_every = timedelta(seconds=30)

    def run(self, **kwargs):
        numbers = [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (9, 9), (10, 10), (11, 11), (12, 12), (13, 13), (14, 14), (15, 15), (16, 16), (17, 17), (18, 18)]
        # Reuse one broker connection for the whole batch.
        with establish_connection() as connection:
            for n in numbers:
                MyTest.apply_async(args=[n[0], n[1]], connection=connection, routing_key="laptop_task")

        logger.info('Ran periodic task')


You might need to remove the logging options unless you have a proper logger setup in your settings.py (FYI, you can add the following into settings.py and this should all work nicely for you - just change paths as appropriate and also make sure you have a log directory).

LOG_FILENAME= '/home/tim/clifton/log/debug.log'
import logging
logging.basicConfig(
    filename=LOG_FILENAME,
    format='[%(levelname)-5s] %(asctime)-8s %(name)-10s %(message)s',
    #datefmt='%a, %d %b %Y %H:%M:%S',
    datefmt='%H:%M:%S',
    level=logging.DEBUG)

But how does it execute?  Well in this instance we don't actually need a demo.py, instead we can use the beat feature of the celeryd to execute the periodic task for us.  To do this, simply start your celeryd with the -B option.  e.g.

$ python manage.py celeryd -B

Alternatively you can run a dedicated celerybeat server.

$ python manage.py celerybeat

Just be aware that if you run a dedicated celerybeat server, you'll also need to start a worker (celeryd) yourself, otherwise your tasks will be sent to the queue but not processed.  I have a personal preference for running the dedicated celerybeat server as it makes it easier to isolate and shut down just the beat server process, separately from the workers (ps aux | grep celerybeat).
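
Conceptually, all the beat process has to decide is whether a periodic task is due yet.  The function below is a minimal sketch of that decision for a fixed run_every interval; it is an illustration with made-up names, not celerybeat's actual scheduling code.

```python
from datetime import datetime, timedelta


def is_due(last_run, run_every, now):
    """Return (due, seconds to wait before checking again)."""
    elapsed = now - last_run
    if elapsed >= run_every:
        return True, run_every.total_seconds()
    return False, (run_every - elapsed).total_seconds()


start = datetime(2010, 3, 8, 12, 0, 0)
every_30s = timedelta(seconds=30)
print(is_due(start, every_30s, start + timedelta(seconds=10)))
print(is_due(start, every_30s, start + timedelta(seconds=31)))
```

Ten seconds after the last run the task is not yet due and there are 20 seconds left to wait; after 31 seconds it is due and the next check is a full interval away.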

Elapsed Time: Implementing, limited trouble shooting and documenting ~1 hr.  Following the above steps, prob. 30 minutes.

Set Static Execution Limits

I suspect for many people this is now approaching the point where you have enough of a grip to be able to use this in anger in a lot of different situations.  We have one additional problem however that we need to resolve.  In some instances we need to limit the number of messages that a queue processes.  This is because while we might have the physical capacity to process 500,000 calls an hour, we could very well have other service API limits imposed on us by the services we are calling.

In fact this is exactly the situation when we are talking to the Twitter API.  Twitter allocates us a limit and we can't exceed this.  To make it more complicated, this limit can actually differ by server.  We need to be able to limit execution, by server, to not exceed our API limit.

Actually there is a simple enough partial solution for this.  Celery defines a rate_limit which controls the amount a task can execute in a given time period.  To use it, pass it in to the @task() decorator, or specify it at the top of the class definition. e.g.

@task(rate_limit="100/m")
def add(x, y):
    logger.info('Adding %s and %s together' % (x, y))
    return x + y

class MyTest(Task):
    rate_limit="100/m"

    def run(self, x, y):
        logger.info('Received %s and %s' % (x, y))
        return int(x) + int(y)

class MyPeriodicTask(PeriodicTask):
    run_every = timedelta(seconds=30)
    rate_limit="1000/h"

    def run(self, **kwargs):
        r = add(1, 2)
        logger.info("Running periodic task! Result of add was %s" % r)


The rate_limit is a string made up of "how many times" and a time unit: per second (s), per minute (m) or per hour (h). e.g. "1000/h" means the task is executed no more than 1000 times in an hour.
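
To make the notation concrete, here is a minimal sketch that turns a rate_limit string into the average gap between executions. The helper name seconds_between_runs is my own invention for illustration and is not part of Celery; it only does the arithmetic.

```python
# Hypothetical helper, not part of Celery: it only illustrates the arithmetic
# behind strings such as "100/m" or "1000/h".
SECONDS_PER_UNIT = {"s": 1, "m": 60, "h": 3600}

def seconds_between_runs(limit):
    """Return the average gap in seconds between executions at this limit."""
    count, unit = limit.split("/")
    return SECONDS_PER_UNIT[unit] / float(count)

print(seconds_between_runs("100/m"))   # 0.6
print(seconds_between_runs("1000/h"))  # 3.6
```

So "100/m" works out to roughly one execution every 0.6 seconds, and "1000/h" to one every 3.6 seconds.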

Because each worker executes on a different server, and therefore gets different settings from the settings.py (or the local_settings.py) file, we can use this to change execution by server if we use a full class definition (and not the decorator) by adding ou

Filed under  //   ampq   celery   rabbitmq   technical  
Posted March 8, 2010

Part 1: Build a processing queue with multi-threading and spread over multiple servers in less than a day using RabbitMQ and Celery.

 

This is part 1 - part 2 is over here.

As we move through the development cycle we now have many of the essential processing modules we need for Tribalytic, but we also have a few challenges we need to deal with as well:

  1. We need to collect data faster than we can process it on a single processor.  Requirement - we need to be able to collect the data and then spread it out for processing over multiple processors / servers.
  2. Some servers have a quota on how much they can process per hour for certain things (related to API limits set by Twitter etc.) Requirement - servers need to know their available limits and not take on more than they can process.
  3. We need to be able to schedule some tasks on a regular basis.

There are lots of different ways that this could be done, but after some preliminary research, I'm going to set out to do it using two key pieces of technology:

  1. RabbitMQ - An AMQP (Advanced Message Queuing Protocol) server.  Robust message queues help a LOT with point one in particular.
  2. Celery - A pythonic / Django friendly task scheduler and queuing interface to Rabbit / AMQP. Celery provides the magic for points two and three.

The goal of this document is both to record what I've learnt so I can replicate it in our production environment, and to lay out the "easy" steps, after having digested the various documentation, so that you can quickly implement something that solves some common problems.

There are all sorts of reasons that I've preselected these technologies which aren't the point of this post - this post is documenting the challenge: having selected this approach, how easy is it to implement?  Firstly, a few notes about my setup:

  1. I have two machines I'll be configuring this on, both running the Ubuntu 9.10 Karmic Koala release.  I'll call them Server and Laptop.  For the record, Server is a PC with 4 cores and 6 GB of RAM.  Laptop is an MSI laptop with a single core (1.3 GHz) and 2 GB of RAM.  When you see Server and Laptop, replace them with your own machine names.
  2. Python and Django are already pre-configured and working.

Finally, there are a lot of great resources out there, but the few I relied on the most (and where some of the content here is adapted from) are:

  1. http://www.rabbitmq.com
  2. http://ask.github.com/celery/index.html
  3. http://groups.google.com/group/celery-users


Installation Steps

I set up Server first.

  • Install Celery
    • To do this, I used easy_install for Celery as follows.
$ sudo easy_install celery

  • Next install RabbitMQ - Originally I simply used Synaptic Package Manager in Ubuntu, searched for Rabbit and installed it and its dependencies, but I noticed this is version 1.6.0 and the latest here is 1.7.2 at time of writing, so I downloaded and installed the latest package instead.  I doubt it will make much difference which you use.
  • Test the installation using the steps below, which I largely copied from here.
Configure the security / vhost etc.  Note this is an important step you'll almost certainly come back to later if you stuff up :-)

$ sudo rabbitmqctl add_user myuser mypassword
$ sudo rabbitmqctl add_vhost myvhost
$ sudo rabbitmqctl set_permissions -p myvhost myuser "" ".*" ".*"

At this point the server should actually be running (this confused me first time, I think the Rabbit Controller starts the server to add the hosts etc.).  You can check if there is a server running as follows:

$ sudo rabbitmqctl status

To start the server in the foreground, use the following

$ sudo rabbitmq-server

or to run it in the background (recommended)

$ sudo rabbitmq-server -detached

Finally, to stop the server

$ sudo rabbitmqctl stop


Leave the server stopped for now and the basic Server install is complete.

I repeated these steps on Laptop without any problems at all.  NB you could skip configuring security etc. on Laptop if you like as it will be reset in the next step, I think it's worth it anyway just as practice.

NB - If you see a dump like the following when starting the server, in my case it was because the server was already running - use rabbitmqctl status to check:

{error_logger,{{2010,2,23},{11,26,37}},"Protocol: ~p: register error: ~p~n",["inet_tcp",{{badmatch,{error,duplicate_name}},[{inet_tcp_dist,listen,1},{net_kernel,start_protos,4},{net_kernel,start_protos,3},{net_kernel,init_node,2},{net_kernel,init,1},{gen_server,init_it,6},{proc_lib,init_p_do_apply,3}]}]}etc. etc. etc.


Elapsed Time: Configure two PCs with Celery and Rabbit and document it? ~1 hr. For you? 30 minutes.

Cluster the RabbitMQ Servers

Knowing I want this to work across at least two servers from day one, I decided to next cluster the RabbitMQ servers.

This is remarkably straightforward if you follow the steps listed in the RabbitMQ Clustering guide and DON'T miss the step I did (configuring the Erlang cookie).  I had a couple of minor issues that I needed to read up on, and my very sketchy Linux knowledge slowed me up.  Here are the steps I ended up following (you could use the guide linked, but I've just added in a couple of things relevant to what we are doing here).

  • Firstly, configure the erlang cookie so that the erlang installs on the two machines can share processes.  This required changing permissions first.  Make sure the RabbitMQ server is STOPPED.  When you are editing the cookie, simply replace whatever is in there with your own string.  It needs to be the same string on both Server and Laptop.  Length doesn't matter.  The default security on the cookie file is very tight, I needed to change permissions to be able to edit it, then I changed them back.  Replace gedit with your editor of choice (e.g. vi).
$ sudo chmod 777 /var/lib/rabbitmq/.erlang.cookie
$ sudo gedit /var/lib/rabbitmq/.erlang.cookie
$ sudo chmod 400 /var/lib/rabbitmq/.erlang.cookie
  • Make sure that Laptop and Server are in each other's local hosts file (by this I mean that the machine name for your Server equivalent needs to be in the hosts file on Laptop and vice versa).  If you can ping Server from Laptop and vice versa, you should be fine.
  • On Server, start RabbitMQ in detached mode, check status and make sure it's running.
  • On Laptop, make sure the RabbitMQ Server is stopped, then make sure you've set the cookie as above. Start the RabbitMQ server in detached mode and now complete the following steps.
$ sudo rabbitmqctl stop_app
$ sudo rabbitmqctl reset
$ sudo rabbitmqctl cluster rabbit@server
$ sudo rabbitmqctl start_app

 Note - if you have trouble with the reset (because like me you tried to actually cluster the machine BEFORE the cookie was set in Erlang) you can try sudo rabbitmqctl force_reset which should sort it out.

  • You now have a clustered RabbitMQ setup.  You can verify this by typing sudo rabbitmqctl status on either server and you'll see it list something like the following:
[{running_applications,[{rabbit,"RabbitMQ","1.7.2"},
                        {mnesia,"MNESIA  CXC 138 12","4.4.10"},
                        {os_mon,"CPO  CXC 138 46","2.2.2"},
                        {sasl,"SASL  CXC 138 11","2.1.6"},
                        {stdlib,"ERTS  CXC 138 10","1.16.2"},
                        {kernel,"ERTS  CXC 138 10","2.13.2"}]},
 {nodes,['rabbit@laptop','rabbit@server']},
 {running_nodes,['rabbit@laptop','rabbit@server']}]

This is neat, but if I read the documentation correctly what we have here is a RAM / RAM cluster.  If one of the servers goes down, we will be fine because the message state is replicated across the cluster, but if the whole lot went out (because the data centre lost power, or more likely in my situation that I just turned both PCs off overnight) we might really want a persistent DISK node.

To convert Server to being a disk node, simply execute the following on Server (while the cluster is running).

$ sudo rabbitmqctl stop_app
$ sudo rabbitmqctl cluster rabbit@laptop rabbit@server

Or (FYI) to turn it back into a RAM node.  Note it doesn't matter for our purposes in this doco how they are configured, read up and decide what you need.

$ sudo rabbitmqctl cluster rabbit@laptop

Finally, start it up again

$ sudo rabbitmqctl start_app

To remove a server from a cluster at any time, simply do this (this is not a required step, it's just "FYI").

$ sudo rabbitmqctl stop_app
$ sudo rabbitmqctl reset
$ sudo rabbitmqctl start_app

NB - You'll need to use force_reset for the LAST node in the cluster to be removed (if you're separating them all out again).  REMEMBER if you do a full cluster reset (like I did in testing this), you'll need to redo the security section from the first section again :-)  This is because you've reset both nodes so they no longer hold the security information.  If you ever get into problems with the configuration, I've resolved them consistently by de-clustering, forcing a node reset on both nodes and then clustering and redoing the security.

Elapsed Time: Research, reading, implementing and trouble shooting cluster ~2 hr.  Following the above steps, prob. 30 minutes.

Configure Celery with Django

OK, now we are ready to get Celery set up with Django. Create a new Django project.  Mine's called "clifton" (we use local train stations as our milestone names) and within that I have an app called fetcher.  All of these steps need to be done on both Server and Laptop (I'm using SVN so I simply make the change on Server, commit and then update from Laptop).

  • In the INSTALLED_APPS section of settings.py add celery as follows.
INSTALLED_APPS = (
    'django.contrib.sessions',
    'django.contrib.sites',
    'celery',
    'clifton.fetcher',
)

  • Sync the DB as celery adds a couple of task tracking tables.
$ python manage.py syncdb
  • Add the following settings into settings.py as well.  These are just to get us going, we'll come back to this and improve them soon once we've got this section working.
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "myuser"
BROKER_PASSWORD = "mypassword"
BROKER_VHOST = "myvhost"
  • Finally, in your application directory (e.g. fetcher in my example) create a tasks.py module with the following code.  This is literally just for a test to show it's working.  Note it MUST be called tasks.py.  It uses the decorator to wrap a class definition around a simple add function that adds two numbers and returns a result.  Note that the decorator causes the class to have the name of the function (e.g. we effectively end up with a class called add in the tasks.py).
from celery.decorators import task

@task()
def add(x, y):
    return x + y
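
Before moving on, a quick way to check that the broker settings point at something that is actually listening is a plain TCP connection test. This is a minimal sketch using only the standard library; broker_reachable is a name I've made up for illustration, and it only proves the port is open - it says nothing about whether the user, password or vhost are right.

```python
import socket

def broker_reachable(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        conn = socket.create_connection((host, port), timeout)
    except (socket.error, socket.timeout):
        return False
    conn.close()
    return True

if __name__ == "__main__":
    # Host and port copied from the BROKER_HOST / BROKER_PORT settings above.
    print(broker_reachable("localhost", 5672))
```

If this prints False, start RabbitMQ (or check the host and port) before blaming Celery.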


Having completed these steps on Server and Laptop, we now need to actually test and run our AMQP workers.  Just on Server for now, do the following steps.

  • Start a terminal window from the clifton directory (or wherever you put your Django app) start a celery daemon using Django.  Again, there are more options here, but this is just a starting point to show it all working.
$ python manage.py celeryd

  • Now open a second terminal window and execute the following from the clifton directory.   We use the Django interpretive shell to be sure all the settings are loaded nicely for us.
$ python manage.py shell
>>> from clifton.fetcher import tasks
>>> result = tasks.add.delay(4,4)
>>> result.ready()
True
>>> result.result
8

So what just happened? Well, Celery wrapped up the add function (which simply adds x and y and returns the sum) in a class through the decorator.  We then, through the Django interpreter shell, imported the class and called the delay method, passing the parameters 4 and 4.  Celery then handled all the work of pushing that out onto the RabbitMQ server, then the celeryd worker actually got the values, executed the task and provided the result.  The result.ready() command told us that the result for this particular call had been processed and then result.result returned what that result actually was.  If result.ready() is False, you may find that the AMQP server is running but you don't have the worker process running.
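
If the ready() / result pattern is new to you, the standard library has a rough analogy in concurrent.futures. This is only a sketch of the same idea inside one process (assuming Python 3); there is no broker or celeryd involved, and the thread pool merely stands in for the worker.

```python
from concurrent.futures import ThreadPoolExecutor

def add(x, y):
    return x + y

# The executor stands in for the celeryd worker; no broker is involved.
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(add, 4, 4)   # roughly tasks.add.delay(4, 4)
value = future.result(timeout=5)      # blocks until the work is done
print(future.done())                  # roughly result.ready()
print(value)                          # roughly result.result
executor.shutdown()
```

The difference with Celery is that the work and the result travel through the message queue, so the caller and the worker can live on different machines.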

We've just passed our first message. There is a LOT going on under the hood here, but this example is fairly uninspiring.  Shut down the celeryd terminal (send it a TERM signal, from command line do ps aux | grep celeryd then find the first process ID and do kill <pid> (e.g. kill 1234)) and see what happens if you execute the code again (perhaps with different parameters e.g. tasks.add.delay(2, 3) to see the difference).  Yup, result.ready() returns False.  It will keep returning False until you start the worker process up again.  The message has been queued and at least for this configuration, will persist until it gets processed by a worker.  Start the celeryd process again ($ python manage.py celeryd) and now try result.ready() in the shell - the message was processed and a result is able to be returned.
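
The behaviour above (the message waits until a worker turns up) can be mimicked with a toy in-memory queue. This is a sketch only, assuming Python 3: broker, worker and results are names I've picked for illustration, and unlike RabbitMQ an in-memory queue does not survive the process ending.

```python
import queue
import threading

broker = queue.Queue()      # stand-in for the RabbitMQ queue
results = {}

def worker():
    """Drain the queue, adding each pair, until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        task_id, x, y = message
        results[task_id] = x + y

# Publish while no worker is running: the message just waits.
broker.put(("task-1", 2, 3))
print("task-1" in results)   # False, like result.ready() with celeryd stopped
waiting = broker.qsize()

# Start the worker; it picks up the waiting message.
thread = threading.Thread(target=worker)
thread.start()
broker.put(None)             # tell the toy worker to stop after draining
thread.join()
print(results["task-1"])     # 5
```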

If you're still unsure about how cool this is, then you probably don't need queuing :-) But for the final trick in this section, before we get this thing rocking and rolling properly, let's demonstrate that it really is a cluster being used to process this.

Of course by default, the cluster is up and running already (presuming you haven't stopped the Laptop), but to be sure, just check with rabbitmqctl status.  Make sure you've also run through these settings in this current section (Configure Celery with Django) on both Laptop and Server.  Now - on Server, make sure that any running celeryd terminal is shutdown.  Start the celeryd terminal on Laptop.  From the Python / Django interpretive shell on Server, repeat the same steps.

Assuming you got a result successfully returned, you've just used the message queue cluster to have a worker on Laptop pick up and process the message from Server.  The subtlety here is that the underlying RabbitMQ cluster handled passing the message between the two machines - both Server and Laptop are talking to RabbitMQ locally, it was the cluster that handled making the message available in both places.

Elapsed Time: Research, reading, implementing, testing, trouble shooting and documenting ~4 hr.  Following the above steps, prob. 1 hr.

Re-Factor the Task and Demo to make life easier!

Note that this previous example is of course very basic and can be significantly optimised - in particular there is no connection pooling, so every new call to add.delay() is opening its own connection to the broker, hence the delay in posting the message.  You can speed it up by passing a previously opened connection - see here for more information.  We'll implement this below.

Additionally, the Task Decorator, while useful for basic tasks, masks a lot of what's really happening - I prefer a more verbose approach when I'm learning.  So here is the new fetcher/tasks.py - note we've changed the name of the class to MyTest.  Note that it MUST have a "run" method, which is what the worker executes when the task is sent with apply_async.

import logging

from celery.task import Task

logger = logging.getLogger('fetcher.tasks')

class MyTest(Task):

    def run(self, x, y):
        logger.info('Received %s and %s' % (x, y))
        return int(x) + int(y)


We'll also make our life a bit easier by adding a bin directory into the clifton project (this is where I like to store all my cron jobs etc.) and create a simple python file to call our task.  This will let us just execute the file to generate the tasks - the sort of thing you'll need to do anyway if you want to call the tasks from a view etc.  I'll call it demo.py (for clarity, in my case this is in ~/clifton/bin/demo.py).  Copy the following code in (make any local adjustments needed).  Note the change to use apply_async instead of delay().  It is a lower level call to do the same thing as delay(), but allows us to add arguments etc.  I've also implemented the connection pooling.

#! /usr/bin/env python
from __future__ import with_statement

import os, sys
#os.environ['PYTHONPATH'] = "/home/tim/projects" # Uncomment and point this to your root django project
                                                 # directory if PYTHONPATH not set properly already.
os.environ['DJANGO_SETTINGS_MODULE']='clifton.settings'

from clifton.fetcher.tasks import MyTest
from celery.messaging import establish_connection

if __name__ == "__main__":

    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]

    results = []
    with establish_connection() as connection:
        for args in numbers:
            res = MyTest.apply_async(args=args, connection=connection)
            results.append(res)

    print([res.get() for res in results])


If you run this from the bin directory, you should see the following result:

$ python demo.py
[4, 8, 16, 32]
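
To show how delay() and apply_async relate, here is a toy class that mirrors the relationship. It is a sketch only: ToyTask is hypothetical, runs everything synchronously in the current process, and is not how Celery implements its Task class.

```python
class ToyTask(object):
    """Toy stand-in for a Celery task class; runs synchronously, no broker."""

    def run(self, x, y):
        return int(x) + int(y)

    def apply_async(self, args=None, kwargs=None, **options):
        # Options such as routing_key or connection would steer delivery;
        # this toy only records them and calls run() directly.
        self.last_options = options
        return self.run(*(args or ()), **(kwargs or {}))

    def delay(self, *args, **kwargs):
        # Shortcut form: there is no room here for delivery options.
        return self.apply_async(args=args, kwargs=kwargs)

task = ToyTask()
print(task.delay(2, 2))                                          # 4
print(task.apply_async(args=(4, 4), routing_key="laptop_task"))  # 8
print(task.last_options)
```

The point is simply that delay() takes the task's own arguments, while apply_async takes them as args / kwargs and leaves the remaining keywords free for things like the connection.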

We now have a better structure to take our experiments forwards and further optimise.

Elapsed Time: Research, reading, implementing, trouble shooting and documenting ~2 hr.  Following the above steps, prob. 30 minutes.

Directing tasks to different servers

From here on in, this will become a little less generic and begin to deal with the problems that we are having and trying to resolve.  I'll explain why we've made our decisions and you can make your own choices.  While you can make each of these settings in settings.py on the two different machines, I suggest you create a local_settings.py and add them in there (especially if you're using SVN to sync changes - ie. running the same Django project on both servers).  Where I say settings.py below I've actually created these changes in a local_settings.py.  Note that most of these are default settings and can generally be over-ridden at different levels.

  • Run tasks on different machines.  The actual design of these queues, exchanges and bindings is a reasonably complicated topic that gets very design specific and frankly confusing!  It's also (somewhat) abstracted by Celery which implements only Direct and Topic exchanges for example.  It's worth reading this post on Rabbits and Warrens which gives some good background on the various options provided by AMQP.  If all you want to do is post a message and pick it up with a worker, then Celery handles that for you, but in our case, we'd like to make some processing choices up front and be able to use different machines to process different messages.   There is a good guide to this in the FAQ for Celery.  Add the following to Server in settings.py
CELERY_DEFAULT_QUEUE = "server"
CELERY_QUEUES = {
"server": {
"binding_key": "server_task",
},
}
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "direct"
CELERY_DEFAULT_ROUTING_KEY = "server_task"


We've just told celeryd that when it starts it should bind to a queue called "server" and listen for messages routed to server_task.  Do the same on Laptop, but specify laptop instead.

CELERY_DEFAULT_QUEUE = "laptop"
CELERY_QUEUES = {
"laptop": {
"binding_key": "laptop_task",
},
}
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "direct"
CELERY_DEFAULT_ROUTING_KEY = "laptop_task"

There are several different ways of implementing this (telling tasks where to execute).  For example, in this configuration, if we do nothing and simply run the demo.py we created earlier, tasks started on Server will execute on Server and vice versa, because the defaults will be applied (which are different on each machine).  Most likely we want to make some decision at execution time about where we want to route this task.  In this case, simply modify line 19 of the demo.py to add the routing_key parameter as follows:

res = MyTest.apply_async(args=args, connection=connection, routing_key="laptop_task")

Changing it to routing_key = 'server_task' will force the task to execute on Server (no matter where it originated, although strictly speaking it forces it to execute on the worker bound to the server queue, listening for server_task - we just happen to configure this queue on Server) and vice versa.  Test this out and make sure it's all working; you might like to add some logging into the tasks.py so you can check and see what is executing where (next section shows how if you need help with this).  NB - It looks like the task files are cached in the celeryd worker, so if you modify the task, it looks like a good idea to restart the worker, at least for the moment when testing.
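
If the queue / binding key / routing key relationship is still fuzzy, this toy model of a direct exchange may help. It is a sketch under my own assumptions (ToyDirectExchange is hypothetical and lives entirely in memory); it only shows the exact-match rule that sends a message to the queue whose binding key equals the routing key.

```python
from collections import defaultdict

class ToyDirectExchange(object):
    """Toy model of an AMQP direct exchange: exact-match routing keys."""

    def __init__(self):
        self.bindings = defaultdict(list)   # binding key -> queue names
        self.queues = defaultdict(list)     # queue name -> messages

    def bind(self, queue_name, binding_key):
        self.bindings[binding_key].append(queue_name)

    def publish(self, message, routing_key):
        # A message goes to every queue bound with the same key;
        # with no matching binding this toy simply drops it.
        for queue_name in self.bindings.get(routing_key, []):
            self.queues[queue_name].append(message)

exchange = ToyDirectExchange()
exchange.bind("server", "server_task")
exchange.bind("laptop", "laptop_task")

exchange.publish((2, 2), routing_key="laptop_task")
exchange.publish((4, 4), routing_key="server_task")
exchange.publish((8, 8), routing_key="laptop_task")

print(exchange.queues["laptop"])   # [(2, 2), (8, 8)]
print(exchange.queues["server"])   # [(4, 4)]
```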

As a final note, let's say you want the Server to ALSO be able to pick up Laptop tasks (to use its spare capacity - or just as a more "realistic" example).  Simply make the following change to Server settings.py CELERY_QUEUES and now Server is listening for Laptop tasks too.  Because both Laptop and Server are bound to the same queue (which happens to be called laptop in our example), Rabbit automatically round robins messages between the two.  Change the CELERY_QUEUES setting as follows to take advantage of this on Server only.

CELERY_QUEUES = {
"server": {
"binding_key": "server_task",
},
"laptop": {
"binding_key": "laptop_task",
},
}

If you extend the demo.py so it passes a lot more numbers, you should be able to use demo.py to send messages to server_task and laptop_task and see the difference by checking the logs.  Both Server and Laptop process tasks for laptop_task, while only Server processes tasks for server_task.

numbers = [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8),
           (9, 9), (10, 10), (11, 11), (12, 12), (13, 13), (14, 14),
           (15, 15), (16, 16), (17, 17), (18, 18)]
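
As a rough picture of what to expect in the logs, here is an idealised simulation of round robin delivery. It is a sketch only: round_robin is a hypothetical helper, and a real broker's split also depends on how fast each worker is and how many messages it prefetches, so don't expect a perfectly even split in practice.

```python
from itertools import cycle

def round_robin(messages, consumers):
    """Hand each message to the next consumer in turn."""
    handled = dict((name, []) for name in consumers)
    turn = cycle(consumers)
    for message in messages:
        handled[next(turn)].append(message)
    return handled

numbers = [(n, n) for n in range(1, 7)]

# Both workers consume from the "laptop" queue; only Server consumes "server".
laptop_queue = round_robin(numbers, ["Laptop", "Server"])
server_queue = round_robin(numbers, ["Server"])

print(laptop_queue["Laptop"])       # [(1, 1), (3, 3), (5, 5)]
print(laptop_queue["Server"])       # [(2, 2), (4, 4), (6, 6)]
print(len(server_queue["Server"]))  # 6
```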

Elapsed Time: Research, reading, implementing, trouble shooting and documenting ~4.5 hr.  Following the above steps, prob. 2 hrs max.

Get a (heart) beat

We now have a (basic) distributed architecture with the ability to route jobs to a specific server and we also know how to distribute across two servers (or more).  The next trick is to create an automated task that runs on a regular basis.  Now we could use a CRON job or something similar, but it would be nice if there was a way of building this into Django and Celery so we can route the messages straight on to a queue.  It turns out that there is: a PeriodicTask, which does more or less what it says on the box.  It runs periodically.  I modified our earlier tasks.py as follows.  As well as the basic MyTest task already defined, we now have a PeriodicTask which executes every 30 seconds.  It doesn't do much - in this example I simply had it queue up a batch of MyTest calls.

from datetime import timedelta  # needed for run_every below

from celery.task import PeriodicTask, Task
from celery.messaging import establish_connection

import logging
logger = logging.getLogger('fetcher.tasks')

class MyTest(Task):

    def run(self, x, y):
        logger.info('Received %s and %s' % (x, y))
        return int(x) + int(y)

class MyPeriodicTask(PeriodicTask):
    run_every = timedelta(seconds=30)

    def run(self, **kwargs):
        numbers = [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (9, 9), (10, 10), (11, 11), (12, 12), (13, 13), (14, 14), (15, 15), (16, 16), (17, 17), (18, 18)]
        # Reuse one broker connection for the whole batch.
        with establish_connection() as connection:
            for n in numbers:
                MyTest.apply_async(args=[n[0], n[1]], connection=connection, routing_key="laptop_task")

        logger.info('Ran periodic task')


You might need to remove the logging options unless you have a proper logger setup in your settings.py (FYI, you can add the following into settings.py and this should all work nicely for you - just change paths as appropriate and also make sure you have a log directory).

LOG_FILENAME= '/home/tim/clifton/log/debug.log'
import logging
logging.basicConfig(
    filename=LOG_FILENAME,
    format='[%(levelname)-5s] %(asctime)-8s %(name)-10s %(message)s',
    #datefmt='%a, %d %b %Y %H:%M:%S',
    datefmt='%H:%M:%S',
    level=logging.DEBUG)

But how does it execute?  Well in this instance we don't actually need a demo.py, instead we can use the beat feature of the celeryd to execute the periodic task for us.  To do this, simply start your celeryd with the -B option.  e.g.

$ python manage.py celeryd -B

Alternatively you can run a dedicated celerybeat server.

$ python manage.py celerybeat

Just be aware that if you run a dedicated celerybeat server, you'll also need to start a worker (celeryd) yourself, otherwise you'll have your tasks sent to the queue, but not processed.  I have a personal preference for running the dedicated celerybeat server as it makes it easier to isolate and shut down just the beat server process without touching the workers. (ps aux | grep celerybeat)
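
To get a feel for what run_every = timedelta(seconds=30) means in practice, this small sketch lists when such a task would come due over a two minute window. The due_times helper is hypothetical, and it assumes the first run happens one full interval after the start time; I haven't checked exactly when celerybeat fires the first run.

```python
from datetime import datetime, timedelta

def due_times(start, run_every, until):
    """List the moments a task with this run_every would fire up to `until`."""
    times = []
    next_run = start + run_every   # assumption: first run is one interval in
    while next_run <= until:
        times.append(next_run)
        next_run += run_every
    return times

start = datetime(2010, 3, 8, 12, 0, 0)
runs = due_times(start, timedelta(seconds=30), start + timedelta(minutes=2))
print([t.strftime("%H:%M:%S") for t in runs])
# ['12:00:30', '12:01:00', '12:01:30', '12:02:00']
```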

Elapsed Time: Implementing, limited trouble shooting and documenting ~1 hr.  Following the above steps, prob. 30 minutes.

Ready for more? Continue to part 2.

Filed under  //   ampq   celery   rabbitmq   technical  
Posted March 8, 2010

Reflections on start-up life: Week 16

There is something about actually visualising output that feels like progress for me.  Never mind that we'd spent 4 weeks crunching the numbers and doing all the "heavy lifting" to get to the point we could start working on the icing, but that last 2 days of effort where we could see and "touch" data at the end of it felt like progress in a way that the preceding time didn't.

Yes, we finally managed to get the first cut of our prototype done and it felt great! Now of course there is a lot more to do, further revision, further modification and lots of market conversations with different parties, but we are moving forwards, and more importantly it feels mentally to me like we are moving forwards.

The lesson here is that fast iterations are critical for well being.  Even though I occasionally advocate for the long road (some tasks you can't just bite off in a day), there is no doubt that mentally I feel better when we are on the fast iteration path.  It's also the challenge of living on the bleeding edge with technology; progress can tend to be very lumpy.  For every day you progress ten times faster than others could because of your technology choices, there are occasional weeks where the learning curve can overwhelm you for a while.

Highlights?

  • Playing with real data out of the engine
  • Finishing the message queue architecture
  • Working with the prototype screens and thinking about how people use the data.
  • Some great meetings with a couple of potential clients who really "got" the value of what we are doing.
Lowlights?
  • Still no product, but one is in sight now.

Goal this week?

Yay! A new goal - iterate on the prototype and start to show it to people and seek feedback.

Filed under  //   analysis   data   progress   startup  
Posted March 8, 2010

Reflections on start-up life: Week 15

This last week got very close to a repeat of week four.  The reasons were similar (scale of problem becoming clearer and a real challenge), but this time we decided to keep pushing through.  The opportunity delivered by solving the engine problems is too great to ignore.

It should be said that it's not all doom and gloom - the current version of the new engine can already deliver very similar functionality to Twendly in a fraction of the time with significant scalability, so that's one significant engineering challenge down.

I guess the doubt creeps in because the longer we get from the decision point "we need a new engine" the more we really feel we are flying blind.  At the time we made the decision, there was a lot of clarity on why we needed it, but we also thought it would take a couple of weeks.  Four weeks in, the decision point is fading and we now start to feel like we are not sure why we are doing it.  On the other hand, taking time off to go back and reconnect to re-clarify, while probably a good thing, just pushes the engine out further.  In the end it comes down to trusting that the people Alex and I were four weeks ago when we made the decision haven't fundamentally changed - we could waste a lot of time second guessing ourselves or revisiting, but I think we need to accept that we were right and get the job done.  (Of course maybe the Tim and Alex of four weeks ago thought the Tim and Alex now would be a lot smarter than we are, but then I thought they would have left better documentation too.)

One downside of only having two people in your start-up is you've only got yourself to blame and it does seem to skirt close to insanity :-)

With that out the way, I do think some breakthroughs happened over the weekend - as Alex succinctly puts it "We've been confusing the scalability and the algorithm problems". 

Without trying to get too technical here, the problem is that the scalable solution we are building uses technology that is very fixed in its use cases - the way you design the data structures dictates how you'll use them.  We've been trying to build too much of this end use case into the data, instead of dealing with it in a less aggregated format that we can get at really fast, then using live processing power to aggregate further.  This might just let us be more flexible (and give us other scaling challenges down the path, but we are happy to have those if we get there!).

The other significant technical milestone has been a lot of work on my part in message queuing systems.  While it's again taken longer than I would have liked, we've actually now got some really solid design and test cases for a very robust and scalable processing pipeline.  This lets us handle more complex solutions a lot better, but now requires some rewriting of core modules from complex sets of instructions to simpler tasks.

We keep pushing forwards and I have my fingers crossed that we'll have made some much more visual progress by next week.

Highlights?

  • Message queuing working properly.
  • An inkling of light at the end of the engine tunnel.
  • Catching up with a few mates on the phone.
Lowlights?
  • Continuing slow grind of progress.
  • A week at home with no external contact gets lonely.  Got to keep the social side up too.

Goal this week?

Same as last week - get that first prototype out the door ASAP.

Filed under  //   amqp   progress   startup  
Posted February 28, 2010

Just trying post.ly for the heck of it. Love the UI - simple elegance.

Posted February 23, 2010

Reflections on start-up life: Week 14

Well the first week with Alex back in China and us working remotely has passed remarkably smoothly.  With Skype at hand, we can easily keep in touch and keep communicating, which is great.  We've also successfully split our work paths such that we can move on fairly independently from each other without any major delays.  No doubt this may change as we get further along, but for now it's been easy.

Progress remains slow.  I like to think that this is because we've finally settled on the right challenge.  If what we are doing was too easy (i.e. we could knock it over inside a week or two), then we might be setting the bar too low.  We ARE progressing however and expect the new engine to be mostly complete early this week, which then leads to data transition tasks.

Something I spent a few days on which I enjoyed was working with the Yahoo Geo Coding APIs to take data from Twitter and locate people, an important part of our value proposition.  It proved to be an interesting challenge.  The stats are very promising - generally speaking real people (as opposed to bots or corporate accounts) provide good data about where they are.

Twendly continues to gather interest - it's great to see it still hanging in there without us really having done anything on it for several weeks now.   Even this morning we picked up a mention in a German marketing blog. I think they like us - my German is almost non-existent and I don't trust everything Google Translate says!

As has been hinted for a while, we formally "wound up" HiveMind and have now focused firmly on our new product, Tribalytic.  The best summary I've come up with is that with HiveMind there was clearly a business model, but we were going to struggle to reach the market; with Twendly, we had a market, but there really wasn't a business model.  Finally with Tribalytic, we think we are on the right track - a tool for which there is both a market and a business model!

I had a great meeting with Michael Sampson from New Zealand about our change in direction and talking about what Enterprise can learn from the Internet, then a lunch with someone (big secret for now) who we've invited to be our first formal advisor (and still hoping they say yes).  So that was good progress in retrospect.

Having decided to switch directions I had to update our website and reflect our new direction.  I also needed to email our list of beta users and update them on what we are up to.  I chose to actually unsubscribe all these people from the list for a couple of reasons:

  1. We are clearly doing something different now than what they signed up for, so it seems most ethical to actually tell them that and stop tracking their email address.
  2. Early experiences showed that a "call to action" from a mailing list is a big challenge, so starting clean with people who at least WANT to be on the list is a useful thing.
A couple of people suggested I should have kept the mail addresses because they are an asset, but I also received a more positive response to that email than previous ones which makes me think I did the right thing.


Highlights?

  • Meeting new people.
  • Getting geocoding working.
  • Updating website and integrating the mail list management.
Lowlights?
  • Just the slow grind of progress, nothing bad, just feel like we are moving through molasses at the moment.

Goal this week?

Same as last week - get that first prototype out the door ASAP.

Filed under  //   startup   twendly  
Posted February 21, 2010

Reflections on start-up life: Week 13

As many of you know, Alex is from China and, visas being what they are, he's now had to head back there.  On the plus side I think this now makes us a multi-national company, on the downside we obviously have some additional challenges to do with distance.  Of course this was never unexpected and has been planned for from the beginning.  The focus in the last week has been splitting our work into streams to make it a lot easier to work together.  In the medium term his plan is to move to Melbourne permanently but that will take time going through the application process (or us starting to make some significant money so BinaryPlex can sponsor him out).

We didn't make as much progress as we would have liked last week.  Momentum is a funny thing - this journey is a little bit like a roller coaster.  One day you're screaming down hill and think it will keep on like that for ever, then the next you suddenly start up hill again and progress feels like it grinds to a halt.  Focus is of course a big part of maintaining momentum and both Alex and I had our share of distractions last week (his more acceptable than mine!).

By Friday I was really tired and come Friday afternoon I literally just "downed tools" - the first time I've done that, but I wasn't feeling productive at all and just needed a break (I also went to the AC/DC concert the night before so that probably contributed to me feeling worn out).

Technically last week was really interesting as we explored NoSQL databases in a lot more detail and have finally settled on the underlying architecture components for moving forwards.  For those interested, we are using a hybrid collection of MySQL, Redis and MongoDB for a range of different reasons.  This kind of architectural decision and research work is something I really enjoy.

In many ways there is not too much more to say about last week: we worked, we didn't progress as much as we'd have liked, we line up and go around again; this time from both China and Melbourne!

Highlights?

  • Playing with new (for me) technologies like MongoDB
  • Lunch with Anne and Stephen Bartlett-Bragg.  Always a pleasure and always a good sounding board.
  • Sitting by the Yarra for my final "face to face" meeting with Alex for a while on Thursday afternoon, reflecting on what we've achieved and what's to go.  Indulging in a bit of "what-if" and thinking out to June / July (something I generally try hard NOT to do).
Lowlights?
  • Low level gripes at myself for not getting as much done as I'd like.  Generally feeling tired all week.

Goal this week?

Same as last week - get that first prototype out the door ASAP.

Filed under  //   future   startup   travel  
Posted February 14, 2010

Reflections on start-up life: Week 12

I'm not sure when the official "it's been three months" is, but this feels pretty close - 12 weeks in.

Last week was a big one.  After three months of trialling and experimenting with different ideas, we packed up our plans and put them on show to potential investors.

The goal of our Sydney visit was threefold:

  1. Gain an understanding of the investment market in Australia by talking with Angels and VCs.
  2. Test some of our thoughts on target markets by gaining feedback from investors.  Is our problem "big enough"?
  3. Make connections and contacts.
I think the best analogy is that we knew we had a house of cards, so let's expose it to the elements and see how it stands up.  After something like 11 meetings, several rounds of opinions and modifications to the business model as we went through the week, we reached a point where we now know how our house of cards holds up - some of it was a lot further along than we realised, while other parts of it were a bit shakier.

While it's always a judgement call, I think that we've shaved about four weeks off our runway by undertaking this exercise - we have some very clear and consistent advice on the areas we need to improve in and also some appreciation for the things we've done well.

So how did it go? The outcome of meetings ranged everywhere between "I think you guys have got something here, send me some more information and I promise you an answer by March" to "You might find a dumb dentist or doctor with money who doesn't know what they are doing that will give it to you."  Many people described what we were doing as a "feature" not a "product" and http://twendly.com was equally helpful (to those who could see a path from Twendly to a commercial offering) and a distraction (to those who couldn't make that link).

The most consistent advice that we've taken to heart is that no matter what our next step is (Organic, Angel or VC), we are best served by demonstrating a real product, aligned with a business model with a few customers to validate that it's genuine.  It's not that we didn't "know" this, but the week helped quickly evolve what we are doing to a point that we can now address this directly.

Before Sydney we were an interesting technology with many potential applications, now we have a clear(ish) vision for a product built on an interesting and differentiating technology.  Our goal is to sit down in front of potential customers as early as next week with a prototype.

Everyone we met was incredibly helpful and their willingness to call it as they see it was always appreciated (I relate the dumb dentist story not to discredit the person who said it; while disheartening at the time, it is better to face some brutal realities early on).  In particular Brian Menzies was a superstar, working his network hard to get us meetings with a wide range of people who've left us better focussed and more educated than we were at the start of the week.

The other people we should thank are the team at http://mob-labs.com who gave us a home base for the week.  With hot weather and high humidity we looked forward to the time we spent in their office and also really appreciate the support and the advice they provided too.  It's great to see a community of tech companies willing to help each other out.  Thanks Alex and Rob for your great advice, friendly support, cool air-conditioning and free internet.

Highlights?

  • At least one serious expression of interest in what we are doing.
  • All the support and advice from people willing to give up their time to help us.
  • Setting goals for this week.
  • Walking back one evening across the harbour bridge and getting caught in a real thunderstorm - we got soaked through!

Lowlights?
  • The backpackers - 30C and 95% humidity outside and warmer in the room (no aircon).
  • Not really a lowlight, but I was amazed at how tiring the whole exercise was.

Goal this week?
Get the first prototype done ready to start showing real customers next week (or even this week if we go well).

Filed under  //   investors   meetings   startup   sydney  
Posted February 7, 2010

Reflections on start-up life: Week 11

It's always great to look back at the goal from last week and realise you achieved it (refine and rework business pitch ready for Sydney).  It's not so great when you realise that's the only thing you've done!

I was reflecting this morning that even now, as a two person, "pre-revenue" company (the fancy way of saying broke), we have enough work to employ at least another two people and keep them productive and busy.  One would be a full-time engineer to help Alex as he's started down the design path for the real engine - between that and a half a dozen other small programming jobs (tweak some features in Twendly to validate the business model, update the BinaryPlex website), they would stay very busy.

The second person while I'm dreaming would be a marketing / sales person.  They'd be busy researching the market, doing competitor analysis (look, progress - we have competitors now and we even know who they are!), helping out with pitch slides, and starting to utilize their extensive knowledge and contacts to find us some early sales.

As you might gather, there is still only the two of us and work for about four.  By Friday I began to feel that even if we hadn't "caught up", we at least had our heads above the water again.

This week is a significant one: we are taking our idea on the road to Sydney.  This has been a bit of an act of faith (but what isn't in start-up land); we picked a week, said we'd be in Sydney and started trying to get slots with different people to hear our idea.  So far the first three days are quite full; on Thursday and Friday we have some time if anyone wants to catch up and say hi.

The most significant lesson I think I'm continuing to learn and have reinforced is that it's never too early.  If you're not sure you're ready, go for it anyway and the feedback will get you to the next stage quicker than you otherwise would have.

Should be a lot of fun.

Highlights?
  • Our first long term commitment - we WILL still be here in April, we've booked tickets to Chirp, the Twitter developers conference.  It's also our single biggest expense (beyond setting up the company) due to the travel and accommodation associated with it.
  • Great feedback on our pitches which helped us build a better pitch.
  • Actually spending a week on Business Development, not Product Development.
Lowlights?
  • My Nan passing away and the funeral.
  • Getting bored and fed up with countless revisions to slide decks.
Goal this week?
Make some great contacts in Sydney and really road test our idea with investors.

Filed under  //   startup   time  
Posted January 31, 2010