# HG changeset patch # User mcroydon # Date 1242847514 0 # Node ID e09cc844ff1004b028fdb8ea38a8ccc9a52fc977 # Parent 59c8c47bcf125f9f7c050c0bb7d31a9f338db698 Added zenqueue backend as zenqueued based heavily on a patch by Daniel Lindsley. To use the zenqueued backend you'll need to set QUEUE_BACKEND to zenqueued, and also set QUEUE_ZENQUEUE_CONNECTION. The zenqueued backend defaults to using the client for HTTP but you can use either the HTTP or native method by setting QUEUE_ZENQUEUE_METHOD to http or native. Note that zenqueue does not allow for multiple named queues, so the name argument to the Queue constructor is ignored. Several other options (__len__, delete_queue, and get_list) are not supported by zenqueue. diff -r 59c8c47bcf12 -r e09cc844ff10 queues/backends/zenqueued.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/queues/backends/zenqueued.py Wed May 20 19:25:14 2009 +0000 @@ -0,0 +1,73 @@ +""" +Backend for zenqueue queue. + +This backend requires the zenqueue library to be installed. It uses the +HTTP connection to a zenqueue server and the async bits. +""" + +from queues.backends.base import BaseQueue +from queues import InvalidBackend, QueueException +import os + + +try: + from zenqueue.client import QueueClient + # from zenqueue.queue.Queue import Timeout +except ImportError: + raise InvalidBackend("Unable to import the zenqueue library.") + +try: + from django.conf import settings + CONN = getattr(settings, 'QUEUE_ZENQUEUE_CONNECTION', None) + METHOD = getattr(settings, 'QUEUE_ZENQUEUE_METHOD', 'http') +except: + CONN = os.environ.get('QUEUE_ZENQUEUE_CONNECTION', None) + METHOD = os.environ.get('QUEUE_ZENQUEUE_METHOD', 'http') + +if not CONN: + raise InvalidBackend("QUEUE_ZENQUEUE_CONNECTION not set.") + +try: + host, port = CONN.split(':') +except ValueError: + raise InvalidBackend("QUEUE_ZENQUEUE_CONNECTION should be in the format host:port (such as localhost:3000).") + +class Queue(BaseQueue): + def __init__(self, name='default'): + self._connection = QueueClient(method=METHOD, host=host, port=int(port), mode='async') + self.backend = 'zenqueued' + self.name = name + + def read(self): + try: + message = self._connection.pull() + return message + except Exception, e: + raise QueueException, e + + def write(self, message): + try: + self._connection.push(message) + # push only exposes operation success/fail as a DEBUG logging message + return True + except Exception, e: + raise QueueException, e + + def __len__(self): + """zenqueue backends don't provide a way to do this.""" + raise NotImplementedError + + def __repr__(self): + return "" % self.name + +def create_queue(): + """This isn't required, so we noop. Kept here for swapability.""" + return True + +def delete_queue(name): + """zenqueue backends don't provide a way to do this.""" + raise NotImplementedError + +def get_list(): + """zenqueue backends don't provide a way to do this.""" + raise NotImplementedError