mcroydon@1: """ mcroydon@1: Test basic queue functionality mcroydon@1: mcroydon@1: >>> from queues import queues mcroydon@1: >>> import time mcroydon@1: >>> queue_name = 'test_queues_%.f' % time.time() mcroydon@1: mcroydon@1: Verify that the queue does not exist mcroydon@1: >>> queue_name in queues.get_list() mcroydon@1: False mcroydon@1: mcroydon@1: Create the queue mcroydon@1: >>> q = queues.Queue(queue_name) mcroydon@1: mcroydon@1: Write to the queue mcroydon@1: >>> q.write('test') mcroydon@1: True mcroydon@1: mcroydon@1: Verify that it is indeed in the list mcroydon@1: >>> queue_name in queues.get_list() mcroydon@1: True mcroydon@1: mcroydon@1: Get the length of the queue mcroydon@1: mcroydon@1: Note that SQS doesn't guarantee that the message mcroydon@1: we just wrote will be immediately available mcroydon@1: >>> len(q) mcroydon@1: 1 mcroydon@1: mcroydon@1: Read from the queue mcroydon@17: >>> unicode(q.read()) mcroydon@17: u'test' mcroydon@1: mcroydon@1: The queue should now be empty mcroydon@1: Note that SQS doesn't guarantee an accurate count mcroydon@1: >>> len(q) mcroydon@1: 0 mcroydon@1: mcroydon@1: >>> try: mcroydon@1: ... queues.delete_queue(queue_name) mcroydon@1: ... except NotImplementedError: mcroydon@1: ... print True mcroydon@1: True mcroydon@1: """ mcroydon@1: mcroydon@1: if __name__ == "__main__": mcroydon@1: import doctest mcroydon@1: doctest.testmod()