Tuesday 26 July 2011

Asynchronous GNU Readline

I've been playing around with async server tools in Python, writing a memcache clone in a couple of different ways. One version uses 0MQ REP/REQ sockets for the protocol and another version tries to clone the actual memcache protocol using asynchat/asyncore. For another project I wrote a command shell to use for testing purposes, and to exercise the API and peek into internals. However, that command shell was running standalone, not part of an application. Generally, if you write a command shell you will want to use GNU readline for input because things like up-arrow, line editing and Ctrl-R search make life simpler.

Unfortunately the Python library that wraps GNU readline is blocking, so it won't work in an async server. But readline does have an async API as well, so I set about investigating how to use it from Python. There seemed to be two choices: write a C module that wraps the async features, or use ctypes and call libreadline.so directly. Of course I googled a bit to see if anyone had done it, and that is when I learned about ctypesgen. This is a nice little tool which takes a library and its include files and spits out a Python module, built on ctypes, that lets a Python application use the same API a C program would. ctypesgen: A Pure Python Wrapper Generator for ctypes.

So I tried it out like so:

python ctgen/ctypesgen.py -lreadline /usr/include/readline/*.h -o ctreadline.py

The end result was ctreadline.py, a Python module ready for use. It took only a short while to read the libreadline docs and knock together this simple test program:


import atexit
import select
import sys

import ctreadline


runEnabled = True
nullstr = ctreadline.POINTER(ctreadline.c_char)()  # a NULL char pointer (not used below)


def exitCleanup():
    # put the terminal back the way readline found it
    ctreadline.rl_callback_handler_remove()
atexit.register(exitCleanup)


def cb(ln):
    """Called by readline each time a complete line has been entered."""
    global runEnabled
    # you must use == in this comparison because ln is a C object:
    # at EOF readline hands back a NULL pointer that compares equal
    # to None but is not the None object itself
    if ln == None:
        runEnabled = False
        ctreadline.rl_set_prompt("")
    elif len(ln) > 0:
        ctreadline.add_history(ln)
        print ln


# keep a reference to the ctypes callback so it is not garbage
# collected while readline still holds a pointer to it
linehandler = ctreadline.rl_vcpfunc_t(cb)
ctreadline.rl_callback_handler_install("async>> ", linehandler)

while runEnabled:
    # wait briefly for stdin to become readable, then let readline
    # consume one character; this is what keeps the loop non-blocking
    readable, _, _ = select.select([sys.stdin], [], [], 0.001)
    if readable:
        ctreadline.rl_callback_read_char()
print " "

It doesn't do much, just echo back what you type, but it does it asynchronously using select, so it should be straightforward to integrate into a command shell program or any async server based on select. Don't forget to try out your favourite readline features when you run it: Ctrl-R to search back, up-arrow, line editing and so on.
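To show what that integration might look like, here is a rough sketch (untested, and only a sketch) of hanging the same callback off asyncore using a file_dispatcher watching stdin; it assumes the ctreadline module generated above, and handle_line is just a stand-in for whatever your shell would do with a finished line.

import asyncore
import sys

import ctreadline


class ReadlineDispatcher(asyncore.file_dispatcher):
    """Feed stdin readiness events from the asyncore loop into readline."""

    def __init__(self, handler):
        asyncore.file_dispatcher.__init__(self, sys.stdin.fileno())
        # keep a reference to the ctypes callback so it is not collected
        self.handler = ctreadline.rl_vcpfunc_t(handler)
        ctreadline.rl_callback_handler_install("async>> ", self.handler)

    def writable(self):
        return False  # we only care about reads on stdin

    def handle_read(self):
        # a keypress is available, so let readline consume one character
        ctreadline.rl_callback_read_char()

    def handle_close(self):
        ctreadline.rl_callback_handler_remove()
        self.close()


def handle_line(ln):
    if ln != None and len(ln) > 0:
        ctreadline.add_history(ln)
        print ln

ReadlineDispatcher(handle_line)
asyncore.loop()

The rest of the server's dispatchers would share the same asyncore loop, so keyboard input simply becomes one more event source.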

The thing that took the longest to figure out was that you cannot compare the return value from libreadline in the usual way. I generally write if ln is None: but what the callback actually gets at EOF is a NULL pointer object rather than None itself; it compares equal to None without being it, so you need to use the equals signs.

It can be hard to track down an API reference for libreadline and I ended up using the one for DOS here on Delorie's site.

Saturday 16 July 2011

AMQP and Python

I've spent the last 4 months writing Python code for a system that polls a datasource for changes every minute and feeds messages into an AMQP message queue. Worker processes listening on the queue pick up a message, process it, and move on to the next one. These processes run forever (unless they crash or are killed). When I took on the task there was a basic prototype that was essentially a sort of finite state machine, so when I planned out my version I was thinking in state machines. As a result, I mapped out several states that a job had to pass through and wrote a process to handle each step. The state transitions were handled by a set of AMQP message queues: when a process finished with a message like "Record 272727 changed", it passed the same message on to another queue, listened to by the next process. Overall this was just an ETL application that Extracted data from a database server, Transformed it, and Loaded it into another database for the SOLR search engine. Aside from the unusual destination database it was not that different from any other Extract/Transform/Load application.
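As a rough sketch of the shape of one of those workers (written with kombu, which I come back to below; the exchange, queue and routing key names here are invented for the example), each one consumes from its input queue, does its own step, and republishes the same message to the queue for the next state:

from kombu import BrokerConnection, Exchange, Queue
from kombu.messaging import Producer, Consumer

connection = BrokerConnection(hostname="localhost", userid="guest",
                              password="guest", virtual_host="/")
channel = connection.channel()
exchange = Exchange("etl", type="direct", durable=True)

# this worker's input queue, and a producer for handing work on
extract_queue = Queue("extract", exchange=exchange, routing_key="extract")
producer = Producer(channel, exchange=exchange, serializer="json")


def handle(body, message):
    # body is the decoded JSON message, e.g. {"recordid": "272727"}
    # ... do this worker's own step with body["recordid"] here ...
    # then pass the same message on to the queue for the next state
    producer.publish(body, routing_key="transform")
    message.ack()


consumer = Consumer(channel, [extract_queue], callbacks=[handle])
consumer.consume()
while True:
    connection.drain_events()

Every worker is the same few lines with a different input queue, routing key and processing step in the middle.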

For AMQP we had already decided to use RabbitMQ, but we had only installed the old version included in Ubuntu Linux. That version did not support the management plugin, and there were difficulties getting our operations folks to accept a non-Ubuntu service package with a newer RabbitMQ, so I went looking for other ways to get at RabbitMQ information. Since RabbitMQ is written in Erlang, I read up on Erlang and the actor model of multiprocess computing that it implements. I realized that I could run code on another Erlang node, on the same machine or not, and talk to RabbitMQ as long as I had the cookie for the RabbitMQ server. I was able to write some simple code to talk to Rabbit and get its status, but getting data about a queue was harder because inside Rabbit each queue is managed by a process with no name. I haven't yet gotten to the bottom of it, but I did do some experimenting with a Python library called PyInterface that allows a Python program to emulate an Erlang node and interact with RabbitMQ.

Along the way I realized that the architecture I had chosen for this Python system was rather like the Erlang actor model, and that I really should have a supervisor process to manage all my workers, restart them if they hang or crash, and perhaps even manage the number of instances of each one. At this point the supervisor (written in Python) just starts one of each worker, except for a couple of bottlenecks where it starts 15 of one and 4 of another. When I crack the problem of interfacing Python to RabbitMQ, I will be able to monitor queue backlogs over time and increase or decrease the number of worker instances on a particular queue. That will bring it closer to being a system that just runs forever and heals itself.
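The supervisor itself needs nothing beyond the standard library. Roughly speaking (the script names and counts below are invented for the example, not the real configuration), it is a loop that starts the configured number of each worker and restarts any that have died:

import subprocess
import time

# how many instances of each worker script to keep running
# (names and counts are made up for this sketch)
WORKERS = {
    "extract_worker.py": 1,
    "transform_worker.py": 15,
    "load_worker.py": 4,
}

processes = []  # list of (script, Popen) pairs


def start(script):
    processes.append((script, subprocess.Popen(["python", script])))

# start the initial set of workers
for script, count in WORKERS.items():
    for n in range(count):
        start(script)

# supervise: restart anything that has exited
while True:
    time.sleep(5)
    for i, (script, proc) in enumerate(processes):
        if proc.poll() is not None:  # the worker has died
            print "restarting", script
            processes[i] = (script, subprocess.Popen(["python", script]))

Detecting a worker that has hung rather than died would need something more, but restarting crashed processes already covers the common case.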

Of course sometimes things go wrong, and there are three failure queues that collect messages when that happens. The workers listening to these queues delay messages for a while, then resubmit them to the start of the process chain for a retry. I used JSON objects for the messages, e.g. {"recordid": "272727"}, so it was easy to add more attributes to a message before passing it to the next queue. Messages going into a failure queue get a reason added, and before resubmitting I add a retries attribute so that I can count how many times the message has gone through and failed. If a db server goes down, a message might make two or three round trips before it is back up. And finally, if there are too many retries, I punt the message into a queue for human attention. Over time, with lots of testing, the number of messages in that queue has gone from 50% of all messages to a tiny number.
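The bookkeeping on the failure path is just dictionary manipulation before the message is republished. In outline it looks something like this (the retry limit and the queue names are arbitrary choices for the example, not the production values):

MAX_RETRIES = 5  # arbitrary limit for this example


def route_failure(body, reason):
    """Annotate a failed message and decide which queue it goes to next."""
    body["reason"] = reason
    body["retries"] = body.get("retries", 0) + 1
    if body["retries"] > MAX_RETRIES:
        # too many round trips: park it for human attention
        return "manual_attention", body
    # otherwise send it back to the start of the chain for another try
    return "extract", body

# route_failure({"recordid": "272727"}, "db server unreachable")
# -> ("extract", {"recordid": "272727", "reason": "db server unreachable", "retries": 1})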

When I started I had a prototype with half a dozen Python scripts using three different Python AMQP libraries: pika, amqplib and kombu. In evaluating them I realised that all three had shortcomings and appeared to diverge, in different ways, from the AMQP model. In the end I decided to stick with kombu over the amqplib transport, in the expectation that I could switch transports if I needed to in the future. But in writing an MQ shell program to manipulate AMQP message queues, I realised that kombu's code was overly convoluted, so I wrote a shim layer over it. More recently I have started to rewrite the shim to run directly over amqplib, but that only supports AMQP 0-8. I would rather use AMQP 0-9-1, and recently discovered two more libraries: pylibrabbitmq, which wraps the C library, and haigha, which started life as amqplib and which seems the most up-to-date AMQP library that adheres to the AMQP model.

In any case, I've decided on a from-scratch rewrite of my MQ shell, this time using haigha for AMQP and plac (instead of cmd) for the shell framework. It will show up on my github site as soon as I have something that can publish messages to an exchange and subscribe to a queue.