Mercurial > public > queues
changeset 1:32222d11961f
Initial commit with two tested backends: memcached protocol and Amazon SQS. Because of the non-guaranteed nature of SQS queues, some tests may fail even though the library is working properly.
author | mcroydon |
---|---|
date | Thu, 08 Jan 2009 07:49:35 +0000 |
parents | 17e60d0dfb44 |
children | 4a8876e38944 |
files | LICENSE build/lib/queues/__init__.py build/lib/queues/backends/__init__.py build/lib/queues/backends/base.py build/lib/queues/backends/memcached.py build/lib/queues/backends/sqs.py queues/__init__.py queues/backends/__init__.py queues/backends/base.py queues/backends/memcached.py queues/backends/sqs.py setup.py test.py test/memcached.py |
diffstat | 14 files changed, 706 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/LICENSE Thu Jan 08 07:49:35 2009 +0000 @@ -0,0 +1,19 @@ +Copyright (c) 2008-2009 Matt Croydon + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/build/lib/queues/__init__.py Thu Jan 08 07:49:35 2009 +0000 @@ -0,0 +1,58 @@ +""" +A pluggable abstract queueing API designed to be used within a Django project +but useful within a general Python application too. The design is modeled +after a pluggable backend system ala django.core.cache. + +Backends that merit investigation + +x http://aws.amazon.com/ (SQS) +* http://code.google.com/p/django-queue-service/ +x https://rubyforge.org/projects/starling/ (memcache) +x http://code.google.com/p/sparrow/ (memcache) +* http://xph.us/software/beanstalkd/ (not persistent) +* http://code.google.com/p/peafowl/ (python/memcache) +* http://memcachedb.org/memcacheq/ (memcache) + +Other backends that might be worth checking out + +* http://stompserver.rubyforge.org/ +* http://www.spread.org/ +* http://code.google.com/p/stomperl/ +* RabbitMQ +""" +import os + +__version__ = "0.2" + +class InvalidBackend(Exception): + pass + +class QueueException(Exception): + pass + +# TODO: raise exceptions when stuff doesn't get stored/returned properly? +# i.e. unified API and handle what each backend returns. + +# Handle QUEUE_BACKEND set from either DJANGO_SETTINGS_MODULE or an environment variable. +# If set both places, django takes precedence. +try: + from django.conf import settings + BACKEND = settings.get('QUEUE_BACKEND', None) +except: + BACKEND = os.environ.get('QUEUE_BACKEND') + +if not BACKEND: + raise InvalidBackend("QUEUE_BACKEND not set.") + +# Set up queues.queues to point to the proper backend. +try: + # Most of the time we'll be importing a bundled backend, + # so look here first. You might recall this pattern from + # such web frameworks as Django. + queues = __import__('queues.backends.%s' % BACKEND, {}, {}, ['']) +except ImportError, e: + # If that didn't work, try an external import. + try: + queues = __import__(BACKEND, {}, {}, ['']) + except ImportError: + raise InvalidBackend("Unable to import QUEUE BACKEND '%s'" % BACKEND) \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/build/lib/queues/backends/__init__.py Thu Jan 08 07:49:35 2009 +0000 @@ -0,0 +1,27 @@ +import os +from queues import InvalidBackend + +__all__ = ['backend'] + +# Handle QUEUE_BACKEND set from either DJANGO_SETTINGS_MODULE or an environment variable. +# If set both places, django takes precedence. +try: + from django.conf import settings + BACKEND = settings.get('QUEUE_BACKEND', None) +except: + BACKEND = os.environ.get('QUEUE_BACKEND', None) + +if not BACKEND: + raise InvalidBackend("QUEUE_BACKEND not set.") + +try: + # Most of the time we'll be importing a bundled backend, + # so look here first. You might recall this pattern from + # such web frameworks as Django. + backend = __import__('queues.backends.%s' % BACKEND, {}, {}, ['']) +except ImportError, e: + # If that didn't work, try an external import. + try: + backend = __import__(BACKEND, {}, {}, ['']) + except ImportError: + raise InvalidBackend("Unable to import QUEUE BACKEND '%s'" % BACKEND)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/build/lib/queues/backends/base.py Thu Jan 08 07:49:35 2009 +0000 @@ -0,0 +1,27 @@ +"Base queue class" + +# Things to think about: +# - timeout/visibility timeout (boto) + +class BaseQueue(object): + """ + Abstract base class for queue backends. + """ + + def read(self): + raise NotImplementedError + + def write(self, message): + raise NotImplementedError + + def __len__(self): + raise NotImplementedError + +def create_queue(): + raise NotImplementedError + +def delete_queue(name): + raise NotImplementedError + +def get_list(): + raise NotImplementedError \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/build/lib/queues/backends/memcached.py Thu Jan 08 07:49:35 2009 +0000 @@ -0,0 +1,82 @@ +""" +Backend for queues that implement the memcache protocol, including starling. + +This backend requires either the memcache or cmemcache libraries to be installed. +""" + +from queues.backends.base import BaseQueue +from queues import InvalidBackend, QueueException +import os, re + +try: + from cmemcache import Client + +except ImportError: + try: + from memcache import Client + except: + raise InvalidBackend("Unable to import a memcache library.") + +try: + from django.conf import settings + CONN = settings.get('QUEUE_MEMCACHE_CONNECTION', None) +except: + CONN = os.environ.get('QUEUE_MEMCACHE_CONNECTION', None) + +if not CONN: + raise InvalidBackend("QUEUE_MEMCACHE_CONNECTION not set.") + +class Queue(BaseQueue): + + def __init__(self, name): + self._connection = Client(CONN.split(';')) + self.backend = 'memcached' + self.name = name + + def read(self): + try: + return self._connection.get(self.name) + except (memcache.MemcachedKeyError, MemcachedStringEncodingError), e: + raise QueueException, e + + def write(self, message): + try: + return self._connection.set(self.name, message, 0) + except (memcache.MemcachedKeyError, MemcachedStringEncodingError), e: + raise QueueException, e + + def __len__(self): + try: + try: + return int(self._connection.get_stats()[0][1]['queue_%s_items' % self.name]) + except (memcache.MemcachedKeyError, MemcachedStringEncodingError), e: + raise QueueException, e + except AttributeError: + # If this memcached backend doesn't support starling-style stats + # or if this queue doesn't exist + return 0 + + def __repr__(self): + return "<Queue %s>" % self.name + +def create_queue(): + """This isn't required, so we noop. Kept here for swapability.""" + return True + +def delete_queue(name): + """Memcached backends don't provide a way to do this.""" + raise NotImplementedError + +def get_list(): + """Supports starling/peafowl-style queue_<name>_items introspection via stats.""" + conn = Client(CONN.split(';')) + queue_list = [] + queue_re = re.compile(r'queue\_(.*?)\_total_items') + try: + for server in conn.get_stats(): + for key in server[1].keys(): + if queue_re.findall(key): + queue_list.append(queue_re.findall(key)[0]) + except (KeyError, AttributeError, memcache.MemcachedKeyError, MemcachedStringEncodingError): + pass + return queue_list
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/build/lib/queues/backends/sqs.py Thu Jan 08 07:49:35 2009 +0000 @@ -0,0 +1,99 @@ +""" +Backend for Amazon's Simple Queue Service. + +This backend requires that the boto library is installed. +""" + +from queues.backends.base import BaseQueue +from queues import InvalidBackend, QueueException +import os + +try: + from boto.sqs.connection import SQSConnection + from boto.sqs.message import Message + from boto.exception import SQSError +except ImportError: + raise InvalidBackend("Unable to import boto.") + +try: + from django.conf import settings + KEY = settings.get('AWS_ACCESS_KEY_ID', None) + SECRET = settings.get('AWS_SECRET_ACCESS_KEY', None) +except: + KEY = os.environ.get('AWS_ACCESS_KEY_ID', None) + SECRET = os.environ.get('AWS_SECRET_ACCESS_KEY', None) + +if not KEY: + raise InvalidBackend("AWS_ACCESS_KEY_ID not set.") +if not SECRET: + raise InvalidBackend("AWS_SECRET_ACCESS_KEY not set.") + +# ... and one connection to bind them. +connection = SQSConnection() + +class Queue(BaseQueue): + def __init__(self, name): + self.name = name + self.backend = 'sqs' + self._connection = connection + self._queue = self._connection.get_queue(self.name) + if not self._queue: + self._queue = self._connection.create_queue(name) + + def read(self): + try: + m = self._queue.read() + if not m: + return None + else: + self._queue.delete() + return m.get_body() + except SQSError, e: + raise QueueException, "%s" % e.code + + def write(self, message): + try: + m = Message() + m.set_body(message) + return self._queue.write(m) + except SQSError, e: + raise QueueException, "%s" % e.code + + def __len__(self): + try: + length = self._queue.count() + if not length: + length = 0 + return int(length) + except SQSError, e: + raise QueueException, "%s" % e.code + + def __repr__(self): + return "<Queue %s>" % self.name + +def create_queue(name): + """Create a queue for the given name.""" + try: + return connection.create_queue(name) + except SQSError, e: + raise QueueException, "%s" % e.code + +def delete_queue(name): + """ + Deletes a queue and any messages in it. + """ + # TODO: too fragile. + try: + return connection.get_status('DeleteQueue', None, '/' + name) + except SQSError, e: + raise QueueException, "%s" % e.code + +def get_list(): + """ + Get a list of names for all queues. Returns a list of ``queues.backends.sqs.Queue`` objects. + """ + # TODO: too fragile. + try: + return [q.id[1:] for q in connection.get_all_queues()] + except SQSError, e: + raise QueueException, "%s" % e.code
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/queues/__init__.py Thu Jan 08 07:49:35 2009 +0000 @@ -0,0 +1,58 @@ +""" +A pluggable abstract queueing API designed to be used within a Django project +but useful within a general Python application too. The design is modeled +after a pluggable backend system ala django.core.cache. + +Backends that merit investigation + +x http://aws.amazon.com/ (SQS) +* http://code.google.com/p/django-queue-service/ +x https://rubyforge.org/projects/starling/ (memcache) +x http://code.google.com/p/sparrow/ (memcache) +* http://xph.us/software/beanstalkd/ (not persistent) +* http://code.google.com/p/peafowl/ (python/memcache) +* http://memcachedb.org/memcacheq/ (memcache) + +Other backends that might be worth checking out + +* http://stompserver.rubyforge.org/ +* http://www.spread.org/ +* http://code.google.com/p/stomperl/ +* RabbitMQ +""" +import os + +__version__ = "0.2" + +class InvalidBackend(Exception): + pass + +class QueueException(Exception): + pass + +# TODO: raise exceptions when stuff doesn't get stored/returned properly? +# i.e. unified API and handle what each backend returns. + +# Handle QUEUE_BACKEND set from either DJANGO_SETTINGS_MODULE or an environment variable. +# If set both places, django takes precedence. +try: + from django.conf import settings + BACKEND = settings.get('QUEUE_BACKEND', None) +except: + BACKEND = os.environ.get('QUEUE_BACKEND') + +if not BACKEND: + raise InvalidBackend("QUEUE_BACKEND not set.") + +# Set up queues.queues to point to the proper backend. +try: + # Most of the time we'll be importing a bundled backend, + # so look here first. You might recall this pattern from + # such web frameworks as Django. + queues = __import__('queues.backends.%s' % BACKEND, {}, {}, ['']) +except ImportError, e: + # If that didn't work, try an external import. + try: + queues = __import__(BACKEND, {}, {}, ['']) + except ImportError: + raise InvalidBackend("Unable to import QUEUE BACKEND '%s'" % BACKEND) \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/queues/backends/__init__.py Thu Jan 08 07:49:35 2009 +0000 @@ -0,0 +1,27 @@ +import os +from queues import InvalidBackend + +__all__ = ['backend'] + +# Handle QUEUE_BACKEND set from either DJANGO_SETTINGS_MODULE or an environment variable. +# If set both places, django takes precedence. +try: + from django.conf import settings + BACKEND = settings.get('QUEUE_BACKEND', None) +except: + BACKEND = os.environ.get('QUEUE_BACKEND', None) + +if not BACKEND: + raise InvalidBackend("QUEUE_BACKEND not set.") + +try: + # Most of the time we'll be importing a bundled backend, + # so look here first. You might recall this pattern from + # such web frameworks as Django. + backend = __import__('queues.backends.%s' % BACKEND, {}, {}, ['']) +except ImportError, e: + # If that didn't work, try an external import. + try: + backend = __import__(BACKEND, {}, {}, ['']) + except ImportError: + raise InvalidBackend("Unable to import QUEUE BACKEND '%s'" % BACKEND)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/queues/backends/base.py Thu Jan 08 07:49:35 2009 +0000 @@ -0,0 +1,27 @@ +"Base queue class" + +# Things to think about: +# - timeout/visibility timeout (boto) + +class BaseQueue(object): + """ + Abstract base class for queue backends. + """ + + def read(self): + raise NotImplementedError + + def write(self, message): + raise NotImplementedError + + def __len__(self): + raise NotImplementedError + +def create_queue(): + raise NotImplementedError + +def delete_queue(name): + raise NotImplementedError + +def get_list(): + raise NotImplementedError \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/queues/backends/memcached.py Thu Jan 08 07:49:35 2009 +0000 @@ -0,0 +1,82 @@ +""" +Backend for queues that implement the memcache protocol, including starling. + +This backend requires either the memcache or cmemcache libraries to be installed. +""" + +from queues.backends.base import BaseQueue +from queues import InvalidBackend, QueueException +import os, re + +try: + from cmemcache import Client + +except ImportError: + try: + from memcache import Client + except: + raise InvalidBackend("Unable to import a memcache library.") + +try: + from django.conf import settings + CONN = settings.get('QUEUE_MEMCACHE_CONNECTION', None) +except: + CONN = os.environ.get('QUEUE_MEMCACHE_CONNECTION', None) + +if not CONN: + raise InvalidBackend("QUEUE_MEMCACHE_CONNECTION not set.") + +class Queue(BaseQueue): + + def __init__(self, name): + self._connection = Client(CONN.split(';')) + self.backend = 'memcached' + self.name = name + + def read(self): + try: + return self._connection.get(self.name) + except (memcache.MemcachedKeyError, MemcachedStringEncodingError), e: + raise QueueException, e + + def write(self, message): + try: + return self._connection.set(self.name, message, 0) + except (memcache.MemcachedKeyError, MemcachedStringEncodingError), e: + raise QueueException, e + + def __len__(self): + try: + try: + return int(self._connection.get_stats()[0][1]['queue_%s_items' % self.name]) + except (memcache.MemcachedKeyError, MemcachedStringEncodingError), e: + raise QueueException, e + except AttributeError: + # If this memcached backend doesn't support starling-style stats + # or if this queue doesn't exist + return 0 + + def __repr__(self): + return "<Queue %s>" % self.name + +def create_queue(): + """This isn't required, so we noop. Kept here for swapability.""" + return True + +def delete_queue(name): + """Memcached backends don't provide a way to do this.""" + raise NotImplementedError + +def get_list(): + """Supports starling/peafowl-style queue_<name>_items introspection via stats.""" + conn = Client(CONN.split(';')) + queue_list = [] + queue_re = re.compile(r'queue\_(.*?)\_total_items') + try: + for server in conn.get_stats(): + for key in server[1].keys(): + if queue_re.findall(key): + queue_list.append(queue_re.findall(key)[0]) + except (KeyError, AttributeError, memcache.MemcachedKeyError, MemcachedStringEncodingError): + pass + return queue_list
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/queues/backends/sqs.py Thu Jan 08 07:49:35 2009 +0000 @@ -0,0 +1,99 @@ +""" +Backend for Amazon's Simple Queue Service. + +This backend requires that the boto library is installed. +""" + +from queues.backends.base import BaseQueue +from queues import InvalidBackend, QueueException +import os + +try: + from boto.sqs.connection import SQSConnection + from boto.sqs.message import Message + from boto.exception import SQSError +except ImportError: + raise InvalidBackend("Unable to import boto.") + +try: + from django.conf import settings + KEY = settings.get('AWS_ACCESS_KEY_ID', None) + SECRET = settings.get('AWS_SECRET_ACCESS_KEY', None) +except: + KEY = os.environ.get('AWS_ACCESS_KEY_ID', None) + SECRET = os.environ.get('AWS_SECRET_ACCESS_KEY', None) + +if not KEY: + raise InvalidBackend("AWS_ACCESS_KEY_ID not set.") +if not SECRET: + raise InvalidBackend("AWS_SECRET_ACCESS_KEY not set.") + +# ... and one connection to bind them. +connection = SQSConnection() + +class Queue(BaseQueue): + def __init__(self, name): + self.name = name + self.backend = 'sqs' + self._connection = connection + self._queue = self._connection.get_queue(self.name) + if not self._queue: + self._queue = self._connection.create_queue(name) + + def read(self): + try: + m = self._queue.read() + if not m: + return None + else: + self._queue.delete() + return m.get_body() + except SQSError, e: + raise QueueException, "%s" % e.code + + def write(self, message): + try: + m = Message() + m.set_body(message) + return self._queue.write(m) + except SQSError, e: + raise QueueException, "%s" % e.code + + def __len__(self): + try: + length = self._queue.count() + if not length: + length = 0 + return int(length) + except SQSError, e: + raise QueueException, "%s" % e.code + + def __repr__(self): + return "<Queue %s>" % self.name + +def create_queue(name): + """Create a queue for the given name.""" + try: + return connection.create_queue(name) + except SQSError, e: + raise QueueException, "%s" % e.code + +def delete_queue(name): + """ + Deletes a queue and any messages in it. + """ + # TODO: too fragile. + try: + return connection.get_status('DeleteQueue', None, '/' + name) + except SQSError, e: + raise QueueException, "%s" % e.code + +def get_list(): + """ + Get a list of names for all queues. Returns a list of ``queues.backends.sqs.Queue`` objects. + """ + # TODO: too fragile. + try: + return [q.id[1:] for q in connection.get_all_queues()] + except SQSError, e: + raise QueueException, "%s" % e.code
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/setup.py Thu Jan 08 07:49:35 2009 +0000 @@ -0,0 +1,13 @@ +#!/usr/bin/env python + +from distutils.core import setup +import queues +setup(name='queues', + version='0.2', + description='A lowest-common-denominator API for interacting with lightweight queue services.', + author='Matt Croydon', + author_email='mcroydon@gmail.com', + url='http://postneo.com', # TODO: Fixme + packages=['queues', 'queues.backends'], + package_dir={'queues': 'queues'}, + )
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test.py Thu Jan 08 07:49:35 2009 +0000 @@ -0,0 +1,48 @@ +""" +Test basic queue functionality + +>>> from queues import queues +>>> import time +>>> queue_name = 'test_queues_%.f' % time.time() + +Verify that the queue does not exist +>>> queue_name in queues.get_list() +False + +Create the queue +>>> q = queues.Queue(queue_name) + +Write to the queue +>>> q.write('test') +True + +Verify that it is indeed in the list +>>> queue_name in queues.get_list() +True + +Get the length of the queue + +Note that SQS doesn't guarantee that the message +we just wrote will be immediately available +>>> len(q) +1 + +Read from the queue +>>> q.read() +'test' + +The queue should now be empty +Note that SQS doesn't guarantee an accurate count +>>> len(q) +0 + +>>> try: +... queues.delete_queue(queue_name) +... except NotImplementedError: +... print True +True +""" + +if __name__ == "__main__": + import doctest + doctest.testmod() \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/memcached.py Thu Jan 08 07:49:35 2009 +0000 @@ -0,0 +1,40 @@ +""" +Test basic queue functionality + +>>> from queues import queues +>>> import datetime +>>> queue_name = 'test_queues_%s' % datetime.datetime.now().isoformat() + +Verify that the queue does not exist +>>> queue_name in queues.get_list() +False + +Create the queue +>>> q = queues.Queue(queue_name) + +Write to the queue +>>> q.write('test') +True + +Verify that it is indeed in the list +>>> queue_name in queues.get_list() +True + +Get the length of the queue +>>> len(q) +1 + +Read from the queue +>>> q.read() +'test' + +The queue should now be empty +>>> len(q) +0 + +TODO: get rid of the queue? +""" + +if __name__ == "__main__": + import doctest + doctest.testmod() \ No newline at end of file