bgneal@509: """
bgneal@509: This module maintains the latest posts datastore. The latest posts are often
bgneal@509: needed by RSS feeds, "latest posts" template tags, etc. This module listens for
bgneal@509: the post_content_update signal, then bundles the post up and stores it by forum
bgneal@509: ID in Redis. We also maintain a combined forums list. This allows quick
bgneal@509: retrieval of the latest posts and avoids some slow SQL queries.
bgneal@509: 
bgneal@522: We also do things like send topic notification emails, auto-favorite, and
bgneal@522: auto-subscribe functions here rather than bog the user down in the request /
bgneal@522: response cycle.
bgneal@522: 
bgneal@509: """
bgneal@595: # Maintenance notes:
bgneal@595: # How we use Redis in this module:
bgneal@595: # 
bgneal@595: # Forum post processing:
bgneal@595: #
bgneal@595: # * Forum posts are turned into Python dictionaries, then converted to JSON and
bgneal@595: #   stored under keys: forums:post:id
bgneal@595: # * Each forum has a list in Redis stored under the key: forums:rss:id. This
bgneal@595: #   is a list of post IDs.
bgneal@595: # * There is also a key called forums:rss:* which is the combined latest
bgneal@595: #   feed. It is also a list of post IDs.
bgneal@595: # * A sorted set is maintained that keeps track of the reference count for each
bgneal@595: #   post. When a new post is created, this reference count is 2 because it is
bgneal@595: #   stored in both the combined list and the parent forum list.
bgneal@595: #   This sorted set is stored under the key: forums:post_ref_cnt.
bgneal@595: # * When a post falls off a list due to aging, the reference count in the
bgneal@595: #   ordered set is decremented. If it falls to zero, the post's key is deleted
bgneal@595: #   from Redis.
bgneal@595: # * When a post is edited, and it is in Redis, we simply update the JSON
bgneal@595: #   content.
bgneal@595: # * When a post is deleted, and it is in Redis, it is removed from the 2 lists, 
bgneal@595: #   the ordered set, and deleted from Redis.
bgneal@595: # * When the RSS feed wants to update, it simply pulls down the entire list of
bgneal@595: #   post IDs for the feed of interest, then does a get on all the posts.
bgneal@595: #
bgneal@595: # Topics with recent posts processing:
bgneal@595: #
bgneal@595: # * A key is created for each topic that is updated.
bgneal@595: # * An ordered set of topics is maintained with the current time as the score.
bgneal@595: # * An updated topic gets its score bumped.
bgneal@595: # * We only allow MAX_UPDATED_TOPICS number of topics in the set. We sort the
bgneal@595: #   set by score, and the expired topics are removed from the set and their keys
bgneal@595: #   are deleted from Redis.
bgneal@595: # * The template tag (or anyone) who wants the list of topics with new posts
bgneal@595: #   gets the list of IDs sorted by score from newest to oldest. An mget is then
bgneal@595: #   performed to get all the topic data and it is deserialized from JSON.
bgneal@595: #
# We also maintain topic and post counts in Redis since a COUNT(*) query can
# take a while with MySQL InnoDB.
bgneal@595: #
bgneal@509: import datetime
bgneal@679: import json
bgneal@522: import logging
bgneal@509: import time
bgneal@509: 
bgneal@509: from django.dispatch import receiver
bgneal@594: from django.template.loader import render_to_string
bgneal@523: import redis
bgneal@509: 
bgneal@522: from forums.signals import post_content_update, topic_content_update
bgneal@594: from forums.models import Forum, Topic, Post, Attachment
bgneal@522: from forums.views.subscriptions import notify_topic_subscribers
bgneal@522: from forums.tools import auto_favorite, auto_subscribe
bgneal@509: from core.services import get_redis_connection
bgneal@509: 
# This constant controls how many latest posts per forum we store
MAX_POSTS = 50

# This controls how many updated topics we track
MAX_UPDATED_TOPICS = 50

# Redis key names:
# Cached counts of posts / topics in public forums
POST_COUNT_KEY = "forums:public_post_count"
TOPIC_COUNT_KEY = "forums:public_topic_count"
# Ordered set of recently updated topic IDs; the score is a timestamp
UPDATED_TOPICS_SET_KEY = "forums:updated_topics:set"
# JSON data for one updated topic; %s is the topic ID
UPDATED_TOPIC_KEY = "forums:updated_topics:%s"
# JSON data for one post; %s is the post ID
POST_KEY = "forums:post:%s"
# List of the latest post IDs for one forum; %s is the forum ID
FORUM_RSS_KEY = "forums:rss:%s"
# List of the latest post IDs across all forums. This is a literal key name;
# the "*" is not a wildcard.
ALL_FORUMS_RSS_KEY = "forums:rss:*"
# Sorted set that holds the reference count of each cached post
POST_SET_KEY = "forums:post_ref_cnt"

# Module-level logger
logger = logging.getLogger(__name__)
bgneal@522: 
bgneal@509: 
@receiver(post_content_update, dispatch_uid='forums.latest_posts')
def on_post_update(sender, **kwargs):
    """
    Signal handler for post_content_update.

    No work is done here directly. The sender's id is handed to a Celery task
    so that processing happens outside of the request/response cycle:
    new_post_task when kwargs['created'] is true, updated_post_task otherwise.

    """
    if kwargs['created']:
        task = forums.tasks.new_post_task
    else:
        task = forums.tasks.updated_post_task

    task.delay(sender.id)
bgneal@522: 
bgneal@522: 
def process_new_post(post_id):
    """
    Perform all new-post processing for the post with the given ID. This
    function is run on a Celery task.

    If the post cannot be found a warning is logged and nothing else happens.
    Posts in public forums are added to the Redis feeds, post count and
    updated topics list. For every post, topic subscribers are notified and
    the auto-favorite and auto-subscribe actions are run.

    """
    try:
        new_post = Post.objects.select_related().get(pk=post_id)
    except Post.DoesNotExist:
        logger.warning("process_new_post: post %d does not exist", post_id)
        return

    # Only posts that live in a public forum are cached in Redis
    public_ids = Forum.objects.public_forum_ids()
    if new_post.topic.forum.id in public_ids:
        redis_conn = get_redis_connection()
        _update_post_feeds(redis_conn, new_post)
        _update_post_count(redis_conn, public_ids)
        _update_latest_topics(redis_conn, new_post)

    # Email notifications go out first, followed by the auto-favorite and
    # auto-subscribe actions for the new post
    notify_topic_subscribers(new_post, defer=False)
    auto_favorite(new_post)
    auto_subscribe(new_post)
bgneal@522: 
bgneal@522: 
def process_updated_post(post_id):
    """
    Perform all updated-post processing for the post with the given ID. This
    function is run on a Celery task.

    If the post is currently stored in Redis, its serialized content is
    replaced with a fresh copy. Posts that are not in Redis are ignored.

    """
    conn = get_redis_connection()
    key = POST_KEY % post_id

    # Nothing to refresh unless the post is already cached
    if conn.get(key) is None:
        return

    try:
        edited_post = Post.objects.select_related().get(pk=post_id)
    except Post.DoesNotExist:
        logger.warning("process_updated_post: post %d does not exist", post_id)
        return

    conn.set(key, _serialize_post(edited_post))
bgneal@595: 
bgneal@595: 
def _update_post_feeds(conn, post):
    """
    Updates the forum feeds we keep in Redis so that our RSS feeds are quick.

    The serialized post is stored under its own key, its ID is pushed onto the
    list for its forum and onto the combined list, and its reference count is
    set to 2 (one per list). If either list now holds more than MAX_POSTS
    entries, the oldest ID is popped from it and that post's reference count
    is decremented. Posts whose count is no longer positive are removed from
    the reference count set and their serialized data is deleted.

    conn -- the Redis connection to use
    post -- the new Post object

    """
    post_key = POST_KEY % post.id
    post_value = _serialize_post(post)
    forum_key = FORUM_RSS_KEY % post.topic.forum.id

    pipeline = conn.pipeline()
    pipeline.set(post_key, post_value)              # serialized post content
    pipeline.lpush(forum_key, post.id)              # feed for the post's forum
    pipeline.lpush(ALL_FORUMS_RSS_KEY, post.id)     # combined forums feed
    pipeline.zadd(POST_SET_KEY, 2, post.id)         # reference count
    results = pipeline.execute()

    # The replies to the two lpush commands are the new list lengths
    forum_len, combined_len = results[1], results[2]
    if forum_len <= MAX_POSTS and combined_len <= MAX_POSTS:
        return

    # Truncate whichever list(s) grew too long
    pipeline = conn.pipeline()
    if forum_len > MAX_POSTS:
        pipeline.rpop(forum_key)
    if combined_len > MAX_POSTS:
        pipeline.rpop(ALL_FORUMS_RSS_KEY)

    # A None reply means the list was already empty; there is no post to
    # account for in that case.
    popped_ids = [pid for pid in pipeline.execute() if pid is not None]
    if not popped_ids:
        return

    # Decrement reference count(s)
    pipeline = conn.pipeline()
    for pid in popped_ids:
        pipeline.zincrby(POST_SET_KEY, pid, -1)
    scores = pipeline.execute()

    # Clean up every post whose count is zero or below. Testing the scores
    # for truthiness is not enough here: a negative count is truthy, so it
    # would never be cleaned up.
    expired_ids = [pid for pid, score in zip(popped_ids, scores)
                   if score <= 0.0]
    if expired_ids:
        pipeline = conn.pipeline()
        pipeline.zrem(POST_SET_KEY, *expired_ids)
        pipeline.delete(*[POST_KEY % pid for pid in expired_ids])
        pipeline.execute()
bgneal@509: 
bgneal@509: 
def _update_post_count(conn, public_forums):
    """
    Bumps the public post count cached in Redis. The count is cached because
    a COUNT(*) on the post table can be expensive in MySQL InnoDB.

    conn -- the Redis connection to use
    public_forums -- the public forum IDs, used to recompute the count

    """
    new_count = conn.incr(POST_COUNT_KEY)
    if new_count != 1:
        return

    # A count of exactly 1 most likely means Redis lost the old value, so
    # recompute the true count from the database.
    actual = Post.objects.filter(topic__forum__in=public_forums).count()
    conn.set(POST_COUNT_KEY, actual)
bgneal@522: 
bgneal@522: 
def _update_latest_topics(conn, post):
    """
    Updates the "latest topics with new posts" list we cache in Redis for speed.
    There is a template tag and forum view that uses this information.

    The post's topic is serialized to JSON and stored under its own key, and
    the topic ID is added to the updated topics set with the post's creation
    time as the score. If the set then holds more than MAX_UPDATED_TOPICS
    members, the lowest scoring topics are removed from the set and their keys
    are deleted.

    conn -- the Redis connection to use
    post -- the new Post object

    """
    # serialize topic attributes
    topic_id = post.topic.id
    topic_score = int(time.mktime(post.creation_date.timetuple()))

    topic_content = {
        'title': post.topic.name,
        'author': post.user.username,
        'date': topic_score,
        'url': post.topic.get_latest_post_url()
    }
    topic_json = json.dumps(topic_content)
    key = UPDATED_TOPIC_KEY % topic_id

    pipeline = conn.pipeline()
    pipeline.set(key, topic_json)
    pipeline.zadd(UPDATED_TOPICS_SET_KEY, topic_score, topic_id)
    pipeline.zcard(UPDATED_TOPICS_SET_KEY)
    results = pipeline.execute()

    # delete topics beyond our maximum count
    num_topics = results[-1]
    num_to_del = num_topics - MAX_UPDATED_TOPICS
    if num_to_del <= 0:
        return

    # get the IDs of the topics we need to delete first
    stop = num_to_del - 1   # Redis indices are inclusive
    old_ids = conn.zrange(UPDATED_TOPICS_SET_KEY, 0, stop)
    if not old_ids:
        # The set shrank since we counted it. Calling delete / zrem without
        # any names is an error, so just stop here.
        return

    # Remove exactly the topics we looked up, both their data keys and their
    # set entries, in one pipeline. Removing set entries by rank in a separate
    # step could hit different topics if the set changed in between.
    pipeline = conn.pipeline()
    pipeline.delete(*[UPDATED_TOPIC_KEY % n for n in old_ids])
    pipeline.zrem(UPDATED_TOPICS_SET_KEY, *old_ids)
    pipeline.execute()
bgneal@522: 
bgneal@522: 
def get_latest_posts(num_posts=MAX_POSTS, forum_id=None):
    """
    Retrieve up to num_posts of the latest posts from Redis.

    If forum_id is given, posts come from that forum's list; otherwise they
    come from the combined forums list. num_posts is clamped to the range
    0 - MAX_POSTS. A list of dictionaries is returned, one per post, with the
    'pubdate' entry converted back into a datetime object. IDs whose post
    data is no longer in Redis are skipped.

    """
    if forum_id:
        feed_key = FORUM_RSS_KEY % forum_id
    else:
        feed_key = ALL_FORUMS_RSS_KEY

    count = max(0, min(MAX_POSTS, num_posts))
    if count == 0:
        return []

    conn = get_redis_connection()
    ids = conn.lrange(feed_key, 0, count - 1)
    if not ids:
        return []

    serialized = conn.mget([POST_KEY % pid for pid in ids])

    posts = []
    for blob in serialized:
        if blob is None:
            continue

        entry = json.loads(blob)
        # the pubdate was stored as a timestamp; make it a datetime again
        entry['pubdate'] = datetime.datetime.fromtimestamp(entry['pubdate'])
        posts.append(entry)

    return posts
bgneal@522: 
bgneal@522: 
@receiver(topic_content_update, dispatch_uid='forums.latest_posts')
def on_topic_update(sender, **kwargs):
    """
    Signal handler for topic_content_update.

    No work is done here directly. The sender's id is handed to a Celery task
    so that processing happens outside of the request/response cycle:
    new_topic_task when kwargs['created'] is true, updated_topic_task
    otherwise.

    """
    if kwargs['created']:
        task = forums.tasks.new_topic_task
    else:
        task = forums.tasks.updated_topic_task

    task.delay(sender.id)
bgneal@522: 
bgneal@522: 
def process_new_topic(topic_id):
    """
    Perform new topic processing for the topic with the given ID. At present
    this only bumps the public topic count kept in Redis.

    If the topic cannot be found a warning is logged. Topics that are not in
    a public forum are ignored.

    """
    try:
        new_topic = Topic.objects.select_related().get(pk=topic_id)
    except Topic.DoesNotExist:
        logger.warning("process_new_topic: topic %d does not exist", topic_id)
        return

    # Only topics in public forums are counted
    public_ids = Forum.objects.public_forum_ids()
    if new_topic.forum.id in public_ids:
        conn = get_redis_connection()

        if conn.incr(TOPIC_COUNT_KEY) == 1:
            # A count of exactly 1 most likely means Redis lost the old
            # value, so recompute the true count from the database.
            actual = Topic.objects.filter(forum__in=public_ids).count()
            conn.set(TOPIC_COUNT_KEY, actual)
bgneal@522: 
bgneal@522: 
def process_updated_topic(topic_id):
    """
    Perform updated topic processing for the topic with the given ID.

    Only the title is kept in sync: if the topic has data in Redis and its
    stored title differs from the topic's current name, the stored data is
    rewritten with the new title.

    """
    conn = get_redis_connection()
    key = UPDATED_TOPIC_KEY % topic_id

    cached_json = conn.get(key)
    if cached_json is None:
        # we aren't tracking this topic
        return

    try:
        topic = Topic.objects.get(pk=topic_id)
    except Topic.DoesNotExist:
        logger.warning("topic %d does not exist", topic_id)
        return

    info = json.loads(cached_json)
    if topic.name == info['title']:
        return

    info['title'] = topic.name
    conn.set(key, json.dumps(info))
bgneal@595: 
bgneal@595: 
def get_stats():
    """
    This function returns the topic and post count statistics as a tuple, in
    that order. If a statistic is not available, its position in the tuple will
    be None.

    A Redis error is logged and reported as (None, None) rather than raised.

    """
    try:
        conn = get_redis_connection()
        result = conn.mget(TOPIC_COUNT_KEY, POST_COUNT_KEY)
    except redis.RedisError as e:
        # "except ... as" is accepted by Python 2.6+ and Python 3; the older
        # comma form is a syntax error on Python 3.
        logger.error(e)
        return (None, None)

    # A missing key comes back as None from mget
    topic_count = int(result[0]) if result[0] else None
    post_count = int(result[1]) if result[1] else None

    return (topic_count, post_count)
bgneal@522: 
bgneal@522: 
def get_latest_topic_ids(num):
    """
    Return a list of topic ids from the latest topics that have posts. The ids
    will be sorted from newest to oldest.

    num is the maximum number of ids to return; an empty list is returned if
    it is zero or negative. A Redis error is logged and reported as an empty
    list rather than raised.

    """
    # Without this check, num - 1 becomes a negative stop index, which Redis
    # counts from the end of the set; num == 0 would return every topic.
    if num <= 0:
        return []

    try:
        conn = get_redis_connection()
        result = conn.zrevrange(UPDATED_TOPICS_SET_KEY, 0, num - 1)
    except redis.RedisError as e:
        # "except ... as" is accepted by Python 2.6+ and Python 3; the older
        # comma form is a syntax error on Python 3.
        logger.error(e)
        return []

    return [int(n) for n in result]
bgneal@522: 
bgneal@522: 
def get_latest_topics(num):
    """
    Return a list of dictionaries with information about the latest topics that
    have updated posts. The topics are sorted from newest to oldest.

    num is the maximum number of topics to return; an empty list is returned
    if it is zero or negative. Each dictionary's 'date' entry is converted
    back into a datetime object. Topics whose data is missing from Redis are
    skipped. A Redis error is logged and reported as an empty list rather than
    raised.

    """
    # Without this check, num - 1 becomes a negative stop index, which Redis
    # counts from the end of the set; num == 0 would return every topic.
    if num <= 0:
        return []

    try:
        conn = get_redis_connection()
        result = conn.zrevrange(UPDATED_TOPICS_SET_KEY, 0, num - 1)

        topic_keys = [UPDATED_TOPIC_KEY % n for n in result]
        json_list = conn.mget(topic_keys) if topic_keys else []

    except redis.RedisError as e:
        # "except ... as" is accepted by Python 2.6+ and Python 3; the older
        # comma form is a syntax error on Python 3.
        logger.error(e)
        return []

    topics = []
    for s in json_list:
        # mget reports a missing key as None, which json.loads cannot parse
        if s is None:
            continue
        item = json.loads(s)
        item['date'] = datetime.datetime.fromtimestamp(item['date'])
        topics.append(item)

    return topics
bgneal@522: 
bgneal@522: 
def notify_topic_delete(topic):
    """
    This function should be called when a topic is deleted. It will remove the
    topic from the updated topics set, if present, and delete any info we have
    about the topic.

    Note we don't do anything like this for posts. Since they just populate RSS
    feeds we'll let them 404. The updated topic list is seen in a prominent
    template tag however, so it is a bit more important to get that cleaned up.

    A Redis error is logged rather than raised.

    """
    try:
        conn = get_redis_connection()
        pipeline = conn.pipeline()
        pipeline.zrem(UPDATED_TOPICS_SET_KEY, topic.id)
        pipeline.delete(UPDATED_TOPIC_KEY % topic.id)
        pipeline.execute()
    except redis.RedisError as e:
        # "except ... as" is accepted by Python 2.6+ and Python 3; the older
        # comma form is a syntax error on Python 3.
        logger.error(e)
bgneal@522: 
bgneal@522: 
def _serialize_post(post):
    """Build the JSON representation of a post and return it.

    The content is the post's HTML, unless the post has attachments; in that
    case the content is rendered from the forums/post_rss.html template so the
    embeds are included.

    """
    # collect the embeds of any attachments, in attachment order
    attachment_qs = Attachment.objects.filter(post=post).select_related(
            'embed').order_by('order')
    embeds = [attachment.embed for attachment in attachment_qs]

    if embeds:
        content = render_to_string('forums/post_rss.html', {
            'post': post,
            'embeds': embeds,
        })
    else:
        content = post.html

    # the publication date is stored as an integer timestamp
    return json.dumps({
        'id': post.id,
        'title': post.topic.name,
        'content': content,
        'author': post.user.username,
        'pubdate': int(time.mktime(post.creation_date.timetuple())),
        'forum_name': post.topic.forum.name,
        'url': post.get_absolute_url()
    })
bgneal@595: 
bgneal@595: 
bgneal@522: # Down here to avoid a circular import
bgneal@522: import forums.tasks