Mercurial > public > queues
changeset 32:c70414fbc552
Perform blocking read when possible
author | btimby |
---|---|
date | Thu, 26 Apr 2012 14:49:46 +0000 |
parents | 06f1c0a8557d |
children | 22831bdce9fd |
files | queues/backends/dummy.py queues/backends/memcached.py queues/backends/redisd.py queues/backends/sqs.py |
diffstat | 4 files changed, 21 insertions(+), 17 deletions(-) [+] |
line wrap: on
line diff
--- a/queues/backends/dummy.py Sun Feb 05 03:30:23 2012 +0000 +++ b/queues/backends/dummy.py Thu Apr 26 14:49:46 2012 +0000 @@ -20,9 +20,9 @@ self.backend = 'dummy' self.name = name - def read(self): + def read(self, block=False): try: - message = self.queue.get(block=False) + message = self.queue.get(block=block) self.queue.task_done() return message except queue.Empty, e:
--- a/queues/backends/memcached.py Sun Feb 05 03:30:23 2012 +0000 +++ b/queues/backends/memcached.py Thu Apr 26 14:49:46 2012 +0000 @@ -9,13 +9,9 @@ import os, re try: - from cmemcache import Client - -except ImportError: - try: - from memcache import Client - except: - raise InvalidBackend("Unable to import a memcache library.") + import memcache +except: + raise InvalidBackend("Unable to import a memcache library.") try: from django.conf import settings @@ -27,13 +23,14 @@ raise InvalidBackend("QUEUE_MEMCACHE_CONNECTION not set.") class Queue(BaseQueue): - def __init__(self, name): - self._connection = Client(CONN.split(';')) + self._connection = memcache.Client(CONN.split(';')) self.backend = 'memcached' self.name = name - def read(self): + def read(self, block=False): + if block: + raise NotImplemented('Memcached cannot perform a blocking read.') try: return self._connection.get(self.name) except (memcache.MemcachedKeyError, MemcachedStringEncodingError), e:
--- a/queues/backends/redisd.py Sun Feb 05 03:30:23 2012 +0000 +++ b/queues/backends/redisd.py Thu Apr 26 14:49:46 2012 +0000 @@ -64,11 +64,16 @@ except redis.RedisError, e: raise QueueException, "%s" % e - def read(self): + def read(self, block=False): try: - return self._connection.lpop(self.name) + if block: + m = self._connection.blpop(self.name) + else: + m = self._connection.lpop(self.name) + if m is None: + raise QueueException('Queue is empty') except redis.RedisError, e: - raise QueueException, "%s" % e + raise QueueException(str(e)) def write(self, value): try:
--- a/queues/backends/sqs.py Sun Feb 05 03:30:23 2012 +0000 +++ b/queues/backends/sqs.py Thu Apr 26 14:49:46 2012 +0000 @@ -40,10 +40,12 @@ if not self._queue: self._queue = self._connection.create_queue(name) - def read(self): + def read(self, block=False): + if block: + raise NotImplemented('SQS cannot performing a blocking read.') try: m = self._queue.read() - if not m: + if m is None: return None else: self._queue.delete()