Mercurial > public > queues
changeset 13:e09cc844ff10
Added zenqueue backend as zenqueued based heavily on a patch by Daniel Lindsley. To use the zenqueued backend you'll need to set QUEUE_BACKEND to zenqueued, and also set QUEUE_ZENQUEUE_CONNECTION. The zenqueued backend defaults to using the client for HTTP but you can use either the HTTP or native method by setting QUEUE_ZENQUEUE_METHOD to http or native. Note that zenqueue does not allow for multiple named queues, so the name argument to the Queue constructor is ignored. Several other options (__len__, delete_queue, and get_list) are not supported by zenqueue.
author | mcroydon |
---|---|
date | Wed, 20 May 2009 19:25:14 +0000 |
parents | 59c8c47bcf12 |
children | 1358e3314c88 |
files | queues/backends/zenqueued.py |
diffstat | 1 files changed, 73 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/queues/backends/zenqueued.py Wed May 20 19:25:14 2009 +0000 @@ -0,0 +1,73 @@ +""" +Backend for zenqueue queue. + +This backend requires the zenqueue library to be installed. It uses the +HTTP connection to a zenqueue server and the async bits. +""" + +from queues.backends.base import BaseQueue +from queues import InvalidBackend, QueueException +import os + + +try: + from zenqueue.client import QueueClient + # from zenqueue.queue.Queue import Timeout +except ImportError: + raise InvalidBackend("Unable to import the zenqueue library.") + +try: + from django.conf import settings + CONN = getattr(settings, 'QUEUE_ZENQUEUE_CONNECTION', None) + METHOD = getattr(settings, 'QUEUE_ZENQUEUE_METHOD', 'http') +except: + CONN = os.environ.get('QUEUE_ZENQUEUE_CONNECTION', None) + METHOD = os.environ.get('QUEUE_ZENQUEUE_METHOD', 'http') + +if not CONN: + raise InvalidBackend("QUEUE_ZENQUEUE_CONNECTION not set.") + +try: + host, port = CONN.split(':') +except ValueError: + raise InvalidBackend("QUEUE_ZENQUEUE_CONNECTION should be in the format host:port (such as localhost:3000).") + +class Queue(BaseQueue): + def __init__(self, name='default'): + self._connection = QueueClient(method=METHOD, host=host, port=int(port), mode='async') + self.backend = 'zenqueued' + self.name = name + + def read(self): + try: + message = self._connection.pull() + return message + except Exception, e: + raise QueueException, e + + def write(self, message): + try: + self._connection.push(message) + # push only exposes operation success/fail as a DEBUG logging message + return True + except Exception, e: + raise QueueException, e + + def __len__(self): + """zenqueue backends don't provide a way to do this.""" + raise NotImplementedError + + def __repr__(self): + return "<Queue %s>" % self.name + +def create_queue(): + """This isn't required, so we noop. Kept here for swapability.""" + return True + +def delete_queue(name): + """zenqueue backends don't provide a way to do this.""" + raise NotImplementedError + +def get_list(): + """zenqueue backends don't provide a way to do this.""" + raise NotImplementedError