changeset 595:f3fded5df64b

Forum topic & post updates now affect the RSS feed data in Redis. This is for bitbucket issue #10.
author Brian Neal <bgneal@gmail.com>
date Thu, 24 May 2012 15:49:15 -0500
parents 2469d5864249
children cc39f34ce12b
files forums/latest.py forums/management/commands/forum_cleanup.py forums/tasks.py
diffstat 3 files changed, 201 insertions(+), 55 deletions(-) [+]
line wrap: on
line diff
--- a/forums/latest.py	Tue May 22 19:53:39 2012 -0500
+++ b/forums/latest.py	Thu May 24 15:49:15 2012 -0500
@@ -10,6 +10,46 @@
 response cycle.
 
 """
+# Maintenance notes:
+# How we use Redis in this module:
+# 
+# Forum post processing:
+#
+# * Forum posts are turned into Python dictionaries, then converted to JSON and
+#   stored under keys: forums:post:id
+# * Each forum has a list in Redis stored under the key: forums:rss:id. This
+#   is a list of post IDs.
+# * There is also a key called forums:rss:* which is the combined latest
+#   feed. It is also a list of post IDs.
+# * A sorted set is maintained that keeps track of the reference count for each
+#   post. When a new post is created, this reference count is 2 because it is
+#   stored in both the combined list and the parent forum list.
+#   This sorted set is stored under the key: forums:post_ref_cnt.
+# * When a post falls off a list due to aging, the reference count in the
+#   ordered set is decremented. If it falls to zero, the post's key is deleted
+#   from Redis.
+# * When a post is edited, and it is in Redis, we simply update the JSON
+#   content.
+# * When a post is deleted, and it is in Redis, it is removed from the 2 lists, 
+#   the ordered set, and deleted from Redis.
+# * When the RSS feed wants to update, it simply pulls down the entire list of
+#   post IDs for the feed of interest, then does a get on all the posts.
+#
+# Topics with recent posts processing:
+#
+# * A key is created for each topic that is updated.
+# * An ordered set of topics is maintained with the current time as the score.
+# * An updated topic gets its score bumped.
+# * We only allow MAX_UPDATED_TOPICS number of topics in the set. We sort the
+#   set by score, and the expired topics are removed from the set and their keys
+#   are deleted from Redis.
+# * The template tag (or anyone) who wants the list of topics with new posts
+#   gets the list of IDs sorted by score from newest to oldest. An mget is then
+#   performed to get all the topic data and it is deserialized from JSON.
+#
+# We also maintain topic and post counts in Redis since select(*) can take a
+# while with MySQL InnoDb.
+#
 import datetime
 import logging
 import time
@@ -36,6 +76,10 @@
 TOPIC_COUNT_KEY = "forums:public_topic_count"
 UPDATED_TOPICS_SET_KEY = "forums:updated_topics:set"
 UPDATED_TOPIC_KEY = "forums:updated_topics:%s"
+POST_KEY = "forums:post:%s"
+FORUM_RSS_KEY = "forums:rss:%s"
+ALL_FORUMS_RSS_KEY = "forums:rss:*"
+POST_SET_KEY = "forums:post_ref_cnt"
 
 logger = logging.getLogger(__name__)
 
@@ -43,19 +87,17 @@
 @receiver(post_content_update, dispatch_uid='forums.latest_posts')
 def on_post_update(sender, **kwargs):
     """
-    This function is our signal handler, called when a post has been updated.
-    We only care about newly created posts, and ignore updates.
+    This function is our signal handler, called when a post has been updated
+    or created.
 
     We kick off a Celery task to perform work outside of the request/response
     cycle.
 
     """
-    # ignore non-new posts
-    if not kwargs['created']:
-        return
-
-    # Kick off a Celery task to process this new post
-    forums.tasks.new_post_task.delay(sender.id)
+    if kwargs['created']:
+        forums.tasks.new_post_task.delay(sender.id)
+    else:
+        forums.tasks.updated_post_task.delay(sender.id)
 
 
 def process_new_post(post_id):
@@ -86,55 +128,83 @@
     auto_subscribe(post)
 
 
+def process_updated_post(post_id):
+    """
+    This function is run on a Celery task. It performs all updated-post
+    processing.
+
+    """
+    # Is this post ID in a RSS feed?
+    conn = get_redis_connection()
+    post_key = POST_KEY % post_id
+    post_val = conn.get(post_key)
+
+    if post_val is not None:
+        # Update the post value in Redis
+        try:
+            post = Post.objects.select_related().get(pk=post_id)
+        except Post.DoesNotExist:
+            logger.warning("process_updated_post: post %d does not exist", post_id)
+            return
+        conn.set(post_key, _serialize_post(post))
+
+
 def _update_post_feeds(conn, post):
     """
     Updates the forum feeds we keep in Redis so that our RSS feeds are quick.
 
     """
-    # get any attachments for the post
-
-    attachments = Attachment.objects.filter(post=post).select_related(
-            'embed').order_by('order')
-    embeds = [item.embed for item in attachments]
-    if len(embeds) == 0:
-        content = post.html
-    else:
-        content = render_to_string('forums/post_rss.html', {
-            'post': post,
-            'embeds': embeds,
-        })
-
-    # serialize post attributes
-    post_content = {
-        'id': post.id,
-        'title': post.topic.name,
-        'content': content,
-        'author': post.user.username,
-        'pubdate': int(time.mktime(post.creation_date.timetuple())),
-        'forum_name': post.topic.forum.name,
-        'url': post.get_absolute_url()
-    }
-
-    s = simplejson.dumps(post_content)
-
-    # store in Redis
+    post_key = POST_KEY % post.id
+    post_value = _serialize_post(post)
 
     pipeline = conn.pipeline()
 
-    key = 'forums:latest:%d' % post.topic.forum.id
+    # Store serialized post content under its own key
+    pipeline.set(post_key, post_value)
 
-    pipeline.lpush(key, s)
-    pipeline.ltrim(key, 0, MAX_POSTS - 1)
+    # Store in the RSS feed for the post's forum
+    forum_key = FORUM_RSS_KEY % post.topic.forum.id
+    pipeline.lpush(forum_key, post.id)
 
-    # store in the combined feed; yes this wastes some memory storing it twice,
-    # but it makes things much easier
+    # Store in the RSS feed for combined forums
+    pipeline.lpush(ALL_FORUMS_RSS_KEY, post.id)
 
-    key = 'forums:latest:*'
+    # Store reference count for the post
+    pipeline.zadd(POST_SET_KEY, 2, post.id)
 
-    pipeline.lpush(key, s)
-    pipeline.ltrim(key, 0, MAX_POSTS - 1)
+    results = pipeline.execute()
 
-    pipeline.execute()
+    # Make sure our forums RSS lists lengths are not exceeded
+
+    if results[1] > MAX_POSTS or results[2] > MAX_POSTS:
+        pipeline = conn.pipeline()
+
+        # Truncate lists of posts:
+        if results[1] > MAX_POSTS:
+            pipeline.rpop(forum_key)
+        if results[2] > MAX_POSTS:
+            pipeline.rpop(ALL_FORUMS_RSS_KEY)
+        post_ids = pipeline.execute()
+
+        # Decrement reference count(s)
+        pipeline = conn.pipeline()
+        for post_id in post_ids:
+            pipeline.zincrby(POST_SET_KEY, post_id, -1)
+        scores = pipeline.execute()
+
+        # If any reference counts have fallen to 0, clean up:
+        if not all(scores):
+            pipeline = conn.pipeline()
+
+            # remove from post set
+            ids = [post_ids[n] for n, s in enumerate(scores) if s <= 0.0]
+            pipeline.zrem(POST_SET_KEY, *ids)
+
+            # remove serialized post data
+            keys = [POST_KEY % n for n in ids]
+            pipeline.delete(*keys)
+
+            pipeline.execute()
 
 
 def _update_post_count(conn, public_forums):
@@ -200,7 +270,7 @@
     contains information about a post.
 
     """
-    key = 'forums:latest:%d' % forum_id if forum_id else 'forums:latest:*'
+    key = FORUM_RSS_KEY % forum_id if forum_id else ALL_FORUMS_RSS_KEY
 
     num_posts = max(0, min(MAX_POSTS, num_posts))
 
@@ -208,7 +278,13 @@
         return []
 
     conn = get_redis_connection()
-    raw_posts = conn.lrange(key, 0, num_posts - 1)
+    post_ids = conn.lrange(key, 0, num_posts - 1)
+    if not post_ids:
+        return []
+
+    post_keys = [POST_KEY % n for n in post_ids]
+    raw_posts = conn.mget(post_keys)
+    raw_posts = [s for s in raw_posts if s is not None]
 
     posts = []
     for raw_post in raw_posts:
@@ -225,19 +301,17 @@
 @receiver(topic_content_update, dispatch_uid='forums.latest_posts')
 def on_topic_update(sender, **kwargs):
     """
-    This function is our signal handler, called when a topic has been updated.
-    We only care about newly created topics, and ignore updates.
+    This function is our signal handler, called when a topic has been updated
+    or created.
 
     We kick off a Celery task to perform work outside of the request/response
     cycle.
 
     """
-    # ignore non-new topics
-    if not kwargs['created']:
-        return
-
-    # Kick off a Celery task to process this new post
-    forums.tasks.new_topic_task.delay(sender.id)
+    if kwargs['created']:
+        forums.tasks.new_topic_task.delay(sender.id)
+    else:
+        forums.tasks.updated_topic_task.delay(sender.id)
 
 
 def process_new_topic(topic_id):
@@ -269,6 +343,29 @@
         conn.set(TOPIC_COUNT_KEY, count)
 
 
+def process_updated_topic(topic_id):
+    """
+    This function contains updated topic processing. Update the title only.
+
+    """
+    conn = get_redis_connection()
+    key = UPDATED_TOPIC_KEY % topic_id
+    json = conn.get(key)
+    if json is not None:
+        try:
+            topic = Topic.objects.get(pk=topic_id)
+        except Topic.DoesNotExist:
+            logger.warning("topic %d does not exist", topic_id)
+            return
+
+        topic_dict = simplejson.loads(json)
+
+        if topic.name != topic_dict['title']:
+            topic_dict['title'] = topic.name
+            json = simplejson.dumps(topic_dict)
+            conn.set(key, json)
+
+
 def get_stats():
     """
     This function returns the topic and post count statistics as a tuple, in
@@ -352,5 +449,36 @@
         logger.error(e)
 
 
+def _serialize_post(post):
+    """Serialize a post to JSON and return it.
+
+    """
+    # get any attachments for the post
+
+    attachments = Attachment.objects.filter(post=post).select_related(
+            'embed').order_by('order')
+    embeds = [item.embed for item in attachments]
+    if len(embeds) == 0:
+        content = post.html
+    else:
+        content = render_to_string('forums/post_rss.html', {
+            'post': post,
+            'embeds': embeds,
+        })
+
+    # serialize post attributes
+    post_content = {
+        'id': post.id,
+        'title': post.topic.name,
+        'content': content,
+        'author': post.user.username,
+        'pubdate': int(time.mktime(post.creation_date.timetuple())),
+        'forum_name': post.topic.forum.name,
+        'url': post.get_absolute_url()
+    }
+
+    return simplejson.dumps(post_content)
+
+
 # Down here to avoid a circular import
 import forums.tasks
--- a/forums/management/commands/forum_cleanup.py	Tue May 22 19:53:39 2012 -0500
+++ b/forums/management/commands/forum_cleanup.py	Thu May 24 15:49:15 2012 -0500
@@ -5,7 +5,7 @@
 """
 import datetime
 
-from django.core.management.base import NoArgsCommand, CommandError
+from django.core.management.base import NoArgsCommand
 
 from forums.models import ForumLastVisit, TopicLastVisit
 import forums.unread
--- a/forums/tasks.py	Tue May 22 19:53:39 2012 -0500
+++ b/forums/tasks.py	Thu May 24 15:49:15 2012 -0500
@@ -17,9 +17,27 @@
 
 
 @task
+def updated_post_task(post_id):
+    """
+    This task performs updated post processing on a Celery task.
+
+    """
+    forums.latest.process_updated_post(post_id)
+
+
+@task
 def new_topic_task(topic_id):
     """
     This task performs new topic processing on a Celery task.
 
     """
     forums.latest.process_new_topic(topic_id)
+
+
+@task
+def updated_topic_task(topic_id):
+    """
+    This task performs updated topic processing on a Celery task.
+
+    """
+    forums.latest.process_updated_topic(topic_id)