# HG changeset patch # User mcroydon # Date 1246474870 0 # Node ID 4473ee0eff6f8a38c16243c1f0bef9e0deb07ab2 # Parent 1358e3314c88a5e74af7508a984b5f9873ce36e9 Added dummy backend that uses Python's built-in Queue class, very useful for testing. Fixes #3. Thanks, Daniel. diff -r 1358e3314c88 -r 4473ee0eff6f queues/backends/dummy.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/queues/backends/dummy.py Wed Jul 01 19:01:10 2009 +0000 @@ -0,0 +1,58 @@ +""" +A dummy queue that uses Python's built-in Queue class. + +Useful for testing but likely not much else. +""" +import Queue as queue +from queues import QueueException +from queues.backends.base import BaseQueue + + +dummy_queue = queue.Queue() + + +class Queue(BaseQueue): + def __init__(self, name='default'): + self.queue = dummy_queue + self.backend = 'dummy' + self.name = name + + def read(self): + try: + message = self.queue.get(block=False) + self.queue.task_done() + return message + except queue.Empty, e: + raise QueueException("The queue is empty.") + + def write(self, message): + try: + return self.queue.put(message) + except queue.Full, e: + raise QueueException("The queue is full.") + + def __len__(self): + return self.queue.qsize() + + def __repr__(self): + return "" % self.name + + +def create_queue(): + """This isn't required, so we noop. Kept here for swapability.""" + return True + + +def delete_queue(name): + """Just start afresh.""" + try: + while True: + dummy_queue.get(block=False) + except queue.Empty: + # Yay, exhausted. + pass + + +def get_list(): + """No way to do this.""" + raise NotImplementedError