Mercurial > public > queues
annotate test.py @ 33:22831bdce9fd
Added timeouts for blocking mode
author | btimby |
---|---|
date | Mon, 14 May 2012 19:22:57 +0000 |
parents | a0d3e275c885 |
children |
rev | line source |
---|---|
mcroydon@1 | 1 """ |
mcroydon@1 | 2 Test basic queue functionality |
mcroydon@1 | 3 |
mcroydon@1 | 4 >>> from queues import queues |
mcroydon@1 | 5 >>> import time |
mcroydon@1 | 6 >>> queue_name = 'test_queues_%.f' % time.time() |
mcroydon@1 | 7 |
mcroydon@1 | 8 Verify that the queue does not exist |
mcroydon@1 | 9 >>> queue_name in queues.get_list() |
mcroydon@1 | 10 False |
mcroydon@1 | 11 |
mcroydon@1 | 12 Create the queue |
mcroydon@1 | 13 >>> q = queues.Queue(queue_name) |
mcroydon@1 | 14 |
mcroydon@1 | 15 Write to the queue |
mcroydon@1 | 16 >>> q.write('test') |
mcroydon@1 | 17 True |
mcroydon@1 | 18 |
mcroydon@1 | 19 Verify that the queue now appears in the list |
mcroydon@1 | 20 >>> queue_name in queues.get_list() |
mcroydon@1 | 21 True |
mcroydon@1 | 22 |
mcroydon@1 | 23 Get the length of the queue |
mcroydon@1 | 24 |
mcroydon@1 | 25 Note that SQS doesn't guarantee that the message |
mcroydon@1 | 26 we just wrote will be immediately available, so this check may not return 1 on SQS |
mcroydon@1 | 27 >>> len(q) |
mcroydon@1 | 28 1 |
mcroydon@1 | 29 |
mcroydon@1 | 30 Read from the queue |
mcroydon@17 | 31 >>> unicode(q.read()) |
mcroydon@17 | 32 u'test' |
mcroydon@1 | 33 |
mcroydon@1 | 34 The queue should now be empty. |
mcroydon@1 | 35 Note that SQS doesn't guarantee an accurate count, so this check may not return 0 on SQS |
mcroydon@1 | 36 >>> len(q) |
mcroydon@1 | 37 0 |
mcroydon@1 | 38 |
mcroydon@1 | 39 >>> try: |
mcroydon@1 | 40 ... queues.delete_queue(queue_name) |
mcroydon@1 | 41 ... except NotImplementedError: |
mcroydon@1 | 42 ... print True |
mcroydon@1 | 43 True |
mcroydon@1 | 44 """ |
mcroydon@1 | 45 |
mcroydon@1 | 46 if __name__ == "__main__": |
mcroydon@1 | 47 import doctest |
mcroydon@1 | 48 doctest.testmod() |