Mercurial > public > sg101
comparison gpp/forums/latest.py @ 522:82b97697312e
Created Celery tasks to process new posts and topics. Keep the updated topic set in Redis.
This is for tickets #194, #237, #239.
author | Brian Neal <bgneal@gmail.com> |
---|---|
date | Sun, 18 Dec 2011 23:46:52 +0000 |
parents | 248dd8dd67f8 |
children | e9c446a64423 |
comparison
equal
deleted
inserted
replaced
521:dd14ab08a9c4 | 522:82b97697312e |
---|---|
3 needed by RSS feeds, "latest posts" template tags, etc. This module listens for | 3 needed by RSS feeds, "latest posts" template tags, etc. This module listens for |
4 the post_content_update signal, then bundles the post up and stores it by forum | 4 the post_content_update signal, then bundles the post up and stores it by forum |
5 ID in Redis. We also maintain a combined forums list. This allows quick | 5 ID in Redis. We also maintain a combined forums list. This allows quick |
6 retrieval of the latest posts and avoids some slow SQL queries. | 6 retrieval of the latest posts and avoids some slow SQL queries. |
7 | 7 |
8 We also do things like send topic notification emails, auto-favorite, and | |
9 auto-subscribe functions here rather than bog the user down in the request / | |
10 response cycle. | |
11 | |
8 """ | 12 """ |
9 import datetime | 13 import datetime |
14 import logging | |
10 import time | 15 import time |
11 | 16 |
12 from django.dispatch import receiver | 17 from django.dispatch import receiver |
13 from django.utils import simplejson | 18 from django.utils import simplejson |
14 | 19 |
15 from forums.signals import post_content_update | 20 from forums.signals import post_content_update, topic_content_update |
16 from forums.models import Forum | 21 from forums.models import Forum, Topic, Post |
22 from forums.views.subscriptions import notify_topic_subscribers | |
23 from forums.tools import auto_favorite, auto_subscribe | |
17 from core.services import get_redis_connection | 24 from core.services import get_redis_connection |
18 | |
19 | 25 |
# Maximum number of latest posts kept per forum feed in Redis
MAX_POSTS = 50

# Maximum number of recently-updated topics we track
MAX_UPDATED_TOPICS = 50

# Redis key names:
POST_COUNT_KEY = "forums:public_post_count"
TOPIC_COUNT_KEY = "forums:public_topic_count"
UPDATED_TOPICS_SET_KEY = "forums:updated_topics:set"
UPDATED_TOPIC_KEY = "forums:updated_topics:%s"

logger = logging.getLogger(__name__)
23 | 40 |
@receiver(post_content_update, dispatch_uid='forums.latest_posts')
def on_post_update(sender, **kwargs):
    """
    Signal handler invoked when a post's content has been updated.

    Only newly created posts matter here; edits to existing posts are
    ignored. All real processing is handed off to a Celery task so the
    request/response cycle stays fast.

    """
    if not kwargs['created']:
        # an edit to an existing post; nothing to do
        return

    # defer the heavy lifting to a Celery worker
    forums.tasks.new_post_task.delay(sender.id)
57 | |
58 | |
def process_new_post(post_id):
    """
    Perform all new-post processing; runs on a Celery task.

    Posts belonging to public forums get the Redis-backed feeds, post
    count, and updated-topics list refreshed. Every new post, public or
    not, then triggers subscriber notification e-mails plus any
    auto-favorite / auto-subscribe actions.

    """
    try:
        post = Post.objects.select_related().get(pk=post_id)
    except Post.DoesNotExist:
        logger.warning("process_new_post: post %d does not exist", post_id)
        return

    # only posts from public forums feed the Redis caches
    public_forums = Forum.objects.public_forum_ids()
    if post.topic.forum.id in public_forums:
        redis = get_redis_connection()
        _update_post_feeds(redis, post)
        _update_post_count(redis, public_forums)
        _update_latest_topics(redis, post)

    # e-mail topic subscribers synchronously; we are already off the
    # request thread, so there is no need to defer again
    notify_topic_subscribers(post, defer=False)

    # apply any per-user auto-favorite / auto-subscribe settings
    auto_favorite(post)
    auto_subscribe(post)
85 | |
86 | |
87 def _update_post_feeds(redis, post): | |
88 """ | |
89 Updates the forum feeds we keep in Redis so that our RSS feeds are quick. | |
90 | |
91 """ | |
46 # serialize post attributes | 92 # serialize post attributes |
47 post_content = { | 93 post_content = { |
48 'id': sender.id, | 94 'id': post.id, |
49 'title': sender.topic.name, | 95 'title': post.topic.name, |
50 'content': sender.html, | 96 'content': post.html, |
51 'author': sender.user.username, | 97 'author': post.user.username, |
52 'pubdate': int(time.mktime(sender.creation_date.timetuple())), | 98 'pubdate': int(time.mktime(post.creation_date.timetuple())), |
53 'forum_name': sender.topic.forum.name, | 99 'forum_name': post.topic.forum.name, |
54 'url': sender.get_absolute_url() | 100 'url': post.get_absolute_url() |
55 } | 101 } |
56 | 102 |
57 s = simplejson.dumps(post_content) | 103 s = simplejson.dumps(post_content) |
58 | 104 |
59 # store in Redis | 105 # store in Redis |
60 | 106 |
61 redis = get_redis_connection() | |
62 pipeline = redis.pipeline() | 107 pipeline = redis.pipeline() |
63 | 108 |
64 key = 'forums:latest:%d' % sender.topic.forum.id | 109 key = 'forums:latest:%d' % post.topic.forum.id |
65 | 110 |
66 pipeline.lpush(key, s) | 111 pipeline.lpush(key, s) |
67 pipeline.ltrim(key, 0, MAX_POSTS - 1) | 112 pipeline.ltrim(key, 0, MAX_POSTS - 1) |
68 | 113 |
69 # store in the combined feed; yes this wastes some memory storing it twice, | 114 # store in the combined feed; yes this wastes some memory storing it twice, |
73 | 118 |
74 pipeline.lpush(key, s) | 119 pipeline.lpush(key, s) |
75 pipeline.ltrim(key, 0, MAX_POSTS - 1) | 120 pipeline.ltrim(key, 0, MAX_POSTS - 1) |
76 | 121 |
77 pipeline.execute() | 122 pipeline.execute() |
123 | |
124 | |
def _update_post_count(redis, public_forums):
    """
    Maintain the cached public post count in Redis.

    A COUNT(*) on the post table can be expensive under MySQL InnoDB,
    so we keep a counter in Redis and only recompute from the database
    when the key is missing.

    """
    if redis.incr(POST_COUNT_KEY) == 1:
        # incr created the key from nothing, so Redis was probably
        # flushed; rebuild the true count from the database
        total = Post.objects.filter(topic__forum__in=public_forums).count()
        redis.set(POST_COUNT_KEY, total)
137 | |
138 | |
def _update_latest_topics(redis, post):
    """
    Maintain the "latest topics with new posts" data cached in Redis.

    A template tag and a forum view read this information, so it must be
    fast to retrieve. The topic's info is stored under its own key, and
    a sorted set (scored by post timestamp) orders the topics.

    """
    topic_id = post.topic.id
    score = int(time.mktime(post.creation_date.timetuple()))

    payload = simplejson.dumps({
        'title': post.topic.name,
        'author': post.user.username,
        'date': score,
        'url': post.get_absolute_url()
    })
    key = UPDATED_TOPIC_KEY % topic_id

    pipeline = redis.pipeline()
    pipeline.set(key, payload)
    # NOTE(review): (score, member) argument order matches the redis-py
    # version in use at the time -- confirm against the installed client
    pipeline.zadd(UPDATED_TOPICS_SET_KEY, score, topic_id)
    pipeline.zcard(UPDATED_TOPICS_SET_KEY)
    results = pipeline.execute()

    # trim the tracked topics down to our maximum count
    excess = results[-1] - MAX_UPDATED_TOPICS
    if excess > 0:
        # grab the IDs of the oldest topics first so their per-topic
        # info keys can be deleted too (Redis ranges are inclusive)
        victims = redis.zrange(UPDATED_TOPICS_SET_KEY, 0, excess - 1)
        redis.delete(*[UPDATED_TOPIC_KEY % n for n in victims])

        # now drop the oldest entries from the sorted set itself
        redis.zremrangebyrank(UPDATED_TOPICS_SET_KEY, 0, excess - 1)
78 | 178 |
79 | 179 |
80 def get_latest_posts(num_posts=MAX_POSTS, forum_id=None): | 180 def get_latest_posts(num_posts=MAX_POSTS, forum_id=None): |
81 """ | 181 """ |
82 This function retrieves num_posts latest posts for the forum with the given | 182 This function retrieves num_posts latest posts for the forum with the given |
103 post['pubdate'] = datetime.datetime.fromtimestamp(post['pubdate']) | 203 post['pubdate'] = datetime.datetime.fromtimestamp(post['pubdate']) |
104 | 204 |
105 posts.append(post) | 205 posts.append(post) |
106 | 206 |
107 return posts | 207 return posts |
208 | |
209 | |
@receiver(topic_content_update, dispatch_uid='forums.latest_posts')
def on_topic_update(sender, **kwargs):
    """
    Signal handler invoked when a topic's content has been updated.

    Only newly created topics matter; updates are ignored. Processing is
    deferred to a Celery task to keep it out of the request/response
    cycle.

    NOTE(review): the dispatch_uid duplicates on_post_update's; uids are
    scoped per-signal so both handlers still connect, but a distinct uid
    (e.g. 'forums.latest_topics') would be clearer -- confirm before
    changing, since the uid is part of the registration identity.

    """
    if not kwargs['created']:
        # a topic edit, not a creation; ignore it
        return

    # defer new-topic processing to a Celery worker
    forums.tasks.new_topic_task.delay(sender.id)
226 | |
227 | |
def process_new_topic(topic_id):
    """
    Perform new-topic processing; runs on a Celery task.

    Currently this only maintains the cached public topic count
    statistic in Redis.

    """
    try:
        topic = Topic.objects.select_related().get(pk=topic_id)
    except Topic.DoesNotExist:
        logger.warning("process_new_topic: topic %d does not exist", topic_id)
        return

    # only topics from public forums contribute to the statistic
    public_forums = Forum.objects.public_forum_ids()
    if topic.forum.id not in public_forums:
        return

    redis = get_redis_connection()
    if redis.incr(TOPIC_COUNT_KEY) == 1:
        # incr created the key from nothing, so Redis was probably
        # flushed; rebuild the true count from the database
        count = Topic.objects.filter(forum__in=public_forums).count()
        redis.set(TOPIC_COUNT_KEY, count)
255 | |
256 | |
def get_stats():
    """
    Return the public (topic_count, post_count) statistics as a tuple,
    in that order.

    If a statistic is unavailable (missing key or Redis error), its
    position in the tuple is None.

    """
    # Acquire the connection outside the try block: the original bound
    # 'redis' inside it, so a failure in get_redis_connection() made the
    # except clause below raise NameError instead of degrading cleanly.
    redis = get_redis_connection()
    try:
        result = redis.mget(TOPIC_COUNT_KEY, POST_COUNT_KEY)
    except redis.RedisError as e:
        # NOTE(review): assumes the client object exposes a RedisError
        # attribute -- verify against get_redis_connection()'s return type
        logger.error(e)
        return (None, None)

    # mget yields None for missing keys; map those straight through
    topic_count = int(result[0]) if result[0] else None
    post_count = int(result[1]) if result[1] else None

    return (topic_count, post_count)
275 | |
276 | |
def get_latest_topic_ids(num):
    """
    Return a list of topic ids for the latest topics that have posts.

    The ids are sorted from newest to oldest. Returns an empty list on a
    Redis error.

    """
    # Bind the connection before the try block: the original bound
    # 'redis' inside it, so a failure in get_redis_connection() made the
    # except clause raise NameError instead of returning [].
    redis = get_redis_connection()
    try:
        result = redis.zrevrange(UPDATED_TOPICS_SET_KEY, 0, num - 1)
    except redis.RedisError as e:
        logger.error(e)
        return []

    return [int(n) for n in result]
291 | |
292 | |
def get_latest_topics(num):
    """
    Return a list of dictionaries describing the latest topics that have
    updated posts, sorted from newest to oldest.

    Returns an empty list on a Redis error.

    """
    # Bind the connection before the try block: the original bound
    # 'redis' inside it, so a failure in get_redis_connection() made the
    # except clause raise NameError instead of returning [].
    redis = get_redis_connection()
    try:
        result = redis.zrevrange(UPDATED_TOPICS_SET_KEY, 0, num - 1)

        topic_keys = [UPDATED_TOPIC_KEY % n for n in result]
        # mget with an empty key list is an error in Redis; short-circuit
        json_list = redis.mget(topic_keys) if topic_keys else []

    except redis.RedisError as e:
        logger.error(e)
        return []

    topics = []
    for s in json_list:
        if s is None:
            # the per-topic key vanished (deleted by notify_topic_delete
            # or trimmed) between the zrevrange and the mget; skip it
            continue
        item = simplejson.loads(s)
        item['date'] = datetime.datetime.fromtimestamp(item['date'])
        topics.append(item)

    return topics
317 | |
318 | |
def notify_topic_delete(topic):
    """
    Clean the updated-topics cache when a topic is deleted.

    Removes the topic from the updated topics sorted set, if present,
    and deletes any cached info we hold for it. Errors are logged and
    otherwise ignored.

    Note we don't do anything like this for posts. Since they just
    populate RSS feeds we'll let them 404. The updated topic list is
    seen in a prominent template tag however, so it is a bit more
    important to get that cleaned up.

    """
    # Bind the connection before the try block: the original bound
    # 'redis' inside it, so a failure in get_redis_connection() made the
    # except clause raise NameError instead of logging the error.
    redis = get_redis_connection()
    try:
        pipeline = redis.pipeline()
        pipeline.zrem(UPDATED_TOPICS_SET_KEY, topic.id)
        pipeline.delete(UPDATED_TOPIC_KEY % topic.id)
        pipeline.execute()
    except redis.RedisError as e:
        logger.error(e)
338 | |
339 | |
340 # Down here to avoid a circular import | |
341 import forums.tasks |