"""
This module maintains the latest posts datastore. The latest posts are often
needed by RSS feeds, "latest posts" template tags, etc. This module listens for
the post_content_update signal, then bundles the post up and stores it by forum
ID in Redis. We also maintain a combined forums list. This allows quick
retrieval of the latest posts and avoids some slow SQL queries.

We also do things like send topic notification emails, auto-favorite, and
auto-subscribe functions here rather than bog the user down in the request /
response cycle.

"""
# Maintenance notes:
# How we use Redis in this module:
#
# Forum post processing:
#
# * Forum posts are turned into Python dictionaries, then converted to JSON and
#   stored under keys: forums:post:id
# * Each forum has a list in Redis stored under the key: forums:rss:id. This
#   is a list of post IDs.
# * There is also a key called forums:rss:* which is the combined latest
#   feed. It is also a list of post IDs.
# * A sorted set is maintained that keeps track of the reference count for each
#   post. When a new post is created, this reference count is 2 because it is
#   stored in both the combined list and the parent forum list.
#   This sorted set is stored under the key: forums:post_ref_cnt.
# * When a post falls off a list due to aging, the reference count in the
#   ordered set is decremented. If it falls to zero, the post's key is deleted
#   from Redis.
# * When a post is edited, and it is in Redis, we simply update the JSON
#   content.
# * When a post is deleted, and it is in Redis, it is removed from the 2 lists,
#   the ordered set, and deleted from Redis.
# * When the RSS feed wants to update, it simply pulls down the entire list of
#   post IDs for the feed of interest, then does a get on all the posts.
#
# Topics with recent posts processing:
#
# * A key is created for each topic that is updated.
# * An ordered set of topics is maintained with the current time as the score.
# * An updated topic gets its score bumped.
# * We only allow MAX_UPDATED_TOPICS number of topics in the set. We sort the
#   set by score, and the expired topics are removed from the set and their keys
#   are deleted from Redis.
# * The template tag (or anyone) who wants the list of topics with new posts
#   gets the list of IDs sorted by score from newest to oldest. An mget is then
#   performed to get all the topic data and it is deserialized from JSON.
#
# We also maintain topic and post counts in Redis since COUNT(*) can take a
# while with MySQL InnoDB.
#
import datetime
import json
import logging
import time

from django.dispatch import receiver
from django.template.loader import render_to_string
import redis

from forums.signals import post_content_update, topic_content_update
from forums.models import Forum, Topic, Post, Attachment
from forums.views.subscriptions import notify_topic_subscribers
from forums.tools import auto_favorite, auto_subscribe
from core.services import get_redis_connection

# This constant controls how many latest posts per forum we store
MAX_POSTS = 50

# This controls how many updated topics we track
MAX_UPDATED_TOPICS = 50

# Redis key names:
POST_COUNT_KEY = "forums:public_post_count"
TOPIC_COUNT_KEY = "forums:public_topic_count"
UPDATED_TOPICS_SET_KEY = "forums:updated_topics:set"
UPDATED_TOPIC_KEY = "forums:updated_topics:%s"
POST_KEY = "forums:post:%s"
FORUM_RSS_KEY = "forums:rss:%s"
ALL_FORUMS_RSS_KEY = "forums:rss:*"
POST_SET_KEY = "forums:post_ref_cnt"

logger = logging.getLogger(__name__)


@receiver(post_content_update, dispatch_uid='forums.latest_posts')
def on_post_update(sender, **kwargs):
    """
    This function is our signal handler, called when a post has been updated
    or created.

    We kick off a Celery task to perform work outside of the request/response
    cycle.

    """
    if kwargs['created']:
        forums.tasks.new_post_task.delay(sender.id)
    else:
        forums.tasks.updated_post_task.delay(sender.id)


def process_new_post(post_id):
    """
    This function is run on a Celery task. It performs all new-post processing.

    """
    try:
        post = Post.objects.select_related().get(pk=post_id)
    except Post.DoesNotExist:
        logger.warning("process_new_post: post %d does not exist", post_id)
        return

    # only posts in public forums are cached in Redis
    public_forums = Forum.objects.public_forum_ids()

    if post.topic.forum.id in public_forums:
        conn = get_redis_connection()
        _update_post_feeds(conn, post)
        _update_post_count(conn, public_forums)
        _update_latest_topics(conn, post)

    # send out any email notifications
    notify_topic_subscribers(post, defer=False)

    # perform any auto-favorite and auto-subscribe actions for the new post
    auto_favorite(post)
    auto_subscribe(post)


def process_updated_post(post_id):
    """
    This function is run on a Celery task. It performs all updated-post
    processing.

    """
    # Is this post ID in an RSS feed?
    conn = get_redis_connection()
    post_key = POST_KEY % post_id
    post_val = conn.get(post_key)

    if post_val is not None:
        # Update the post value in Redis
        try:
            post = Post.objects.select_related().get(pk=post_id)
        except Post.DoesNotExist:
            logger.warning("process_updated_post: post %d does not exist", post_id)
            return
        conn.set(post_key, _serialize_post(post))


def _update_post_feeds(conn, post):
    """
    Updates the forum feeds we keep in Redis so that our RSS feeds are quick.

    """
    post_key = POST_KEY % post.id
    post_value = _serialize_post(post)

    pipeline = conn.pipeline()

    # Store serialized post content under its own key
    pipeline.set(post_key, post_value)

    # Store in the RSS feed for the post's forum
    forum_key = FORUM_RSS_KEY % post.topic.forum.id
    pipeline.lpush(forum_key, post.id)

    # Store in the RSS feed for combined forums
    pipeline.lpush(ALL_FORUMS_RSS_KEY, post.id)

    # Store reference count for the post
    pipeline.zadd(POST_SET_KEY, 2, post.id)

    results = pipeline.execute()

    # Make sure our forums RSS lists lengths are not exceeded

    if results[1] > MAX_POSTS or results[2] > MAX_POSTS:
        pipeline = conn.pipeline()

        # Truncate lists of posts:
        if results[1] > MAX_POSTS:
            pipeline.rpop(forum_key)
        if results[2] > MAX_POSTS:
            pipeline.rpop(ALL_FORUMS_RSS_KEY)
        post_ids = pipeline.execute()

        # Decrement reference count(s)
        pipeline = conn.pipeline()
        for post_id in post_ids:
            pipeline.zincrby(POST_SET_KEY, post_id, -1)
        scores = pipeline.execute()

        # If any reference counts have fallen to 0, clean up:
        if not all(scores):
            pipeline = conn.pipeline()

            # remove from post set
            ids = [post_ids[n] for n, s in enumerate(scores) if s <= 0.0]
            pipeline.zrem(POST_SET_KEY, *ids)

            # remove serialized post data
            keys = [POST_KEY % n for n in ids]
            pipeline.delete(*keys)

            pipeline.execute()


def _update_post_count(conn, public_forums):
    """
    Updates the post count we cache in Redis. Doing a COUNT(*) on the post table
    can be expensive in MySQL InnoDB.

    """
    result = conn.incr(POST_COUNT_KEY)
    if result == 1:
        # it is likely redis got trashed, so re-compute the correct value

        count = Post.objects.filter(topic__forum__in=public_forums).count()
        conn.set(POST_COUNT_KEY, count)


def _update_latest_topics(conn, post):
    """
    Updates the "latest topics with new posts" list we cache in Redis for speed.
    There is a template tag and forum view that uses this information.

    """
    # serialize topic attributes
    topic_id = post.topic.id
    topic_score = int(time.mktime(post.creation_date.timetuple()))

    topic_content = {
        'title': post.topic.name,
        'author': post.user.username,
        'date': topic_score,
        'url': post.topic.get_latest_post_url()
    }
    topic_json = json.dumps(topic_content)
    key = UPDATED_TOPIC_KEY % topic_id

    pipeline = conn.pipeline()
    pipeline.set(key, topic_json)
    pipeline.zadd(UPDATED_TOPICS_SET_KEY, topic_score, topic_id)
    pipeline.zcard(UPDATED_TOPICS_SET_KEY)
    results = pipeline.execute()

    # delete topics beyond our maximum count
    num_topics = results[-1]
    num_to_del = num_topics - MAX_UPDATED_TOPICS
    if num_to_del > 0:
        # get the IDs of the topics we need to delete first
        start = 0
        stop = num_to_del - 1   # Redis indices are inclusive
        old_ids = conn.zrange(UPDATED_TOPICS_SET_KEY, start, stop)

        keys = [UPDATED_TOPIC_KEY % n for n in old_ids]
        conn.delete(*keys)

        # now delete the oldest num_to_del topics
        conn.zremrangebyrank(UPDATED_TOPICS_SET_KEY, start, stop)


def get_latest_posts(num_posts=MAX_POSTS, forum_id=None):
    """
    This function retrieves num_posts latest posts for the forum with the given
    forum_id. If forum_id is None, the posts are retrieved from the combined
    forums datastore. A list of dictionaries is returned. Each dictionary
    contains information about a post.

    """
    key = FORUM_RSS_KEY % forum_id if forum_id else ALL_FORUMS_RSS_KEY

    num_posts = max(0, min(MAX_POSTS, num_posts))

    if num_posts == 0:
        return []

    conn = get_redis_connection()
    post_ids = conn.lrange(key, 0, num_posts - 1)
    if not post_ids:
        return []

    post_keys = [POST_KEY % n for n in post_ids]
    raw_posts = conn.mget(post_keys)
    raw_posts = [s for s in raw_posts if s is not None]

    posts = []
    for raw_post in raw_posts:
        post = json.loads(raw_post)

        # fix up the pubdate; turn it back into a datetime object
        post['pubdate'] = datetime.datetime.fromtimestamp(post['pubdate'])

        posts.append(post)

    return posts


@receiver(topic_content_update, dispatch_uid='forums.latest_posts')
def on_topic_update(sender, **kwargs):
    """
    This function is our signal handler, called when a topic has been updated
    or created.

    We kick off a Celery task to perform work outside of the request/response
    cycle.

    """
    if kwargs['created']:
        forums.tasks.new_topic_task.delay(sender.id)
    else:
        forums.tasks.updated_topic_task.delay(sender.id)


def process_new_topic(topic_id):
    """
    This function contains new topic processing. Currently we only update the
    topic count statistic.

    """
    try:
        topic = Topic.objects.select_related().get(pk=topic_id)
    except Topic.DoesNotExist:
        logger.warning("process_new_topic: topic %d does not exist", topic_id)
        return

    # only topics in public forums are counted
    public_forums = Forum.objects.public_forum_ids()

    if topic.forum.id not in public_forums:
        return

    # update the topic count statistic
    conn = get_redis_connection()

    result = conn.incr(TOPIC_COUNT_KEY)
    if result == 1:
        # it is likely redis got trashed, so re-compute the correct value

        count = Topic.objects.filter(forum__in=public_forums).count()
        conn.set(TOPIC_COUNT_KEY, count)


def process_updated_topic(topic_id):
    """
    This function contains updated topic processing. Update the title only.

    """
    conn = get_redis_connection()
    key = UPDATED_TOPIC_KEY % topic_id
    topic_json = conn.get(key)
    if topic_json is not None:
        try:
            topic = Topic.objects.get(pk=topic_id)
        except Topic.DoesNotExist:
            logger.warning("topic %d does not exist", topic_id)
            return

        topic_dict = json.loads(topic_json)

        if topic.name != topic_dict['title']:
            topic_dict['title'] = topic.name
            topic_json = json.dumps(topic_dict)
            conn.set(key, topic_json)


def get_stats():
    """
    This function returns the topic and post count statistics as a tuple, in
    that order. If a statistic is not available, its position in the tuple will
    be None.

    """
    try:
        conn = get_redis_connection()
        result = conn.mget(TOPIC_COUNT_KEY, POST_COUNT_KEY)
    except redis.RedisError as e:
        logger.error(e)
        return (None, None)

    topic_count = int(result[0]) if result[0] else None
    post_count = int(result[1]) if result[1] else None

    return (topic_count, post_count)


def get_latest_topic_ids(num):
    """
    Return a list of topic ids from the latest topics that have posts. The ids
    will be sorted from newest to oldest.

    """
    try:
        conn = get_redis_connection()
        result = conn.zrevrange(UPDATED_TOPICS_SET_KEY, 0, num - 1)
    except redis.RedisError as e:
        logger.error(e)
        return []

    return [int(n) for n in result]


def get_latest_topics(num):
    """
    Return a list of dictionaries with information about the latest topics that
    have updated posts. The topics are sorted from newest to oldest.

    """
    try:
        conn = get_redis_connection()
        result = conn.zrevrange(UPDATED_TOPICS_SET_KEY, 0, num - 1)

        topic_keys = [UPDATED_TOPIC_KEY % n for n in result]
        json_list = conn.mget(topic_keys) if topic_keys else []

    except redis.RedisError as e:
        logger.error(e)
        return []

    topics = []
    for s in json_list:
        # mget returns None for a missing key; skip it rather than crash
        if s is None:
            continue
        item = json.loads(s)
        item['date'] = datetime.datetime.fromtimestamp(item['date'])
        topics.append(item)

    return topics


def notify_topic_delete(topic):
    """
    This function should be called when a topic is deleted. It will remove the
    topic from the updated topics set, if present, and delete any info we have
    about the topic.

    Note we don't do anything like this for posts. Since they just populate RSS
    feeds we'll let them 404. The updated topic list is seen in a prominent
    template tag however, so it is a bit more important to get that cleaned up.

    """
    try:
        conn = get_redis_connection()
        pipeline = conn.pipeline()
        pipeline.zrem(UPDATED_TOPICS_SET_KEY, topic.id)
        pipeline.delete(UPDATED_TOPIC_KEY % topic.id)
        pipeline.execute()
    except redis.RedisError as e:
        logger.error(e)


def _serialize_post(post):
    """Serialize a post to JSON and return it.

    """
    # get any attachments for the post

    attachments = Attachment.objects.filter(post=post).select_related(
            'embed').order_by('order')
    embeds = [item.embed for item in attachments]
    if not embeds:
        content = post.html
    else:
        content = render_to_string('forums/post_rss.html', {
            'post': post,
            'embeds': embeds,
        })

    # serialize post attributes
    post_content = {
        'id': post.id,
        'title': post.topic.name,
        'content': content,
        'author': post.user.username,
        'pubdate': int(time.mktime(post.creation_date.timetuple())),
        'forum_name': post.topic.forum.name,
        'url': post.get_absolute_url()
    }

    return json.dumps(post_content)


# Down here to avoid a circular import
import forums.tasks