comparison gpp/forums/latest.py @ 522:82b97697312e

Created Celery tasks to process new posts and topics. Keep the updated topic set in Redis. This is for tickets #194, #237, #239.
author Brian Neal <bgneal@gmail.com>
date Sun, 18 Dec 2011 23:46:52 +0000
parents 248dd8dd67f8
children e9c446a64423
comparison
equal deleted inserted replaced
521:dd14ab08a9c4 522:82b97697312e
3 needed by RSS feeds, "latest posts" template tags, etc. This module listens for 3 needed by RSS feeds, "latest posts" template tags, etc. This module listens for
4 the post_content_update signal, then bundles the post up and stores it by forum 4 the post_content_update signal, then bundles the post up and stores it by forum
5 ID in Redis. We also maintain a combined forums list. This allows quick 5 ID in Redis. We also maintain a combined forums list. This allows quick
6 retrieval of the latest posts and avoids some slow SQL queries. 6 retrieval of the latest posts and avoids some slow SQL queries.
7 7
8 We also do things like send topic notification emails, auto-favorite, and
9 auto-subscribe functions here rather than bog the user down in the request /
10 response cycle.
11
8 """ 12 """
9 import datetime 13 import datetime
14 import logging
10 import time 15 import time
11 16
12 from django.dispatch import receiver 17 from django.dispatch import receiver
13 from django.utils import simplejson 18 from django.utils import simplejson
14 19
15 from forums.signals import post_content_update 20 from forums.signals import post_content_update, topic_content_update
16 from forums.models import Forum 21 from forums.models import Forum, Topic, Post
22 from forums.views.subscriptions import notify_topic_subscribers
23 from forums.tools import auto_favorite, auto_subscribe
17 from core.services import get_redis_connection 24 from core.services import get_redis_connection
18
19 25
20 # This constant controls how many latest posts per forum we store 26 # This constant controls how many latest posts per forum we store
21 MAX_POSTS = 50 27 MAX_POSTS = 50
22 28
29 # This controls how many updated topics we track
30 MAX_UPDATED_TOPICS = 50
31
32 # Redis key names:
33 POST_COUNT_KEY = "forums:public_post_count"
34 TOPIC_COUNT_KEY = "forums:public_topic_count"
35 UPDATED_TOPICS_SET_KEY = "forums:updated_topics:set"
36 UPDATED_TOPIC_KEY = "forums:updated_topics:%s"
37
38 logger = logging.getLogger(__name__)
39
23 40
@receiver(post_content_update, dispatch_uid='forums.latest_posts')
def on_post_update(sender, **kwargs):
    """
    Signal handler invoked whenever a post is created or edited.

    Edits are ignored; for brand-new posts we hand the heavy lifting off to
    a Celery task so the request/response cycle stays fast.

    """
    if kwargs['created']:
        # Defer all new-post processing to the task queue.
        forums.tasks.new_post_task.delay(sender.id)
57
58
def process_new_post(post_id):
    """
    Celery task body: carry out all processing for a newly created post.

    The post is looked up by primary key; if it has vanished in the meantime
    a warning is logged and nothing else happens.

    """
    try:
        post = Post.objects.select_related().get(pk=post_id)
    except Post.DoesNotExist:
        logger.warning("process_new_post: post %d does not exist", post_id)
        return

    # The Redis-backed feeds and statistics only track public forums.
    visible_forums = Forum.objects.public_forum_ids()
    if post.topic.forum.id in visible_forums:
        conn = get_redis_connection()
        _update_post_feeds(conn, post)
        _update_post_count(conn, visible_forums)
        _update_latest_topics(conn, post)

    # Email topic subscribers now; we are already off the request path.
    notify_topic_subscribers(post, defer=False)

    # Apply any auto-favorite / auto-subscribe rules for this new post.
    auto_favorite(post)
    auto_subscribe(post)
85
86
87 def _update_post_feeds(redis, post):
88 """
89 Updates the forum feeds we keep in Redis so that our RSS feeds are quick.
90
91 """
46 # serialize post attributes 92 # serialize post attributes
47 post_content = { 93 post_content = {
48 'id': sender.id, 94 'id': post.id,
49 'title': sender.topic.name, 95 'title': post.topic.name,
50 'content': sender.html, 96 'content': post.html,
51 'author': sender.user.username, 97 'author': post.user.username,
52 'pubdate': int(time.mktime(sender.creation_date.timetuple())), 98 'pubdate': int(time.mktime(post.creation_date.timetuple())),
53 'forum_name': sender.topic.forum.name, 99 'forum_name': post.topic.forum.name,
54 'url': sender.get_absolute_url() 100 'url': post.get_absolute_url()
55 } 101 }
56 102
57 s = simplejson.dumps(post_content) 103 s = simplejson.dumps(post_content)
58 104
59 # store in Redis 105 # store in Redis
60 106
61 redis = get_redis_connection()
62 pipeline = redis.pipeline() 107 pipeline = redis.pipeline()
63 108
64 key = 'forums:latest:%d' % sender.topic.forum.id 109 key = 'forums:latest:%d' % post.topic.forum.id
65 110
66 pipeline.lpush(key, s) 111 pipeline.lpush(key, s)
67 pipeline.ltrim(key, 0, MAX_POSTS - 1) 112 pipeline.ltrim(key, 0, MAX_POSTS - 1)
68 113
69 # store in the combined feed; yes this wastes some memory storing it twice, 114 # store in the combined feed; yes this wastes some memory storing it twice,
73 118
74 pipeline.lpush(key, s) 119 pipeline.lpush(key, s)
75 pipeline.ltrim(key, 0, MAX_POSTS - 1) 120 pipeline.ltrim(key, 0, MAX_POSTS - 1)
76 121
77 pipeline.execute() 122 pipeline.execute()
123
124
def _update_post_count(redis, public_forums):
    """
    Bump the cached public post count in Redis.

    We keep a counter because COUNT(*) on the post table can be expensive
    under MySQL InnoDB. If INCR returns 1 the key was missing (e.g. Redis
    was flushed), so the true value is rebuilt from the database.

    """
    new_value = redis.incr(POST_COUNT_KEY)
    if new_value != 1:
        return

    # Key was absent; recompute the real count and store it.
    actual = Post.objects.filter(topic__forum__in=public_forums).count()
    redis.set(POST_COUNT_KEY, actual)
138
def _update_latest_topics(redis, post):
    """
    Refresh the Redis-cached "latest topics with new posts" data.

    A sorted set (scored by the post's creation timestamp) holds the topic
    ids, and each topic's display data lives under its own key. A template
    tag and a forum view read this cache.

    """
    topic_id = post.topic.id
    score = int(time.mktime(post.creation_date.timetuple()))

    payload = simplejson.dumps({
        'title': post.topic.name,
        'author': post.user.username,
        'date': score,
        'url': post.get_absolute_url()
    })

    # Write the topic data, (re)score it in the sorted set, and read back the
    # set's cardinality, all in one round trip.
    pipe = redis.pipeline()
    pipe.set(UPDATED_TOPIC_KEY % topic_id, payload)
    pipe.zadd(UPDATED_TOPICS_SET_KEY, score, topic_id)
    pipe.zcard(UPDATED_TOPICS_SET_KEY)
    total = pipe.execute()[-1]

    # Trim back down to MAX_UPDATED_TOPICS, dropping the oldest entries.
    excess = total - MAX_UPDATED_TOPICS
    if excess > 0:
        # Fetch the ids of the topics being evicted so their per-topic data
        # keys can be deleted too (Redis range indices are inclusive).
        stale_ids = redis.zrange(UPDATED_TOPICS_SET_KEY, 0, excess - 1)
        redis.delete(*[UPDATED_TOPIC_KEY % n for n in stale_ids])

        # Finally remove the oldest entries from the sorted set itself.
        redis.zremrangebyrank(UPDATED_TOPICS_SET_KEY, 0, excess - 1)
78 178
79 179
80 def get_latest_posts(num_posts=MAX_POSTS, forum_id=None): 180 def get_latest_posts(num_posts=MAX_POSTS, forum_id=None):
81 """ 181 """
82 This function retrieves num_posts latest posts for the forum with the given 182 This function retrieves num_posts latest posts for the forum with the given
103 post['pubdate'] = datetime.datetime.fromtimestamp(post['pubdate']) 203 post['pubdate'] = datetime.datetime.fromtimestamp(post['pubdate'])
104 204
105 posts.append(post) 205 posts.append(post)
106 206
107 return posts 207 return posts
208
209
@receiver(topic_content_update, dispatch_uid='forums.latest_posts')
def on_topic_update(sender, **kwargs):
    """
    This function is our signal handler, called when a topic has been updated.
    We only care about newly created topics, and ignore updates.

    We kick off a Celery task to perform work outside of the request/response
    cycle.

    """
    # NOTE(review): dispatch_uid above is identical to on_post_update's;
    # looks like a copy-paste — confirm nothing disconnects receivers by
    # this uid before renaming it (e.g. to 'forums.latest_topics').

    # ignore non-new topics
    if not kwargs['created']:
        return

    # Kick off a Celery task to process this new topic
    forums.tasks.new_topic_task.delay(sender.id)
226
227
def process_new_topic(topic_id):
    """
    Celery task body: handle a newly created topic.

    At present the only work is maintaining the cached public topic count;
    topics in non-public forums are ignored.

    """
    try:
        topic = Topic.objects.select_related().get(pk=topic_id)
    except Topic.DoesNotExist:
        logger.warning("process_new_topic: topic %d does not exist", topic_id)
        return

    # Private forums don't feed the public statistics.
    visible_forums = Forum.objects.public_forum_ids()
    if topic.forum.id not in visible_forums:
        return

    # Bump the cached topic count; INCR returning 1 means the key was
    # missing (Redis flushed/restarted), so rebuild the true value.
    conn = get_redis_connection()
    if conn.incr(TOPIC_COUNT_KEY) == 1:
        true_count = Topic.objects.filter(forum__in=visible_forums).count()
        conn.set(TOPIC_COUNT_KEY, true_count)
255
256
def get_stats():
    """
    This function returns the topic and post count statistics as a tuple, in
    that order. If a statistic is not available, its position in the tuple will
    be None.

    """
    try:
        redis = get_redis_connection()
        result = redis.mget(TOPIC_COUNT_KEY, POST_COUNT_KEY)
    # NOTE(review): `redis` is the local connection object, not the redis
    # module — if get_redis_connection() itself raises, `redis` is unbound
    # and this except clause raises NameError; also confirm the client
    # object actually exposes a RedisError attribute.
    except redis.RedisError, e:
        logger.error(e)
        return (None, None)

    # A missing/empty value means the counter has never been set.
    topic_count = int(result[0]) if result[0] else None
    post_count = int(result[1]) if result[1] else None

    return (topic_count, post_count)
275
276
def get_latest_topic_ids(num):
    """
    Return a list of topic ids from the latest topics that have posts. The ids
    will be sorted from newest to oldest.

    """
    try:
        redis = get_redis_connection()
        # highest scores (newest timestamps) first
        result = redis.zrevrange(UPDATED_TOPICS_SET_KEY, 0, num - 1)
    # NOTE(review): `redis` is the local connection object, not the redis
    # module — see get_stats(); verify this except clause works at runtime.
    except redis.RedisError, e:
        logger.error(e)
        return []

    # zrevrange returns strings; callers expect integer ids
    return [int(n) for n in result]
291
292
293 def get_latest_topics(num):
294 """
295 Return a list of dictionaries with information about the latest topics that
296 have updated posts. The topics are sorted from newest to oldest.
297
298 """
299 try:
300 redis = get_redis_connection()
301 result = redis.zrevrange(UPDATED_TOPICS_SET_KEY, 0, num - 1)
302
303 topic_keys = [UPDATED_TOPIC_KEY % n for n in result]
304 json_list = redis.mget(topic_keys)
305
306 except redis.RedisError, e:
307 logger.error(e)
308 return []
309
310 topics = []
311 for s in json_list:
312 item = simplejson.loads(s)
313 item['date'] = datetime.datetime.fromtimestamp(item['date'])
314 topics.append(item)
315
316 return topics
317
318
def notify_topic_delete(topic):
    """
    This function should be called when a topic is deleted. It will remove the
    topic from the updated topics set, if present, and delete any info we have
    about the topic.

    Note we don't do anything like this for posts. Since they just populate RSS
    feeds we'll let them 404. The updated topic list is seen in a prominent
    template tag however, so it is a bit more important to get that cleaned up.

    """
    try:
        redis = get_redis_connection()
        # Remove both the sorted-set membership and the per-topic data key
        # in a single round trip.
        pipeline = redis.pipeline()
        pipeline.zrem(UPDATED_TOPICS_SET_KEY, topic.id)
        pipeline.delete(UPDATED_TOPIC_KEY % topic.id)
        pipeline.execute()
    # NOTE(review): `redis` is the local connection object, not the redis
    # module — see get_stats(); verify this except clause works at runtime.
    except redis.RedisError, e:
        logger.error(e)
338
339
340 # Down here to avoid a circular import
341 import forums.tasks