Mercurial > public > queues
comparison test.py @ 1:32222d11961f
Initial commit with two tested backends: memcached protocol and Amazon SQS. Because SQS does not guarantee that a newly written message is immediately available or that queue counts are accurate, some tests may fail against SQS even though the library is working properly.
author | mcroydon |
---|---|
date | Thu, 08 Jan 2009 07:49:35 +0000 |
parents | |
children | a0d3e275c885 |
comparison
equal
deleted
inserted
replaced
0:17e60d0dfb44 | 1:32222d11961f |
---|---|
1 """ | |
2 Test basic queue functionality | |
3 | |
4 >>> from queues import queues | |
5 >>> import time | |
6 >>> queue_name = 'test_queues_%.f' % time.time() | |
7 | |
8 Verify that the queue does not exist | |
9 >>> queue_name in queues.get_list() | |
10 False | |
11 | |
12 Create the queue | |
13 >>> q = queues.Queue(queue_name) | |
14 | |
15 Write to the queue | |
16 >>> q.write('test') | |
17 True | |
18 | |
19 Verify that it is indeed in the list | |
20 >>> queue_name in queues.get_list() | |
21 True | |
22 | |
23 Get the length of the queue | |
24 | |
25 Note that SQS doesn't guarantee that the message | |
26 we just wrote will be immediately available, so this check may fail on SQS | |
27 >>> len(q) | |
28 1 | |
29 | |
30 Read from the queue | |
31 >>> q.read() | |
32 'test' | |
33 | |
34 The queue should now be empty | |
35 Note that SQS doesn't guarantee an accurate count | |
36 >>> len(q) | |
37 0 | |
38 | |
39 >>> try: | |
40 ... queues.delete_queue(queue_name) | |
41 ... except NotImplementedError: | |
42 ... print True | |
43 True | |
44 """ | |
45 | |
46 if __name__ == "__main__": | |
47 import doctest | |
48 doctest.testmod() |