changeset 32:c70414fbc552

Perform blocking read when possible
author btimby
date Thu, 26 Apr 2012 14:49:46 +0000 (2012-04-26)
parents 06f1c0a8557d
children 22831bdce9fd
files queues/backends/dummy.py queues/backends/memcached.py queues/backends/redisd.py queues/backends/sqs.py
diffstat 4 files changed, 21 insertions(+), 17 deletions(-) [+]
line wrap: on
line diff
--- a/queues/backends/dummy.py	Sun Feb 05 03:30:23 2012 +0000
+++ b/queues/backends/dummy.py	Thu Apr 26 14:49:46 2012 +0000
@@ -20,9 +20,9 @@
         self.backend = 'dummy'
         self.name = name
 
-    def read(self):
+    def read(self, block=False):
         try:
-            message = self.queue.get(block=False)
+            message = self.queue.get(block=block)
             self.queue.task_done()
             return message
         except queue.Empty, e:
--- a/queues/backends/memcached.py	Sun Feb 05 03:30:23 2012 +0000
+++ b/queues/backends/memcached.py	Thu Apr 26 14:49:46 2012 +0000
@@ -9,13 +9,9 @@
 import os, re
 
 try:
-    from cmemcache import Client
-    
-except ImportError:
-    try:
-        from memcache import Client
-    except:
-        raise InvalidBackend("Unable to import a memcache library.")
+    import memcache
+except:
+    raise InvalidBackend("Unable to import a memcache library.")
 
 try:
     from django.conf import settings
@@ -27,13 +23,14 @@
     raise InvalidBackend("QUEUE_MEMCACHE_CONNECTION not set.")
 
 class Queue(BaseQueue):
-    
     def __init__(self, name):
-        self._connection = Client(CONN.split(';'))
+        self._connection = memcache.Client(CONN.split(';'))
         self.backend = 'memcached'
         self.name = name
 
-    def read(self):
+    def read(self, block=False):
+        if block:
+            raise NotImplemented('Memcached cannot perform a blocking read.')
         try:
             return self._connection.get(self.name)
         except (memcache.MemcachedKeyError, MemcachedStringEncodingError), e:
--- a/queues/backends/redisd.py	Sun Feb 05 03:30:23 2012 +0000
+++ b/queues/backends/redisd.py	Thu Apr 26 14:49:46 2012 +0000
@@ -64,11 +64,16 @@
         except redis.RedisError, e:
             raise QueueException, "%s" % e
 
-    def read(self):
+    def read(self, block=False):
         try:
-            return self._connection.lpop(self.name)
+            if block:
+                m = self._connection.blpop(self.name)
+            else:
+                m = self._connection.lpop(self.name)
+            if m is None:
+                raise QueueException('Queue is empty')
         except redis.RedisError, e:
-            raise QueueException, "%s" % e
+            raise QueueException(str(e))
 
     def write(self, value):
         try:
--- a/queues/backends/sqs.py	Sun Feb 05 03:30:23 2012 +0000
+++ b/queues/backends/sqs.py	Thu Apr 26 14:49:46 2012 +0000
@@ -40,10 +40,12 @@
         if not self._queue:
             self._queue = self._connection.create_queue(name)
 
-    def read(self):
+    def read(self, block=False):
+        if block:
+            raise NotImplemented('SQS cannot performing a blocking read.')
         try:
             m = self._queue.read()
-            if not m:
+            if m is None:
                 return None
             else:
                 self._queue.delete()