Mercurial > public > queues
changeset 10:383b7c497164
Added beanstalkd backend. Thanks, Daniel.
author | mcroydon |
---|---|
date | Wed, 28 Jan 2009 12:51:50 +0000 |
parents | 3b0011cd18aa |
children | 586c2984188b |
files | LICENSE queues/backends/beanstalkd.py |
diffstat | 2 files changed, 71 insertions(+), 2 deletions(-) [+] |
line wrap: on
line diff
--- a/LICENSE Thu Jan 08 18:39:27 2009 +0000 +++ b/LICENSE Wed Jan 28 12:51:50 2009 +0000 @@ -1,4 +1,4 @@ -Copyright (c) 2008-2009 Matt Croydon +Copyright (c) 2008-2009 Matt Croydon, Daniel Lindsley Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -16,4 +16,4 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. \ No newline at end of file +THE SOFTWARE.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/queues/backends/beanstalkd.py Wed Jan 28 12:51:50 2009 +0000 @@ -0,0 +1,69 @@ +""" +Backend for beanstalkd queue. + +This backend requires the beanstalkc library to be installed. +""" + +from queues.backends.base import BaseQueue +from queues import InvalidBackend, QueueException +import os + + +try: + import beanstalkc +except ImportError: + raise InvalidBackend("Unable to import the beanstalkc library.") + +try: + from django.conf import settings + CONN = getattr(settings, 'QUEUE_BEANSTALKD_CONNECTION', None) +except: + CONN = os.environ.get('QUEUE_BEANSTALKD_CONNECTION', None) + +if not CONN: + raise InvalidBackend("QUEUE_BEANSTALKD_CONNECTION not set.") + + +class Queue(BaseQueue): + def __init__(self, name='default'): + host, port = CONN.split(':') + self._connection = beanstalkc.Connection(host=host, port=int(port)) + self.backend = 'beanstalkd' + self.name = name + self._connection.use(name) + + def read(self): + try: + job = self._connection.reserve() + message = job.body + job.delete() + return message + except (beanstalkc.DeadlineSoon, beanstalkc.CommandFailed, beanstalkc.UnexpectedResponse), e: + raise QueueException, e + + def write(self, message): + try: + return self._connection.put(message) + except (beanstalkc.CommandFailed, beanstalkc.UnexpectedResponse), e: + raise QueueException, e + + def __len__(self): + try: + return int(self._connection.stats().get('current-jobs-ready', 0)) + except (beanstalkc.CommandFailed, beanstalkc.UnexpectedResponse), e: + raise QueueException, e + + def __repr__(self): + return "<Queue %s>" % self.name + +def create_queue(): + """This isn't required, so we noop. Kept here for swapability.""" + return True + +def delete_queue(name): + """Beanstalkd backends don't provide a way to do this.""" + raise NotImplementedError + +def get_list(): + """Beanstalkd backends don't provide a way to do this.""" + raise NotImplementedError