changeset 10:383b7c497164

Added beanstalkd backend. Thanks, Daniel.
author mcroydon
date Wed, 28 Jan 2009 12:51:50 +0000
parents 3b0011cd18aa
children 586c2984188b
files LICENSE queues/backends/beanstalkd.py
diffstat 2 files changed, 71 insertions(+), 2 deletions(-) [+]
line wrap: on
line diff
--- a/LICENSE	Thu Jan 08 18:39:27 2009 +0000
+++ b/LICENSE	Wed Jan 28 12:51:50 2009 +0000
@@ -1,4 +1,4 @@
-Copyright (c) 2008-2009 Matt Croydon
+Copyright (c) 2008-2009 Matt Croydon, Daniel Lindsley
 
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
@@ -16,4 +16,4 @@
 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
\ No newline at end of file
+THE SOFTWARE.
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/queues/backends/beanstalkd.py	Wed Jan 28 12:51:50 2009 +0000
@@ -0,0 +1,69 @@
+"""
+Backend for beanstalkd queue.
+
+This backend requires the beanstalkc library to be installed.
+"""
+
+from queues.backends.base import BaseQueue
+from queues import InvalidBackend, QueueException
+import os
+
+
+try:
+    import beanstalkc
+except ImportError:
+    raise InvalidBackend("Unable to import the beanstalkc library.")
+
+try:
+    from django.conf import settings
+    CONN = getattr(settings, 'QUEUE_BEANSTALKD_CONNECTION', None)
+except:
+    CONN = os.environ.get('QUEUE_BEANSTALKD_CONNECTION', None)
+
+if not CONN:
+    raise InvalidBackend("QUEUE_BEANSTALKD_CONNECTION not set.")
+
+
+class Queue(BaseQueue):
+    def __init__(self, name='default'):
+        host, port = CONN.split(':')
+        self._connection = beanstalkc.Connection(host=host, port=int(port))
+        self.backend = 'beanstalkd'
+        self.name = name
+        self._connection.use(name)
+
+    def read(self):
+        try:
+            job = self._connection.reserve()
+            message = job.body
+            job.delete()
+            return message
+        except (beanstalkc.DeadlineSoon, beanstalkc.CommandFailed, beanstalkc.UnexpectedResponse), e:
+            raise QueueException, e
+
+    def write(self, message):
+        try:
+            return self._connection.put(message)
+        except (beanstalkc.CommandFailed, beanstalkc.UnexpectedResponse), e:
+            raise QueueException, e
+
+    def __len__(self):
+        try:
+            return int(self._connection.stats().get('current-jobs-ready', 0))
+        except (beanstalkc.CommandFailed, beanstalkc.UnexpectedResponse), e:
+            raise QueueException, e
+
+    def __repr__(self):
+        return "<Queue %s>" % self.name
+
+def create_queue():
+    """This isn't required, so we noop.  Kept here for swapability."""
+    return True
+
+def delete_queue(name):
+    """Beanstalkd backends don't provide a way to do this."""
+    raise NotImplementedError
+
+def get_list():
+    """Beanstalkd backends don't provide a way to do this."""
+    raise NotImplementedError