annotate test.py @ 33:22831bdce9fd

Added timeouts for blocking mode
author btimby
date Mon, 14 May 2012 19:22:57 +0000
parents a0d3e275c885
children
rev   line source
mcroydon@1 1 """
mcroydon@1 2 Test basic queue functionality
mcroydon@1 3
mcroydon@1 4 >>> from queues import queues
mcroydon@1 5 >>> import time
mcroydon@1 6 >>> queue_name = 'test_queues_%.f' % time.time()
mcroydon@1 7
mcroydon@1 8 Verify that the queue does not exist
mcroydon@1 9 >>> queue_name in queues.get_list()
mcroydon@1 10 False
mcroydon@1 11
mcroydon@1 12 Create the queue
mcroydon@1 13 >>> q = queues.Queue(queue_name)
mcroydon@1 14
mcroydon@1 15 Write to the queue
mcroydon@1 16 >>> q.write('test')
mcroydon@1 17 True
mcroydon@1 18
mcroydon@1 19 Verify that it is indeed in the list
mcroydon@1 20 >>> queue_name in queues.get_list()
mcroydon@1 21 True
mcroydon@1 22
mcroydon@1 23 Get the length of the queue
mcroydon@1 24
mcroydon@1 25 Note that SQS doesn't guarantee that the message
mcroydon@1 26 we just wrote will be immediately available
mcroydon@1 27 >>> len(q)
mcroydon@1 28 1
mcroydon@1 29
mcroydon@1 30 Read from the queue
mcroydon@17 31 >>> unicode(q.read())
mcroydon@17 32 u'test'
mcroydon@1 33
mcroydon@1 34 The queue should now be empty
mcroydon@1 35 Note that SQS doesn't guarantee an accurate count
mcroydon@1 36 >>> len(q)
mcroydon@1 37 0
mcroydon@1 38
mcroydon@1 39 >>> try:
mcroydon@1 40 ... queues.delete_queue(queue_name)
mcroydon@1 41 ... except NotImplementedError:
mcroydon@1 42 ... print True
mcroydon@1 43 True
mcroydon@1 44 """
mcroydon@1 45
mcroydon@1 46 if __name__ == "__main__":
mcroydon@1 47 import doctest
mcroydon@1 48 doctest.testmod()