comparison forums/latest.py @ 595:f3fded5df64b

Forum topic & post updates now affect the RSS feed data in Redis. This is for bitbucket issue #10.
author Brian Neal <bgneal@gmail.com>
date Thu, 24 May 2012 15:49:15 -0500
parents 2469d5864249
children 89b240fe9297
...
 We also do things like send topic notification emails, auto-favorite, and
 auto-subscribe functions here rather than bog the user down in the request /
 response cycle.

 """
+# Maintenance notes:
+# How we use Redis in this module:
+#
+# Forum post processing:
+#
+# * Forum posts are turned into Python dictionaries, then converted to JSON
+#   and stored under keys of the form: forums:post:id
+# * Each forum has a list in Redis stored under a key of the form:
+#   forums:rss:id. This is a list of post IDs.
+# * There is also a key called forums:rss:* which holds the combined latest
+#   feed. It is also a list of post IDs.
+# * A sorted set is maintained that keeps track of the reference count for
+#   each post. When a new post is created, this reference count is 2 because
+#   the post is stored in both the combined list and the parent forum's list.
+#   This sorted set is stored under the key: forums:post_ref_cnt.
+# * When a post falls off a list due to aging, the reference count in the
+#   sorted set is decremented. If it falls to zero, the post's key is deleted
+#   from Redis.
+# * When a post is edited, and it is in Redis, we simply update the JSON
+#   content.
+# * When a post is deleted, and it is in Redis, it is removed from the two
+#   lists and the sorted set, and its key is deleted from Redis.
+# * When the RSS feed wants to update, it simply pulls down the entire list of
+#   post IDs for the feed of interest, then does an MGET on all the posts.
+#
+# Topics with recent posts processing:
+#
+# * A key is created for each topic that is updated.
+# * A sorted set of topics is maintained with the current time as the score.
+# * An updated topic gets its score bumped.
+# * We only allow MAX_UPDATED_TOPICS topics in the set. We sort the set by
+#   score, and expired topics are removed from the set and their keys are
+#   deleted from Redis.
+# * The template tag (or any other caller) that wants the list of topics with
+#   new posts gets the list of IDs sorted by score from newest to oldest. An
+#   MGET is then performed to get all the topic data, which is deserialized
+#   from JSON.
+#
+# We also maintain topic and post counts in Redis, since SELECT COUNT(*) can
+# take a while with MySQL InnoDB.
+#
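
For orientation, here is a minimal sketch (not part of the changeset) of how the keys described above can be inspected from a plain redis-py client. The literal key names mirror the POST_KEY, FORUM_RSS_KEY, ALL_FORUMS_RSS_KEY, and POST_SET_KEY constants added below; the IDs are hypothetical:

    import redis

    conn = redis.StrictRedis()                   # assumes a local Redis server
    post_id = 42                                 # hypothetical post ID
    conn.get('forums:post:%s' % post_id)         # JSON blob for one post
    conn.lrange('forums:rss:3', 0, -1)           # post IDs in forum 3's feed
    conn.lrange('forums:rss:*', 0, -1)           # post IDs in the combined feed
    conn.zscore('forums:post_ref_cnt', post_id)  # 2.0 while on both lists
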
 import datetime
 import logging
 import time

 from django.dispatch import receiver
...
 # Redis key names:
 POST_COUNT_KEY = "forums:public_post_count"
 TOPIC_COUNT_KEY = "forums:public_topic_count"
 UPDATED_TOPICS_SET_KEY = "forums:updated_topics:set"
 UPDATED_TOPIC_KEY = "forums:updated_topics:%s"
+POST_KEY = "forums:post:%s"
+FORUM_RSS_KEY = "forums:rss:%s"
+ALL_FORUMS_RSS_KEY = "forums:rss:*"
+POST_SET_KEY = "forums:post_ref_cnt"

 logger = logging.getLogger(__name__)


 @receiver(post_content_update, dispatch_uid='forums.latest_posts')
 def on_post_update(sender, **kwargs):
     """
-    This function is our signal handler, called when a post has been updated.
-    We only care about newly created posts, and ignore updates.
+    This function is our signal handler, called when a post has been updated
+    or created.

     We kick off a Celery task to perform work outside of the request/response
     cycle.

     """
-    # ignore non-new posts
-    if not kwargs['created']:
-        return
-
-    # Kick off a Celery task to process this new post
-    forums.tasks.new_post_task.delay(sender.id)
+    if kwargs['created']:
+        forums.tasks.new_post_task.delay(sender.id)
+    else:
+        forums.tasks.updated_post_task.delay(sender.id)


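For reference, the signal contract this handler assumes: `sender` is the Post instance itself, and a boolean `created` kwarg distinguishes creation from edit. A hypothetical send site (not part of this changeset) would look like:

    # Hypothetical: wherever the Post save logic fires the signal.
    post_content_update.send(sender=post, created=created)
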
 def process_new_post(post_id):
     """
     This function is run on a Celery task. It performs all new-post processing.
...
     # perform any auto-favorite and auto-subscribe actions for the new post
     auto_favorite(post)
     auto_subscribe(post)


+def process_updated_post(post_id):
+    """
+    This function is run on a Celery task. It performs all updated-post
+    processing.
+
+    """
+    # Is this post ID in an RSS feed?
+    conn = get_redis_connection()
+    post_key = POST_KEY % post_id
+    post_val = conn.get(post_key)
+
+    if post_val is not None:
+        # Update the post's value in Redis
+        try:
+            post = Post.objects.select_related().get(pk=post_id)
+        except Post.DoesNotExist:
+            logger.warning("process_updated_post: post %d does not exist", post_id)
+            return
+        conn.set(post_key, _serialize_post(post))
+
+
 def _update_post_feeds(conn, post):
     """
     Updates the forum feeds we keep in Redis so that our RSS feeds are quick.
+
+    """
+    post_key = POST_KEY % post.id
+    post_value = _serialize_post(post)
+
+    pipeline = conn.pipeline()
+
+    # Store serialized post content under its own key
+    pipeline.set(post_key, post_value)
+
+    # Store in the RSS feed for the post's forum
+    forum_key = FORUM_RSS_KEY % post.topic.forum.id
+    pipeline.lpush(forum_key, post.id)
+
+    # Store in the RSS feed for the combined forums
+    pipeline.lpush(ALL_FORUMS_RSS_KEY, post.id)
+
+    # Store the reference count for the post
+    pipeline.zadd(POST_SET_KEY, 2, post.id)
+
+    results = pipeline.execute()
+
+    # Make sure our forum RSS list lengths are not exceeded;
+    # results[1] and results[2] are the new list lengths returned by LPUSH.
+    if results[1] > MAX_POSTS or results[2] > MAX_POSTS:
+        pipeline = conn.pipeline()
+
+        # Truncate the lists of posts:
+        if results[1] > MAX_POSTS:
+            pipeline.rpop(forum_key)
+        if results[2] > MAX_POSTS:
+            pipeline.rpop(ALL_FORUMS_RSS_KEY)
+        post_ids = pipeline.execute()
+
+        # Decrement reference count(s)
+        pipeline = conn.pipeline()
+        for post_id in post_ids:
+            pipeline.zincrby(POST_SET_KEY, post_id, -1)
+        scores = pipeline.execute()
+
+        # If any reference counts have fallen to 0, clean up:
+        if not all(scores):
+            pipeline = conn.pipeline()
+
+            # remove from the post reference-count set
+            ids = [post_ids[n] for n, s in enumerate(scores) if s <= 0.0]
+            pipeline.zrem(POST_SET_KEY, *ids)
+
+            # remove serialized post data
+            keys = [POST_KEY % n for n in ids]
+            pipeline.delete(*keys)
+
+            pipeline.execute()
+
+
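The trimming logic above leans on pipeline replies coming back in call order: results[0] is the SET reply, while results[1] and results[2] are the new list lengths returned by the two LPUSH calls. A standalone sketch of that property, with hypothetical keys:

    import redis

    conn = redis.StrictRedis()
    pipe = conn.pipeline()
    pipe.set('k', 'v')        # reply 0: True
    pipe.lpush('mylist', 1)   # reply 1: new length of mylist
    pipe.lpush('mylist', 2)   # reply 2: new length of mylist
    results = pipe.execute()  # replies arrive in call order
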
+def _update_post_count(conn, public_forums):
+    """
+    Updates the post count we cache in Redis. Doing a COUNT(*) on the post table
+    can be expensive in MySQL InnoDB.
+
+    """
+    result = conn.incr(POST_COUNT_KEY)
+    if result == 1:
+        # it is likely redis got trashed, so re-compute the correct value
+
+        count = Post.objects.filter(topic__forum__in=public_forums).count()
+        conn.set(POST_COUNT_KEY, count)
+
+
+def _update_latest_topics(conn, post):
+    """
+    Updates the "latest topics with new posts" list we cache in Redis for
+    speed. There is a template tag and forum view that use this information.
+
+    """
+    # serialize topic attributes
+    topic_id = post.topic.id
+    topic_score = int(time.mktime(post.creation_date.timetuple()))
+
+    topic_content = {
+        'title': post.topic.name,
+        'author': post.user.username,
+        'date': topic_score,
+        'url': post.topic.get_latest_post_url()
+    }
+    json = simplejson.dumps(topic_content)
+    key = UPDATED_TOPIC_KEY % topic_id
+
+    pipeline = conn.pipeline()
+    pipeline.set(key, json)
+    pipeline.zadd(UPDATED_TOPICS_SET_KEY, topic_score, topic_id)
+    pipeline.zcard(UPDATED_TOPICS_SET_KEY)
+    results = pipeline.execute()
+
+    # delete topics beyond our maximum count
+    num_topics = results[-1]
+    num_to_del = num_topics - MAX_UPDATED_TOPICS
+    if num_to_del > 0:
+        # get the IDs of the topics we need to delete first
+        start = 0
+        stop = num_to_del - 1   # Redis indices are inclusive
+        old_ids = conn.zrange(UPDATED_TOPICS_SET_KEY, start, stop)
+
+        keys = [UPDATED_TOPIC_KEY % n for n in old_ids]
+        conn.delete(*keys)
+
+        # now delete the oldest num_to_del topics
+        conn.zremrangebyrank(UPDATED_TOPICS_SET_KEY, start, stop)
+
+
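One subtlety worth calling out: Redis range indices are inclusive, so zrange(key, 0, n - 1) returns exactly the n lowest-scored (oldest) members, matching the zremrangebyrank call. A quick illustrative check, with a hypothetical key and the score-first zadd argument order this module uses:

    # After adding members scored 1..5, the two oldest are fetched and trimmed.
    for i in range(1, 6):
        conn.zadd('demo:set', i, 'topic-%d' % i)   # redis-py 2.x arg order
    conn.zrange('demo:set', 0, 1)                  # ['topic-1', 'topic-2']
    conn.zremrangebyrank('demo:set', 0, 1)         # removes those two
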
+def get_latest_posts(num_posts=MAX_POSTS, forum_id=None):
+    """
+    This function retrieves the num_posts latest posts for the forum with the
+    given forum_id. If forum_id is None, the posts are retrieved from the
+    combined forums datastore. A list of dictionaries is returned. Each
+    dictionary contains information about a post.
+
+    """
+    key = FORUM_RSS_KEY % forum_id if forum_id else ALL_FORUMS_RSS_KEY
+
+    num_posts = max(0, min(MAX_POSTS, num_posts))
+
+    if num_posts == 0:
+        return []
+
+    conn = get_redis_connection()
+    post_ids = conn.lrange(key, 0, num_posts - 1)
+    if not post_ids:
+        return []
+
+    post_keys = [POST_KEY % n for n in post_ids]
+    raw_posts = conn.mget(post_keys)
+    raw_posts = [s for s in raw_posts if s is not None]
+
+    posts = []
+    for raw_post in raw_posts:
+        post = simplejson.loads(raw_post)
+
+        # fix up the pubdate; turn it back into a datetime object
+        post['pubdate'] = datetime.datetime.fromtimestamp(post['pubdate'])
+
+        posts.append(post)
+
+    return posts
+
+
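A hedged usage sketch for the function above. The 'pubdate', 'forum_name', and 'url' fields are ones _serialize_post stores (see below); the forum ID and count here are illustrative:

    for post in get_latest_posts(num_posts=10, forum_id=3):
        # post['pubdate'] is already a datetime at this point
        print('%s | %s | %s' % (post['pubdate'], post['forum_name'], post['url']))
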
+@receiver(topic_content_update, dispatch_uid='forums.latest_posts')
+def on_topic_update(sender, **kwargs):
+    """
+    This function is our signal handler, called when a topic has been updated
+    or created.
+
+    We kick off a Celery task to perform work outside of the request/response
+    cycle.
+
+    """
+    if kwargs['created']:
+        forums.tasks.new_topic_task.delay(sender.id)
+    else:
+        forums.tasks.updated_topic_task.delay(sender.id)
+
+
+def process_new_topic(topic_id):
+    """
+    This function contains new-topic processing. Currently we only update the
+    topic count statistic.
+
+    """
+    try:
+        topic = Topic.objects.select_related().get(pk=topic_id)
+    except Topic.DoesNotExist:
+        logger.warning("process_new_topic: topic %d does not exist", topic_id)
+        return
+
+    # don't process topics from non-public forums
+    public_forums = Forum.objects.public_forum_ids()
+
+    if topic.forum.id not in public_forums:
+        return
+
+    # update the topic count statistic
+    conn = get_redis_connection()
+
+    result = conn.incr(TOPIC_COUNT_KEY)
+    if result == 1:
+        # it is likely redis got trashed, so re-compute the correct value
+
+        count = Topic.objects.filter(forum__in=public_forums).count()
+        conn.set(TOPIC_COUNT_KEY, count)
+
+
+def process_updated_topic(topic_id):
+    """
+    This function contains updated-topic processing. We update the title only.
+
+    """
+    conn = get_redis_connection()
+    key = UPDATED_TOPIC_KEY % topic_id
+    json = conn.get(key)
+    if json is not None:
+        try:
+            topic = Topic.objects.get(pk=topic_id)
+        except Topic.DoesNotExist:
+            logger.warning("topic %d does not exist", topic_id)
+            return
+
+        topic_dict = simplejson.loads(json)
+
+        if topic.name != topic_dict['title']:
+            topic_dict['title'] = topic.name
+            json = simplejson.dumps(topic_dict)
+            conn.set(key, json)
+
+
+def get_stats():
+    """
+    This function returns the topic and post count statistics as a tuple, in
+    that order. If a statistic is not available, its position in the tuple will
+    be None.
+
+    """
+    try:
+        conn = get_redis_connection()
+        result = conn.mget(TOPIC_COUNT_KEY, POST_COUNT_KEY)
+    except redis.RedisError, e:
+        logger.error(e)
+        return (None, None)
+
+    topic_count = int(result[0]) if result[0] else None
+    post_count = int(result[1]) if result[1] else None
+
+    return (topic_count, post_count)
+
+
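Callers of get_stats() are expected to cope with the None placeholders; a small illustrative sketch:

    topic_count, post_count = get_stats()
    if topic_count is None or post_count is None:
        stats = 'Forum statistics are currently unavailable.'
    else:
        stats = '%d topics, %d posts' % (topic_count, post_count)
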
+def get_latest_topic_ids(num):
+    """
+    Return a list of topic IDs for the latest topics that have new posts. The
+    IDs will be sorted from newest to oldest.
+
+    """
+    try:
+        conn = get_redis_connection()
+        result = conn.zrevrange(UPDATED_TOPICS_SET_KEY, 0, num - 1)
+    except redis.RedisError, e:
+        logger.error(e)
+        return []
+
+    return [int(n) for n in result]
+
+
+def get_latest_topics(num):
+    """
+    Return a list of dictionaries with information about the latest topics that
+    have updated posts. The topics are sorted from newest to oldest.
+
+    """
+    try:
+        conn = get_redis_connection()
+        result = conn.zrevrange(UPDATED_TOPICS_SET_KEY, 0, num - 1)
+
+        topic_keys = [UPDATED_TOPIC_KEY % n for n in result]
+        json_list = conn.mget(topic_keys) if topic_keys else []
+
+    except redis.RedisError, e:
+        logger.error(e)
+        return []
+
+    topics = []
+    for s in json_list:
+        item = simplejson.loads(s)
+        item['date'] = datetime.datetime.fromtimestamp(item['date'])
+        topics.append(item)
+
+    return topics
+
+
+def notify_topic_delete(topic):
+    """
+    This function should be called when a topic is deleted. It will remove the
+    topic from the updated-topics set, if present, and delete any info we have
+    about the topic.
+
+    Note we don't do anything like this for posts. Since they just populate
+    RSS feeds, we'll let them 404. The updated-topics list is shown by a
+    prominent template tag, however, so it is a bit more important to get it
+    cleaned up.
+
+    """
+    try:
+        conn = get_redis_connection()
+        pipeline = conn.pipeline()
+        pipeline.zrem(UPDATED_TOPICS_SET_KEY, topic.id)
+        pipeline.delete(UPDATED_TOPIC_KEY % topic.id)
+        pipeline.execute()
+    except redis.RedisError, e:
+        logger.error(e)
+
+
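Since notify_topic_delete() is called explicitly rather than wired to a signal, a hypothetical call site (not part of this changeset) might be a delete view that notifies Redis before removing the row:

    def delete_topic(topic):
        notify_topic_delete(topic)   # clean up Redis first
        topic.delete()               # then remove the database row
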
+def _serialize_post(post):
+    """Serialize a post to JSON and return it.

     """
     # get any attachments for the post

     attachments = Attachment.objects.filter(post=post).select_related(
...
         'pubdate': int(time.mktime(post.creation_date.timetuple())),
         'forum_name': post.topic.forum.name,
         'url': post.get_absolute_url()
     }

-    s = simplejson.dumps(post_content)
+    return simplejson.dumps(post_content)
-
-    # store in Redis
-
-    pipeline = conn.pipeline()
-
-    key = 'forums:latest:%d' % post.topic.forum.id
-
-    pipeline.lpush(key, s)
-    pipeline.ltrim(key, 0, MAX_POSTS - 1)
-
-    # store in the combined feed; yes this wastes some memory storing it twice,
-    # but it makes things much easier
-
-    key = 'forums:latest:*'
-
-    pipeline.lpush(key, s)
-    pipeline.ltrim(key, 0, MAX_POSTS - 1)
-
-    pipeline.execute()
-
-
-def _update_post_count(conn, public_forums):
-    """
-    Updates the post count we cache in Redis. Doing a COUNT(*) on the post table
-    can be expensive in MySQL InnoDB.
-
-    """
-    result = conn.incr(POST_COUNT_KEY)
-    if result == 1:
-        # it is likely redis got trashed, so re-compute the correct value
-
-        count = Post.objects.filter(topic__forum__in=public_forums).count()
-        conn.set(POST_COUNT_KEY, count)
-
-
-def _update_latest_topics(conn, post):
-    """
-    Updates the "latest topics with new posts" list we cache in Redis for speed.
-    There is a template tag and forum view that uses this information.
-
-    """
-    # serialize topic attributes
-    topic_id = post.topic.id
-    topic_score = int(time.mktime(post.creation_date.timetuple()))
-
-    topic_content = {
-        'title': post.topic.name,
-        'author': post.user.username,
-        'date': topic_score,
-        'url': post.topic.get_latest_post_url()
-    }
-    json = simplejson.dumps(topic_content)
-    key = UPDATED_TOPIC_KEY % topic_id
-
-    pipeline = conn.pipeline()
-    pipeline.set(key, json)
-    pipeline.zadd(UPDATED_TOPICS_SET_KEY, topic_score, topic_id)
-    pipeline.zcard(UPDATED_TOPICS_SET_KEY)
-    results = pipeline.execute()
-
-    # delete topics beyond our maximum count
-    num_topics = results[-1]
-    num_to_del = num_topics - MAX_UPDATED_TOPICS
-    if num_to_del > 0:
-        # get the IDs of the topics we need to delete first
-        start = 0
-        stop = num_to_del - 1   # Redis indices are inclusive
-        old_ids = conn.zrange(UPDATED_TOPICS_SET_KEY, start, stop)
-
-        keys = [UPDATED_TOPIC_KEY % n for n in old_ids]
-        conn.delete(*keys)
-
-        # now delete the oldest num_to_del topics
-        conn.zremrangebyrank(UPDATED_TOPICS_SET_KEY, start, stop)
-
-
-def get_latest_posts(num_posts=MAX_POSTS, forum_id=None):
-    """
-    This function retrieves num_posts latest posts for the forum with the given
-    forum_id. If forum_id is None, the posts are retrieved from the combined
-    forums datastore. A list of dictionaries is returned. Each dictionary
-    contains information about a post.
-
-    """
-    key = 'forums:latest:%d' % forum_id if forum_id else 'forums:latest:*'
-
-    num_posts = max(0, min(MAX_POSTS, num_posts))
-
-    if num_posts == 0:
-        return []
-
-    conn = get_redis_connection()
-    raw_posts = conn.lrange(key, 0, num_posts - 1)
-
-    posts = []
-    for raw_post in raw_posts:
-        post = simplejson.loads(raw_post)
-
-        # fix up the pubdate; turn it back into a datetime object
-        post['pubdate'] = datetime.datetime.fromtimestamp(post['pubdate'])
-
-        posts.append(post)
-
-    return posts
-
-
-@receiver(topic_content_update, dispatch_uid='forums.latest_posts')
-def on_topic_update(sender, **kwargs):
-    """
-    This function is our signal handler, called when a topic has been updated.
-    We only care about newly created topics, and ignore updates.
-
-    We kick off a Celery task to perform work outside of the request/response
-    cycle.
-
-    """
-    # ignore non-new topics
-    if not kwargs['created']:
-        return
-
-    # Kick off a Celery task to process this new post
-    forums.tasks.new_topic_task.delay(sender.id)
-
-
-def process_new_topic(topic_id):
-    """
-    This function contains new topic processing. Currently we only update the
-    topic count statistic.
-
-    """
-    try:
-        topic = Topic.objects.select_related().get(pk=topic_id)
-    except Topic.DoesNotExist:
-        logger.warning("process_new_topic: topic %d does not exist", topic_id)
-        return
-
-    # selectively process topics from non-public forums
-    public_forums = Forum.objects.public_forum_ids()
-
-    if topic.forum.id not in public_forums:
-        return
-
-    # update the topic count statistic
-    conn = get_redis_connection()
-
-    result = conn.incr(TOPIC_COUNT_KEY)
-    if result == 1:
-        # it is likely redis got trashed, so re-compute the correct value
-
-        count = Topic.objects.filter(forum__in=public_forums).count()
-        conn.set(TOPIC_COUNT_KEY, count)
-
-
-def get_stats():
-    """
-    This function returns the topic and post count statistics as a tuple, in
-    that order. If a statistic is not available, its position in the tuple will
-    be None.
-
-    """
-    try:
-        conn = get_redis_connection()
-        result = conn.mget(TOPIC_COUNT_KEY, POST_COUNT_KEY)
-    except redis.RedisError, e:
-        logger.error(e)
-        return (None, None)
-
-    topic_count = int(result[0]) if result[0] else None
-    post_count = int(result[1]) if result[1] else None
-
-    return (topic_count, post_count)
-
-
-def get_latest_topic_ids(num):
-    """
-    Return a list of topic ids from the latest topics that have posts. The ids
-    will be sorted from newest to oldest.
-
-    """
-    try:
-        conn = get_redis_connection()
-        result = conn.zrevrange(UPDATED_TOPICS_SET_KEY, 0, num - 1)
-    except redis.RedisError, e:
-        logger.error(e)
-        return []
-
-    return [int(n) for n in result]
-
-
-def get_latest_topics(num):
-    """
-    Return a list of dictionaries with information about the latest topics that
-    have updated posts. The topics are sorted from newest to oldest.
-
-    """
-    try:
-        conn = get_redis_connection()
-        result = conn.zrevrange(UPDATED_TOPICS_SET_KEY, 0, num - 1)
-
-        topic_keys = [UPDATED_TOPIC_KEY % n for n in result]
-        json_list = conn.mget(topic_keys) if topic_keys else []
-
-    except redis.RedisError, e:
-        logger.error(e)
-        return []
-
-    topics = []
-    for s in json_list:
-        item = simplejson.loads(s)
-        item['date'] = datetime.datetime.fromtimestamp(item['date'])
-        topics.append(item)
-
-    return topics
-
-
-def notify_topic_delete(topic):
-    """
-    This function should be called when a topic is deleted. It will remove the
-    topic from the updated topics set, if present, and delete any info we have
-    about the topic.
-
-    Note we don't do anything like this for posts. Since they just populate RSS
-    feeds we'll let them 404. The updated topic list is seen in a prominent
-    template tag however, so it is a bit more important to get that cleaned up.
-
-    """
-    try:
-        conn = get_redis_connection()
-        pipeline = conn.pipeline()
-        pipeline.zrem(UPDATED_TOPICS_SET_KEY, topic.id)
-        pipeline.delete(UPDATED_TOPIC_KEY % topic.id)
-        pipeline.execute()
-    except redis.RedisError, e:
-        logger.error(e)


 # Down here to avoid a circular import
 import forums.tasks