annotate test/memcached.py @ 1:32222d11961f
Initial commit with two tested backends: memcached protocol and Amazon SQS. Because of the non-guaranteed nature of SQS queues, some tests may fail even though the library is working properly.
author | mcroydon |
---|---|
date | Thu, 08 Jan 2009 07:49:35 +0000 (2009-01-08) |
parents | |
children |
rev | line source |
---|---|
mcroydon@1 | 1 """ |
mcroydon@1 | 2 Test basic queue functionality |
mcroydon@1 | 3 |
mcroydon@1 | 4 >>> from queues import queues |
mcroydon@1 | 5 >>> import datetime |
mcroydon@1 | 6 >>> queue_name = 'test_queues_%s' % datetime.datetime.now().isoformat() |
mcroydon@1 | 7 |
mcroydon@1 | 8 Verify that the queue does not exist |
mcroydon@1 | 9 >>> queue_name in queues.get_list() |
mcroydon@1 | 10 False |
mcroydon@1 | 11 |
mcroydon@1 | 12 Create the queue |
mcroydon@1 | 13 >>> q = queues.Queue(queue_name) |
mcroydon@1 | 14 |
mcroydon@1 | 15 Write to the queue |
mcroydon@1 | 16 >>> q.write('test') |
mcroydon@1 | 17 True |
mcroydon@1 | 18 |
mcroydon@1 | 19 Verify that it is indeed in the list |
mcroydon@1 | 20 >>> queue_name in queues.get_list() |
mcroydon@1 | 21 True |
mcroydon@1 | 22 |
mcroydon@1 | 23 Get the length of the queue |
mcroydon@1 | 24 >>> len(q) |
mcroydon@1 | 25 1 |
mcroydon@1 | 26 |
mcroydon@1 | 27 Read from the queue |
mcroydon@1 | 28 >>> q.read() |
mcroydon@1 | 29 'test' |
mcroydon@1 | 30 |
mcroydon@1 | 31 The queue should now be empty |
mcroydon@1 | 32 >>> len(q) |
mcroydon@1 | 33 0 |
mcroydon@1 | 34 |
mcroydon@1 | 35 TODO: get rid of the queue? |
mcroydon@1 | 36 """ |
mcroydon@1 | 37 |
mcroydon@1 | 38 if __name__ == "__main__": |
mcroydon@1 | 39     import doctest |
mcroydon@1 | 40     doctest.testmod() |
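
The doctest above needs a live memcached-protocol backend, so it cannot be run in isolation. As a rough sketch of the same write/length/read flow, the block below uses a hypothetical in-memory stand-in. `InMemoryQueue` and `run_docstring` are names invented for illustration and are not part of the `queues` library. Returning `None` from `read()` on an empty queue is an assumption of this sketch, not documented behaviour of either backend.

```python
import doctest
from collections import deque


class InMemoryQueue:
    """Hypothetical stand-in with the write/read/len interface used in the test.

    >>> q = InMemoryQueue('test_queue')
    >>> q.write('test')
    True
    >>> len(q)
    1
    >>> q.read()
    'test'
    >>> len(q)
    0
    >>> q.read() is None
    True
    """

    def __init__(self, name):
        self.name = name
        self._items = deque()

    def write(self, message):
        # Append to the tail and report success, mirroring the True
        # that the doctest above expects from write().
        self._items.append(message)
        return True

    def read(self):
        # Pop from the head (FIFO). Returning None when empty is an
        # assumption made for this sketch.
        return self._items.popleft() if self._items else None

    def __len__(self):
        return len(self._items)


def run_docstring(obj):
    """Run the doctest examples in obj.__doc__ and return the TestResults."""
    parser = doctest.DocTestParser()
    test = parser.get_doctest(
        obj.__doc__, {obj.__name__: obj}, obj.__name__, None, 0
    )
    runner = doctest.DocTestRunner(verbose=False)
    return runner.run(test)


if __name__ == "__main__":
    print(run_docstring(InMemoryQueue))
```

Parsing the docstring directly with `DocTestParser` keeps the example independent of how the module was loaded. For a module-level docstring like the one in `test/memcached.py`, `doctest.testmod()` is the simpler choice.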