changeset 522:82b97697312e

Created Celery tasks to process new posts and topics. Keep the updated topic set in Redis. This is for tickets #194, #237, #239.
author Brian Neal <bgneal@gmail.com>
date Sun, 18 Dec 2011 23:46:52 +0000 (2011-12-18)
parents dd14ab08a9c4
children e9c446a64423
files gpp/core/functions.py gpp/forums/latest.py gpp/forums/management/commands/update_forum_stats.py gpp/forums/signals.py gpp/forums/stats.py gpp/forums/tasks.py gpp/forums/templatetags/forum_tags.py gpp/forums/views/main.py gpp/forums/views/subscriptions.py gpp/settings/base.py gpp/settings/test.py gpp/templates/forums/forum_stats_tag.html gpp/templates/forums/index.html gpp/templates/forums/new_posts_tag.html
diffstat 14 files changed, 332 insertions(+), 123 deletions(-) [+]
line wrap: on
line diff
--- a/gpp/core/functions.py	Sat Dec 17 23:43:00 2011 +0000
+++ b/gpp/core/functions.py	Sun Dec 18 23:46:52 2011 +0000
@@ -5,17 +5,18 @@
 
 from django.contrib.sites.models import Site
 from django.conf import settings
+import django.core.mail
 
 import core.tasks
 
 
-def send_mail(subject, message, from_email, recipient_list, **kwargs):
+def send_mail(subject, message, from_email, recipient_list, defer=True, **kwargs):
     """
     The main send email function. Use this function to send email from the
     site. All applications should use this function instead of calling
     Django's directly.
-    If settings.GPP_SEND_EMAIL is true, the email will be sent to a Celery task
-    to actually send the email. Otherwise it is dropped. In any event, the
+    If defer is True, the email will be sent to a Celery task to actually send
+    the email. Otherwise it is sent on the caller's thread. In any event, the
     email will be logged at the DEBUG level.
 
     """
@@ -28,9 +29,12 @@
     logging.debug('EMAIL:\nFrom: %s\nTo: %s\nSubject: %s\nMessage:\n%s',
         from_email, str(recipient_list), subject, message)
 
-    if settings.GPP_SEND_EMAIL:
+    if defer:
         core.tasks.send_mail.delay(subject, message, from_email, recipient_list,
                 **kwargs)
+    else:
+        django.core.mail.send_mail(subject, message, from_email, recipient_list,
+                **kwargs)
 
 
 def email_admins(subject, message):
--- a/gpp/forums/latest.py	Sat Dec 17 23:43:00 2011 +0000
+++ b/gpp/forums/latest.py	Sun Dec 18 23:46:52 2011 +0000
@@ -5,21 +5,38 @@
 ID in Redis. We also maintain a combined forums list. This allows quick
 retrieval of the latest posts and avoids some slow SQL queries.
 
+We also do things like send topic notification emails, auto-favorite, and
+auto-subscribe functions here rather than bog the user down in the request /
+response cycle.
+
 """
 import datetime
+import logging
 import time
 
 from django.dispatch import receiver
 from django.utils import simplejson
 
-from forums.signals import post_content_update
-from forums.models import Forum
+from forums.signals import post_content_update, topic_content_update
+from forums.models import Forum, Topic, Post
+from forums.views.subscriptions import notify_topic_subscribers
+from forums.tools import auto_favorite, auto_subscribe
 from core.services import get_redis_connection
 
-
 # This constant controls how many latest posts per forum we store
 MAX_POSTS = 50
 
+# This controls how many updated topics we track
+MAX_UPDATED_TOPICS = 50
+
+# Redis key names:
+POST_COUNT_KEY = "forums:public_post_count"
+TOPIC_COUNT_KEY = "forums:public_topic_count"
+UPDATED_TOPICS_SET_KEY = "forums:updated_topics:set"
+UPDATED_TOPIC_KEY = "forums:updated_topics:%s"
+
+logger = logging.getLogger(__name__)
+
 
 @receiver(post_content_update, dispatch_uid='forums.latest_posts')
 def on_post_update(sender, **kwargs):
@@ -27,41 +44,69 @@
     This function is our signal handler, called when a post has been updated.
     We only care about newly created posts, and ignore updates.
 
-    We serialize a post to JSON then store in two lists in Redis:
-        1. The list for the post's parent forum
-        2. The combined forum list
-
-    Note that we only store posts from public forums.
+    We kick off a Celery task to perform work outside of the request/response
+    cycle.
 
     """
     # ignore non-new posts
     if not kwargs['created']:
         return
 
-    # ignore posts from non-public forums
-    public_forums = Forum.objects.public_forum_ids()
-    if sender.topic.forum.id not in public_forums:
+    # Kick off a Celery task to process this new post
+    forums.tasks.new_post_task.delay(sender.id)
+
+
+def process_new_post(post_id):
+    """
+    This function is run on a Celery task. It performs all new-post processing.
+
+    """
+    try:
+        post = Post.objects.select_related().get(pk=post_id)
+    except Post.DoesNotExist:
+        logger.warning("process_new_post: post %d does not exist", post_id)
         return
 
+    # selectively process posts from non-public forums
+    public_forums = Forum.objects.public_forum_ids()
+
+    if post.topic.forum.id in public_forums:
+        redis = get_redis_connection()
+        _update_post_feeds(redis, post)
+        _update_post_count(redis, public_forums)
+        _update_latest_topics(redis, post)
+
+    # send out any email notifications
+    notify_topic_subscribers(post, defer=False)
+
+    # perform any auto-favorite and auto-subscribe actions for the new post
+    auto_favorite(post)
+    auto_subscribe(post)
+
+
+def _update_post_feeds(redis, post):
+    """
+    Updates the forum feeds we keep in Redis so that our RSS feeds are quick.
+
+    """
     # serialize post attributes
     post_content = {
-        'id': sender.id,
-        'title': sender.topic.name,
-        'content': sender.html,
-        'author': sender.user.username,
-        'pubdate': int(time.mktime(sender.creation_date.timetuple())),
-        'forum_name': sender.topic.forum.name,
-        'url': sender.get_absolute_url()
+        'id': post.id,
+        'title': post.topic.name,
+        'content': post.html,
+        'author': post.user.username,
+        'pubdate': int(time.mktime(post.creation_date.timetuple())),
+        'forum_name': post.topic.forum.name,
+        'url': post.get_absolute_url()
     }
 
     s = simplejson.dumps(post_content)
 
     # store in Redis
 
-    redis = get_redis_connection()
     pipeline = redis.pipeline()
 
-    key = 'forums:latest:%d' % sender.topic.forum.id
+    key = 'forums:latest:%d' % post.topic.forum.id
 
     pipeline.lpush(key, s)
     pipeline.ltrim(key, 0, MAX_POSTS - 1)
@@ -77,6 +122,61 @@
     pipeline.execute()
 
 
+def _update_post_count(redis, public_forums):
+    """
+    Updates the post count we cache in Redis. Doing a COUNT(*) on the post table
+    can be expensive in MySQL InnoDB.
+
+    """
+    result = redis.incr(POST_COUNT_KEY)
+    if result == 1:
+        # it is likely redis got trashed, so re-compute the correct value
+
+        count = Post.objects.filter(topic__forum__in=public_forums).count()
+        redis.set(POST_COUNT_KEY, count)
+
+
+def _update_latest_topics(redis, post):
+    """
+    Updates the "latest topics with new posts" list we cache in Redis for speed.
+    There is a template tag and forum view that uses this information.
+
+    """
+    # serialize topic attributes
+    topic_id = post.topic.id
+    topic_score = int(time.mktime(post.creation_date.timetuple()))
+
+    topic_content = {
+        'title': post.topic.name,
+        'author': post.user.username,
+        'date': topic_score,
+        'url': post.get_absolute_url()
+    }
+    json = simplejson.dumps(topic_content)
+    key = UPDATED_TOPIC_KEY % topic_id
+
+    pipeline = redis.pipeline()
+    pipeline.set(key, json)
+    pipeline.zadd(UPDATED_TOPICS_SET_KEY, topic_score, topic_id)
+    pipeline.zcard(UPDATED_TOPICS_SET_KEY)
+    results = pipeline.execute()
+
+    # delete topics beyond our maximum count
+    num_topics = results[-1]
+    num_to_del = num_topics - MAX_UPDATED_TOPICS
+    if num_to_del > 0:
+        # get the IDs of the topics we need to delete first
+        start = 0
+        stop = num_to_del - 1   # Redis indices are inclusive
+        old_ids = redis.zrange(UPDATED_TOPICS_SET_KEY, start, stop)
+
+        keys = [UPDATED_TOPIC_KEY % n for n in old_ids]
+        redis.delete(*keys)
+
+        # now delete the oldest num_to_del topics
+        redis.zremrangebyrank(UPDATED_TOPICS_SET_KEY, start, stop)
+
+
 def get_latest_posts(num_posts=MAX_POSTS, forum_id=None):
     """
     This function retrieves num_posts latest posts for the forum with the given
@@ -105,3 +205,137 @@
         posts.append(post)
 
     return posts
+
+
+@receiver(topic_content_update, dispatch_uid='forums.latest_posts')
+def on_topic_update(sender, **kwargs):
+    """
+    This function is our signal handler, called when a topic has been updated.
+    We only care about newly created topics, and ignore updates.
+
+    We kick off a Celery task to perform work outside of the request/response
+    cycle.
+
+    """
+    # ignore non-new topics
+    if not kwargs['created']:
+        return
+
+    # Kick off a Celery task to process this new post
+    forums.tasks.new_topic_task.delay(sender.id)
+
+
+def process_new_topic(topic_id):
+    """
+    This function contains new topic processing. Currently we only update the
+    topic count statistic.
+
+    """
+    try:
+        topic = Topic.objects.select_related().get(pk=topic_id)
+    except Topic.DoesNotExist:
+        logger.warning("process_new_topic: topic %d does not exist", topic_id)
+        return
+
+    # selectively process topics from non-public forums
+    public_forums = Forum.objects.public_forum_ids()
+
+    if topic.forum.id not in public_forums:
+        return
+
+    # update the topic count statistic
+    redis = get_redis_connection()
+
+    result = redis.incr(TOPIC_COUNT_KEY)
+    if result == 1:
+        # it is likely redis got trashed, so re-compute the correct value
+
+        count = Topic.objects.filter(forum__in=public_forums).count()
+        redis.set(TOPIC_COUNT_KEY, count)
+
+
+def get_stats():
+    """
+    This function returns the topic and post count statistics as a tuple, in
+    that order. If a statistic is not available, its position in the tuple will
+    be None.
+
+    """
+    try:
+        redis = get_redis_connection()
+        result = redis.mget(TOPIC_COUNT_KEY, POST_COUNT_KEY)
+    except redis.RedisError, e:
+        logger.error(e)
+        return (None, None)
+
+    topic_count = int(result[0]) if result[0] else None
+    post_count = int(result[1]) if result[1] else None
+
+    return (topic_count, post_count)
+
+
+def get_latest_topic_ids(num):
+    """
+    Return a list of topic ids from the latest topics that have posts. The ids
+    will be sorted from newest to oldest.
+
+    """
+    try:
+        redis = get_redis_connection()
+        result = redis.zrevrange(UPDATED_TOPICS_SET_KEY, 0, num - 1)
+    except redis.RedisError, e:
+        logger.error(e)
+        return []
+
+    return [int(n) for n in result]
+
+
+def get_latest_topics(num):
+    """
+    Return a list of dictionaries with information about the latest topics that
+    have updated posts. The topics are sorted from newest to oldest.
+
+    """
+    try:
+        redis = get_redis_connection()
+        result = redis.zrevrange(UPDATED_TOPICS_SET_KEY, 0, num - 1)
+
+        topic_keys = [UPDATED_TOPIC_KEY % n for n in result]
+        json_list = redis.mget(topic_keys)
+
+    except redis.RedisError, e:
+        logger.error(e)
+        return []
+
+    topics = []
+    for s in json_list:
+        item = simplejson.loads(s)
+        item['date'] = datetime.datetime.fromtimestamp(item['date'])
+        topics.append(item)
+
+    return topics
+
+
+def notify_topic_delete(topic):
+    """
+    This function should be called when a topic is deleted. It will remove the
+    topic from the updated topics set, if present, and delete any info we have
+    about the topic.
+
+    Note we don't do anything like this for posts. Since they just populate RSS
+    feeds we'll let them 404. The updated topic list is seen in a prominent
+    template tag however, so it is a bit more important to get that cleaned up.
+
+    """
+    try:
+        redis = get_redis_connection()
+        pipeline = redis.pipeline()
+        pipeline.zrem(UPDATED_TOPICS_SET_KEY, topic.id)
+        pipeline.delete(UPDATED_TOPIC_KEY % topic.id)
+        pipeline.execute()
+    except redis.RedisError, e:
+        logger.error(e)
+
+
+# Down here to avoid a circular import
+import forums.tasks
--- a/gpp/forums/management/commands/update_forum_stats.py	Sat Dec 17 23:43:00 2011 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,17 +0,0 @@
-"""
-update_forum_stats.py - A management command to calculate and update the
-cache with the forum statistics. These are done out of the request /
-response cycle because doing a count on the Post table is expensive
-under MySQL and InnoDb.
-
-"""
-from django.core.management.base import NoArgsCommand, CommandError
-
-from forums.stats import update_stats
-
-
-class Command(NoArgsCommand):
-    help = "Calculates and updates the cache with forums statistics"
-
-    def handle_noargs(self, **opts):
-        update_stats()
--- a/gpp/forums/signals.py	Sat Dec 17 23:43:00 2011 +0000
+++ b/gpp/forums/signals.py	Sun Dec 18 23:46:52 2011 +0000
@@ -7,8 +7,6 @@
 import django.dispatch
 
 from forums.models import Forum, Topic, Post
-from forums.views.subscriptions import notify_topic_subscribers
-from forums.tools import auto_favorite, auto_subscribe
 
 
 def on_topic_save(sender, **kwargs):
@@ -22,6 +20,7 @@
     topic = kwargs['instance']
     topic.forum.topic_count_update()
     topic.forum.save()
+    forums.latest.notify_topic_delete(topic)
 
 
 def on_post_save(sender, **kwargs):
@@ -36,13 +35,6 @@
         post.topic.forum.post_count_update()
         post.topic.forum.save()
 
-        # send out any email notifications
-        notify_topic_subscribers(post)
-
-        # perform any auto-favorite and auto-subscribe actions for the new post
-        auto_favorite(post)
-        auto_subscribe(post)
-
 
 def on_post_delete(sender, **kwargs):
     post = kwargs['instance']
@@ -116,3 +108,7 @@
 
     """
     post_content_update.send_robust(post, created=False)
+
+
+# Avoid circular imports
+import forums.latest
--- a/gpp/forums/stats.py	Sat Dec 17 23:43:00 2011 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,50 +0,0 @@
-"""
-This module is responsible for managing various forum statistics.
-
-"""
-from django.core.cache import cache
-
-from forums.models import Post
-
-
-CACHE_KEY = 'forums-stats-2'
-CACHE_TIMEOUT = 4 * 60 * 60         # seconds
-
-
-def calc_stats():
-    """
-    This function is responsible for computing the forum statistics.
-    The forums post count is returned.
-
-    """
-    post_count = Post.objects.all().count()
-    return post_count
-
-
-def update_stats():
-    """
-    This function is responsible for computing the forum statistics and
-    inserting them into the cache. The stats are returned.
-
-    This function should be run periodically, preferably outside of the
-    request/response cycle. On MySQL under InnoDb it is expensive to retrieve
-    the total post count.
-
-    """
-    stats = calc_stats()
-    cache.set(CACHE_KEY, stats, CACHE_TIMEOUT)
-    return stats
-
-
-def retrieve_stats():
-    """
-    This function retrieves the forum stats from the cache if they are
-    available. If there is a cache-miss, the stats are calcuated, the cache is
-    updated, and the stats returned.
-
-    """
-    stats = cache.get(CACHE_KEY)
-    if stats is None:
-        stats = update_stats()
-
-    return stats
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/gpp/forums/tasks.py	Sun Dec 18 23:46:52 2011 +0000
@@ -0,0 +1,25 @@
+"""
+Celery tasks for the forums application.
+
+"""
+from celery.task import task
+
+import forums.latest
+
+
+@task
+def new_post_task(post_id):
+    """
+    This task performs new post processing on a Celery task.
+
+    """
+    forums.latest.process_new_post(post_id)
+
+
+@task
+def new_topic_task(topic_id):
+    """
+    This task performs new topic processing on a Celery task.
+
+    """
+    forums.latest.process_new_topic(topic_id)
--- a/gpp/forums/templatetags/forum_tags.py	Sat Dec 17 23:43:00 2011 +0000
+++ b/gpp/forums/templatetags/forum_tags.py	Sun Dec 18 23:46:52 2011 +0000
@@ -13,7 +13,7 @@
 from forums.models import Topic
 from forums.models import Post
 from forums.models import Category
-from forums.stats import retrieve_stats
+from forums.latest import get_stats, get_latest_topics
 
 
 register = template.Library()
@@ -132,19 +132,14 @@
     }
 
 
-@register.inclusion_tag('forums/new_posts_tag.html', takes_context=True)
-def new_posts(context):
+@register.inclusion_tag('forums/new_posts_tag.html')
+def new_posts():
     """
     This tag displays the topics that have the newest posts.
     Only the "public" forums are displayed.
     """
-    public_forum_ids = Forum.objects.public_forum_ids()
-    topics = Topic.objects.filter(forum__in=public_forum_ids).select_related(
-                'last_post', 'last_post__user').order_by('-update_date')[:10]
-
     return {
-        'topics': topics,
-        'user': context['user'],
+        'topics': get_latest_topics(20),
     }
 
 
@@ -153,9 +148,10 @@
     """
     Displays forum statistics.
     """
-    post_count = retrieve_stats()
+    topic_count, post_count = get_stats()
 
     return {
+        'topic_count': topic_count,
         'post_count': post_count,
     }
 
--- a/gpp/forums/views/main.py	Sat Dec 17 23:43:00 2011 +0000
+++ b/gpp/forums/views/main.py	Sun Dec 18 23:46:52 2011 +0000
@@ -38,6 +38,7 @@
 import forums.permissions as perms
 from forums.signals import (notify_new_topic, notify_updated_topic,
         notify_new_post, notify_updated_post)
+from forums.latest import get_latest_topic_ids
 
 #######################################################################
 
@@ -879,17 +880,21 @@
     # sanity check num
     num = min(50, max(10, int(num)))
 
-    public_forum_ids = Forum.objects.public_forum_ids()
-
     # MySQL didn't do this query very well unfortunately...
+    #
+    #public_forum_ids = Forum.objects.public_forum_ids()
     #topics = Topic.objects.filter(forum__in=public_forum_ids).select_related(
     #            'forum', 'user', 'last_post', 'last_post__user').order_by(
     #            '-update_date')[:num]
-    topic_ids = list(Topic.objects.filter(forum__in=public_forum_ids).order_by(
-            '-update_date').values_list('id', flat=True)[:num])
+
+    # Save 1 query by using forums.latest to give us a list of the most recent
+    # topics; forums.latest doesn't save enough info to give us everything we
+    # need so we hit the database for the rest.
+
+    topic_ids = get_latest_topic_ids(num)
     topics = Topic.objects.filter(id__in=topic_ids).select_related(
                 'forum', 'user', 'last_post', 'last_post__user').order_by(
-                '-update_date')[:num]
+                '-update_date')
 
     paginator = create_topic_paginator(topics)
     page_num = get_page_num(request)
--- a/gpp/forums/views/subscriptions.py	Sat Dec 17 23:43:00 2011 +0000
+++ b/gpp/forums/views/subscriptions.py	Sun Dec 18 23:46:52 2011 +0000
@@ -18,13 +18,14 @@
 from core.paginator import DiggPaginator
 
 
-def notify_topic_subscribers(post):
+def notify_topic_subscribers(post, defer=True):
     """
     The argument post is a newly created post. Send out an email
     notification to all subscribers of the post's parent Topic.
+
+    The defer flag is passed to core.functions.send_mail. If True, the mail is
+    sent on a Celery task. If False, the mail is sent on the caller's thread.
     """
-    #TODO: consider moving this function of the HTTP request/response cycle.
-
     topic = post.topic
     recipients = topic.subscribers.exclude(id=post.user.id).values_list(
             'email', flat=True)
@@ -43,7 +44,8 @@
                 'unsubscribe_url': unsubscribe_url,
                 })
         for recipient in recipients:
-            send_mail(subject, msg, settings.DEFAULT_FROM_EMAIL, [recipient])
+            send_mail(subject, msg, settings.DEFAULT_FROM_EMAIL, [recipient],
+                    defer=defer)
 
 
 @login_required
--- a/gpp/settings/base.py	Sat Dec 17 23:43:00 2011 +0000
+++ b/gpp/settings/base.py	Sun Dec 18 23:46:52 2011 +0000
@@ -199,6 +199,13 @@
 
 
 #######################################################################
+# Redis integration & settings
+#######################################################################
+REDIS_HOST = 'localhost'
+REDIS_PORT = 6379
+REDIS_DB = 0
+
+#######################################################################
 # Celery integration & settings
 #######################################################################
 BROKER_URL = 'redis://localhost:6379/1'
@@ -237,7 +244,6 @@
 #######################################################################
 # GPP Specific Settings
 #######################################################################
-GPP_SEND_EMAIL = True
 GPP_NO_REPLY_EMAIL = 'no_reply'
 AVATAR_DIR = 'avatars'
 MAX_AVATAR_SIZE_BYTES = 2 * 1024 * 1024
--- a/gpp/settings/test.py	Sat Dec 17 23:43:00 2011 +0000
+++ b/gpp/settings/test.py	Sun Dec 18 23:46:52 2011 +0000
@@ -4,6 +4,12 @@
 """
 from settings.base import *
 
+# Use a different database in Redis for tests
+REDIS_DB = 14
+QUEUE_REDIS_DB = 14
+BROKER_URL = 'redis://localhost:6379/15'
+CELERY_REDIS_DB = 15
+
 DATABASES = {
     'default': {
         'ENGINE': 'django.db.backends.sqlite3',
--- a/gpp/templates/forums/forum_stats_tag.html	Sat Dec 17 23:43:00 2011 +0000
+++ b/gpp/templates/forums/forum_stats_tag.html	Sun Dec 18 23:46:52 2011 +0000
@@ -1,5 +1,7 @@
 {% load url from future %}
 {% load humanize %}
 <div id="forum-stats">
-Our users have posted a total of <strong>{{ post_count|intcomma }}</strong> posts.
+{% if topic_count and post_count %}
+Our users have posted a total of <strong>{{ post_count|intcomma }}</strong> posts in <strong>{{ topic_count|intcomma }}</strong> topics.
+{% endif %}
 </div>
--- a/gpp/templates/forums/index.html	Sat Dec 17 23:43:00 2011 +0000
+++ b/gpp/templates/forums/index.html	Sun Dec 18 23:46:52 2011 +0000
@@ -51,7 +51,7 @@
 </form>
 <br />
 {% user_stats %}
-{% cache 900 forum-stats-block %}
+{% cache 300 forum-stats-block %}
    {% forum_stats %}
 {% endcache %}
 {% cache 900 max-users-block %}
--- a/gpp/templates/forums/new_posts_tag.html	Sat Dec 17 23:43:00 2011 +0000
+++ b/gpp/templates/forums/new_posts_tag.html	Sun Dec 18 23:46:52 2011 +0000
@@ -6,7 +6,7 @@
 {% if topics %}
    <ul>
    {% for topic in topics %}
-   <li><a href="{{ topic.last_post.get_absolute_url }}">{{ topic.name }}</a> by {{ topic.last_post.user.username }} {{ topic.update_date|elapsed }}</li>
+      <li><a href="{{ topic.url }}">{{ topic.title }}</a> by {{ topic.author }} {{ topic.date|elapsed }}</li>
    {% endfor %}
    </ul>
 {% else %}