# HG changeset patch # User Brian Neal # Date 1337892555 18000 # Node ID f3fded5df64ba5a90fb9f129d246e0884391d205 # Parent 2469d5864249392afc58261b84d6677aca896a05 Forum topic & post updates now affect the RSS feed data in Redis. This is for bitbucket issue #10. diff -r 2469d5864249 -r f3fded5df64b forums/latest.py --- a/forums/latest.py Tue May 22 19:53:39 2012 -0500 +++ b/forums/latest.py Thu May 24 15:49:15 2012 -0500 @@ -10,6 +10,46 @@ response cycle. """ +# Maintenance notes: +# How we use Redis in this module: +# +# Forum post processing: +# +# * Forum posts are turned into Python dictionaries, then converted to JSON and +# stored under keys: forums:post:id +# * Each forum has a list in Redis stored under the key: forums:rss:id. This +# is a list of post IDs. +# * There is also a key called forums:rss:* which is the combined latest +# feed. It is also a list of post IDs. +# * A sorted set is maintained that keeps track of the reference count for each +# post. When a new post is created, this reference count is 2 because it is +# stored in both the combined list and the parent forum list. +# This sorted set is stored under the key: forums:post_ref_cnt. +# * When a post falls off a list due to aging, the reference count in the +# ordered set is decremented. If it falls to zero, the post's key is deleted +# from Redis. +# * When a post is edited, and it is in Redis, we simply update the JSON +# content. +# * When a post is deleted, and it is in Redis, it is removed from the 2 lists, +# the ordered set, and deleted from Redis. +# * When the RSS feed wants to update, it simply pulls down the entire list of +# post IDs for the feed of interest, then does a get on all the posts. +# +# Topics with recent posts processing: +# +# * A key is created for each topic that is updated. +# * An ordered set of topics is maintained with the current time as the score. +# * An updated topic gets its score bumped. +# * We only allow MAX_UPDATED_TOPICS number of topics in the set. We sort the +# set by score, and the expired topics are removed from the set and their keys +# are deleted from Redis. +# * The template tag (or anyone) who wants the list of topics with new posts +# gets the list of IDs sorted by score from newest to oldest. An mget is then +# performed to get all the topic data and it is deserialized from JSON. +# +# We also maintain topic and post counts in Redis since select(*) can take a +# while with MySQL InnoDb. +# import datetime import logging import time @@ -36,6 +76,10 @@ TOPIC_COUNT_KEY = "forums:public_topic_count" UPDATED_TOPICS_SET_KEY = "forums:updated_topics:set" UPDATED_TOPIC_KEY = "forums:updated_topics:%s" +POST_KEY = "forums:post:%s" +FORUM_RSS_KEY = "forums:rss:%s" +ALL_FORUMS_RSS_KEY = "forums:rss:*" +POST_SET_KEY = "forums:post_ref_cnt" logger = logging.getLogger(__name__) @@ -43,19 +87,17 @@ @receiver(post_content_update, dispatch_uid='forums.latest_posts') def on_post_update(sender, **kwargs): """ - This function is our signal handler, called when a post has been updated. - We only care about newly created posts, and ignore updates. + This function is our signal handler, called when a post has been updated + or created. We kick off a Celery task to perform work outside of the request/response cycle. """ - # ignore non-new posts - if not kwargs['created']: - return - - # Kick off a Celery task to process this new post - forums.tasks.new_post_task.delay(sender.id) + if kwargs['created']: + forums.tasks.new_post_task.delay(sender.id) + else: + forums.tasks.updated_post_task.delay(sender.id) def process_new_post(post_id): @@ -86,55 +128,83 @@ auto_subscribe(post) +def process_updated_post(post_id): + """ + This function is run on a Celery task. It performs all updated-post + processing. + + """ + # Is this post ID in a RSS feed? + conn = get_redis_connection() + post_key = POST_KEY % post_id + post_val = conn.get(post_key) + + if post_val is not None: + # Update the post value in Redis + try: + post = Post.objects.select_related().get(pk=post_id) + except Post.DoesNotExist: + logger.warning("process_updated_post: post %d does not exist", post_id) + return + conn.set(post_key, _serialize_post(post)) + + def _update_post_feeds(conn, post): """ Updates the forum feeds we keep in Redis so that our RSS feeds are quick. """ - # get any attachments for the post - - attachments = Attachment.objects.filter(post=post).select_related( - 'embed').order_by('order') - embeds = [item.embed for item in attachments] - if len(embeds) == 0: - content = post.html - else: - content = render_to_string('forums/post_rss.html', { - 'post': post, - 'embeds': embeds, - }) - - # serialize post attributes - post_content = { - 'id': post.id, - 'title': post.topic.name, - 'content': content, - 'author': post.user.username, - 'pubdate': int(time.mktime(post.creation_date.timetuple())), - 'forum_name': post.topic.forum.name, - 'url': post.get_absolute_url() - } - - s = simplejson.dumps(post_content) - - # store in Redis + post_key = POST_KEY % post.id + post_value = _serialize_post(post) pipeline = conn.pipeline() - key = 'forums:latest:%d' % post.topic.forum.id + # Store serialized post content under its own key + pipeline.set(post_key, post_value) - pipeline.lpush(key, s) - pipeline.ltrim(key, 0, MAX_POSTS - 1) + # Store in the RSS feed for the post's forum + forum_key = FORUM_RSS_KEY % post.topic.forum.id + pipeline.lpush(forum_key, post.id) - # store in the combined feed; yes this wastes some memory storing it twice, - # but it makes things much easier + # Store in the RSS feed for combined forums + pipeline.lpush(ALL_FORUMS_RSS_KEY, post.id) - key = 'forums:latest:*' + # Store reference count for the post + pipeline.zadd(POST_SET_KEY, 2, post.id) - pipeline.lpush(key, s) - pipeline.ltrim(key, 0, MAX_POSTS - 1) + results = pipeline.execute() - pipeline.execute() + # Make sure our forums RSS lists lengths are not exceeded + + if results[1] > MAX_POSTS or results[2] > MAX_POSTS: + pipeline = conn.pipeline() + + # Truncate lists of posts: + if results[1] > MAX_POSTS: + pipeline.rpop(forum_key) + if results[2] > MAX_POSTS: + pipeline.rpop(ALL_FORUMS_RSS_KEY) + post_ids = pipeline.execute() + + # Decrement reference count(s) + pipeline = conn.pipeline() + for post_id in post_ids: + pipeline.zincrby(POST_SET_KEY, post_id, -1) + scores = pipeline.execute() + + # If any reference counts have fallen to 0, clean up: + if not all(scores): + pipeline = conn.pipeline() + + # remove from post set + ids = [post_ids[n] for n, s in enumerate(scores) if s <= 0.0] + pipeline.zrem(POST_SET_KEY, *ids) + + # remove serialized post data + keys = [POST_KEY % n for n in ids] + pipeline.delete(*keys) + + pipeline.execute() def _update_post_count(conn, public_forums): @@ -200,7 +270,7 @@ contains information about a post. """ - key = 'forums:latest:%d' % forum_id if forum_id else 'forums:latest:*' + key = FORUM_RSS_KEY % forum_id if forum_id else ALL_FORUMS_RSS_KEY num_posts = max(0, min(MAX_POSTS, num_posts)) @@ -208,7 +278,13 @@ return [] conn = get_redis_connection() - raw_posts = conn.lrange(key, 0, num_posts - 1) + post_ids = conn.lrange(key, 0, num_posts - 1) + if not post_ids: + return [] + + post_keys = [POST_KEY % n for n in post_ids] + raw_posts = conn.mget(post_keys) + raw_posts = [s for s in raw_posts if s is not None] posts = [] for raw_post in raw_posts: @@ -225,19 +301,17 @@ @receiver(topic_content_update, dispatch_uid='forums.latest_posts') def on_topic_update(sender, **kwargs): """ - This function is our signal handler, called when a topic has been updated. - We only care about newly created topics, and ignore updates. + This function is our signal handler, called when a topic has been updated + or created. We kick off a Celery task to perform work outside of the request/response cycle. """ - # ignore non-new topics - if not kwargs['created']: - return - - # Kick off a Celery task to process this new post - forums.tasks.new_topic_task.delay(sender.id) + if kwargs['created']: + forums.tasks.new_topic_task.delay(sender.id) + else: + forums.tasks.updated_topic_task.delay(sender.id) def process_new_topic(topic_id): @@ -269,6 +343,29 @@ conn.set(TOPIC_COUNT_KEY, count) +def process_updated_topic(topic_id): + """ + This function contains updated topic processing. Update the title only. + + """ + conn = get_redis_connection() + key = UPDATED_TOPIC_KEY % topic_id + json = conn.get(key) + if json is not None: + try: + topic = Topic.objects.get(pk=topic_id) + except Topic.DoesNotExist: + logger.warning("topic %d does not exist", topic_id) + return + + topic_dict = simplejson.loads(json) + + if topic.name != topic_dict['title']: + topic_dict['title'] = topic.name + json = simplejson.dumps(topic_dict) + conn.set(key, json) + + def get_stats(): """ This function returns the topic and post count statistics as a tuple, in @@ -352,5 +449,36 @@ logger.error(e) +def _serialize_post(post): + """Serialize a post to JSON and return it. + + """ + # get any attachments for the post + + attachments = Attachment.objects.filter(post=post).select_related( + 'embed').order_by('order') + embeds = [item.embed for item in attachments] + if len(embeds) == 0: + content = post.html + else: + content = render_to_string('forums/post_rss.html', { + 'post': post, + 'embeds': embeds, + }) + + # serialize post attributes + post_content = { + 'id': post.id, + 'title': post.topic.name, + 'content': content, + 'author': post.user.username, + 'pubdate': int(time.mktime(post.creation_date.timetuple())), + 'forum_name': post.topic.forum.name, + 'url': post.get_absolute_url() + } + + return simplejson.dumps(post_content) + + # Down here to avoid a circular import import forums.tasks diff -r 2469d5864249 -r f3fded5df64b forums/management/commands/forum_cleanup.py --- a/forums/management/commands/forum_cleanup.py Tue May 22 19:53:39 2012 -0500 +++ b/forums/management/commands/forum_cleanup.py Thu May 24 15:49:15 2012 -0500 @@ -5,7 +5,7 @@ """ import datetime -from django.core.management.base import NoArgsCommand, CommandError +from django.core.management.base import NoArgsCommand from forums.models import ForumLastVisit, TopicLastVisit import forums.unread diff -r 2469d5864249 -r f3fded5df64b forums/tasks.py --- a/forums/tasks.py Tue May 22 19:53:39 2012 -0500 +++ b/forums/tasks.py Thu May 24 15:49:15 2012 -0500 @@ -17,9 +17,27 @@ @task +def updated_post_task(post_id): + """ + This task performs updated post processing on a Celery task. + + """ + forums.latest.process_updated_post(post_id) + + +@task def new_topic_task(topic_id): """ This task performs new topic processing on a Celery task. """ forums.latest.process_new_topic(topic_id) + + +@task +def updated_topic_task(topic_id): + """ + This task performs updated topic processing on a Celery task. + + """ + forums.latest.process_updated_topic(topic_id)