changeset 13:e09cc844ff10

Added zenqueue backend as zenqueued based heavily on a patch by Daniel Lindsley. To use the zenqueued backend you'll need to set QUEUE_BACKEND to zenqueued, and also set QUEUE_ZENQUEUE_CONNECTION. The zenqueued backend defaults to using the client for HTTP but you can use either the HTTP or native method by setting QUEUE_ZENQUEUE_METHOD to http or native. Note that zenqueue does not allow for multiple named queues, so the name argument to the Queue constructor is ignored. Several other options (__len__, delete_queue, and get_list) are not supported by zenqueue.
author mcroydon
date Wed, 20 May 2009 19:25:14 +0000
parents 59c8c47bcf12
children 1358e3314c88
files queues/backends/zenqueued.py
diffstat 1 files changed, 73 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/queues/backends/zenqueued.py	Wed May 20 19:25:14 2009 +0000
@@ -0,0 +1,73 @@
+"""
+Backend for zenqueue queue.
+
+This backend requires the zenqueue library to be installed. It uses the
+HTTP connection to a zenqueue server and the async bits.
+"""
+
+from queues.backends.base import BaseQueue
+from queues import InvalidBackend, QueueException
+import os
+
+
+try:
+    from zenqueue.client import QueueClient
+    # from zenqueue.queue.Queue import Timeout
+except ImportError:
+    raise InvalidBackend("Unable to import the zenqueue library.")
+
+try:
+    from django.conf import settings
+    CONN = getattr(settings, 'QUEUE_ZENQUEUE_CONNECTION', None)
+    METHOD = getattr(settings, 'QUEUE_ZENQUEUE_METHOD', 'http')
+except:
+    CONN = os.environ.get('QUEUE_ZENQUEUE_CONNECTION', None)
+    METHOD = os.environ.get('QUEUE_ZENQUEUE_METHOD', 'http')
+
+if not CONN:
+    raise InvalidBackend("QUEUE_ZENQUEUE_CONNECTION not set.")
+
+try:
+    host, port = CONN.split(':')
+except ValueError:
+    raise InvalidBackend("QUEUE_ZENQUEUE_CONNECTION should be in the format host:port (such as localhost:3000).")
+
+class Queue(BaseQueue):
+    def __init__(self, name='default'):
+        self._connection = QueueClient(method=METHOD, host=host, port=int(port), mode='async')
+        self.backend = 'zenqueued'
+        self.name = name
+
+    def read(self):
+        try:
+            message = self._connection.pull()
+            return message
+        except Exception, e:
+            raise QueueException, e
+
+    def write(self, message):
+        try:
+            self._connection.push(message)
+            # push only exposes operation success/fail as a DEBUG logging message
+            return True
+        except Exception, e:
+            raise QueueException, e
+
+    def __len__(self):
+        """zenqueue backends don't provide a way to do this."""
+        raise NotImplementedError
+
+    def __repr__(self):
+        return "<Queue %s>" % self.name
+
+def create_queue():
+    """This isn't required, so we noop.  Kept here for swapability."""
+    return True
+
+def delete_queue(name):
+    """zenqueue backends don't provide a way to do this."""
+    raise NotImplementedError
+
+def get_list():
+    """zenqueue backends don't provide a way to do this."""
+    raise NotImplementedError