# HG changeset patch # User Brian Neal # Date 1330888840 21600 # Node ID 6a265b5768ca00f2c884c14ce94d813ffc48aa2a # Parent e5d3552d4ad0ca6924867020a1160bf0097cbe14 For bitbucket issue #5, rework the duplicate email checking in the registration form logic. Added tests for registration. diff -r e5d3552d4ad0 -r 6a265b5768ca gpp/accounts/forms.py --- a/gpp/accounts/forms.py Mon Feb 13 19:34:29 2012 -0600 +++ b/gpp/accounts/forms.py Sun Mar 04 13:20:40 2012 -0600 @@ -58,13 +58,13 @@ def clean_username(self): username = self.cleaned_data['username'] try: - User.objects.get(username = username) + User.objects.get(username=username) except User.DoesNotExist: try: - PendingUser.objects.get(username = username) + PendingUser.objects.get(username=username) except PendingUser.DoesNotExist: try: - IllegalUsername.objects.get(username = username) + IllegalUsername.objects.get(username=username) except IllegalUsername.DoesNotExist: return username self._validation_error("That username is not allowed.", username) @@ -73,19 +73,16 @@ def clean_email(self): email = self.cleaned_data['email'] - try: - User.objects.get(email = email) - except User.DoesNotExist: - try: - PendingUser.objects.get(email = email) - except PendingUser.DoesNotExist: - try: - IllegalEmail.objects.get(email = email) - except IllegalEmail.DoesNotExist: - return email - self._validation_error("That email address is not allowed.", email) + + if User.objects.filter(email=email).count(): + self._validation_error("A user with that email address already exists.", email) + elif PendingUser.objects.filter(email=email).count(): self._validation_error("A pending user with that email address already exists.", email) - self._validation_error("A user with that email address already exists.", email) + elif IllegalEmail.objects.filter(email=email).count(): + self._validation_error("That email address is not allowed.", email) + + # email is ok + return email def clean_password2(self): password1 = self.cleaned_data.get("password1", "") diff -r e5d3552d4ad0 -r 6a265b5768ca gpp/accounts/tests/__init__.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gpp/accounts/tests/__init__.py Sun Mar 04 13:20:40 2012 -0600 @@ -0,0 +1,1 @@ +from view_tests import * diff -r e5d3552d4ad0 -r 6a265b5768ca gpp/accounts/tests/view_tests.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gpp/accounts/tests/view_tests.py Sun Mar 04 13:20:40 2012 -0600 @@ -0,0 +1,253 @@ +""" +View tests for the accounts application. + +""" +import datetime + +from django.test import TestCase +from django.core.urlresolvers import reverse +from django.contrib.auth.models import User, check_password + +from antispam.rate_limit import unblock_ip +from accounts.models import PendingUser +from accounts.models import IllegalUsername +from accounts.models import IllegalEmail + + +class RegistrationTest(TestCase): + + def setUp(self): + u = User.objects.create_user('existing_user', 'existing_user@example.com', 'pw') + u.save() + + # a 2nd user has the same email as another + u = User.objects.create_user('existing_user2', 'existing_user@example.com', 'pw') + u.save() + + PendingUser.objects.create(username='pending_user', + email='pending_user@example.com', + password='pw', + date_joined=datetime.datetime.now(), + key='key') + + IllegalUsername.objects.create(username='illegalusername') + IllegalEmail.objects.create(email='illegal@example.com') + + def tearDown(self): + unblock_ip('127.0.0.1') + + def test_get_view(self): + """ + Test a simple get of the registration view + + """ + response = self.client.get(reverse('accounts-register')) + self.assertEqual(response.status_code, 200) + + def test_existing_user(self): + """ + Ensure we can't register with an existing username. + + """ + response = self.client.post(reverse('accounts-register'), { + 'username': 'existing_user', + 'email': 'test@example.com', + 'password1': 'my_password', + 'password2': 'my_password', + 'agree_age': 'on', + 'agree_tos': 'on', + 'agree_privacy': 'on', + 'question1': '101', + 'question2': '', + }) + + self.assertEqual(response.status_code, 200) + self.assertContains(response, 'A user with that username already exists') + + def test_pending_user(self): + """ + Ensure we can't register with a pending username. + + """ + response = self.client.post(reverse('accounts-register'), { + 'username': 'pending_user', + 'email': 'test@example.com', + 'password1': 'my_password', + 'password2': 'my_password', + 'agree_age': 'on', + 'agree_tos': 'on', + 'agree_privacy': 'on', + 'question1': '101', + 'question2': '', + }) + + self.assertEqual(response.status_code, 200) + self.assertContains(response, 'A pending user with that username already exists') + + def test_illegal_username(self): + """ + Ensure we can't register with a banned username. + + """ + response = self.client.post(reverse('accounts-register'), { + 'username': 'illegalusername', + 'email': 'test@example.com', + 'password1': 'my_password', + 'password2': 'my_password', + 'agree_age': 'on', + 'agree_tos': 'on', + 'agree_privacy': 'on', + 'question1': '101', + 'question2': '', + }) + + self.assertEqual(response.status_code, 200) + self.assertContains(response, 'That username is not allowed') + + def test_duplicate_existing_email(self): + """ + Ensure we can't register with a duplicate email address. + + """ + response = self.client.post(reverse('accounts-register'), { + 'username': 'a_new_user', + 'email': 'existing_user@example.com', + 'password1': 'my_password', + 'password2': 'my_password', + 'agree_age': 'on', + 'agree_tos': 'on', + 'agree_privacy': 'on', + 'question1': '101', + 'question2': '', + }) + + self.assertEqual(response.status_code, 200) + self.assertContains(response, 'A user with that email address already exists') + + def test_duplicate_pending_email(self): + """ + Ensure we can't register with a duplicate email address. + + """ + response = self.client.post(reverse('accounts-register'), { + 'username': 'a_new_user', + 'email': 'pending_user@example.com', + 'password1': 'my_password', + 'password2': 'my_password', + 'agree_age': 'on', + 'agree_tos': 'on', + 'agree_privacy': 'on', + 'question1': '101', + 'question2': '', + }) + + self.assertEqual(response.status_code, 200) + self.assertContains(response, 'A pending user with that email address already exists') + + def test_illegal_email(self): + """ + Ensure we can't register with a banned email address. + + """ + response = self.client.post(reverse('accounts-register'), { + 'username': 'a_new_user', + 'email': 'illegal@example.com', + 'password1': 'my_password', + 'password2': 'my_password', + 'agree_age': 'on', + 'agree_tos': 'on', + 'agree_privacy': 'on', + 'question1': '101', + 'question2': '', + }) + + self.assertEqual(response.status_code, 200) + self.assertContains(response, 'That email address is not allowed') + + def test_password_match(self): + """ + Ensure the passwords match. + + """ + response = self.client.post(reverse('accounts-register'), { + 'username': 'a_new_user', + 'email': 'test@example.com', + 'password1': 'my_password', + 'password2': 'my_password_doesnt match', + 'agree_age': 'on', + 'agree_tos': 'on', + 'agree_privacy': 'on', + 'question1': '101', + 'question2': '', + }) + + self.assertEqual(response.status_code, 200) + self.assertContains(response, "The two password fields didn't match") + + def test_question1(self): + """ + Ensure our anti-spam question is answered. + + """ + response = self.client.post(reverse('accounts-register'), { + 'username': 'a_new_user', + 'email': 'test@example.com', + 'password1': 'my_password', + 'password2': 'my_password_doesnt match', + 'agree_age': 'on', + 'agree_tos': 'on', + 'agree_privacy': 'on', + 'question1': 'huh', + 'question2': '', + }) + + self.assertEqual(response.status_code, 200) + self.assertContains(response, "Incorrect answer to our anti-spam question") + + def test_question2(self): + """ + Ensure our honeypot question check works. + + """ + response = self.client.post(reverse('accounts-register'), { + 'username': 'a_new_user', + 'email': 'test@example.com', + 'password1': 'my_password', + 'password2': 'my_password_doesnt match', + 'agree_age': 'on', + 'agree_tos': 'on', + 'agree_privacy': 'on', + 'question1': '101', + 'question2': 'non blank', + }) + + self.assertEqual(response.status_code, 403) + + def test_success(self): + """ + Ensure we can successfully register. + + """ + response = self.client.post(reverse('accounts-register'), { + 'username': 'a_new_user', + 'email': 'test@example.com', + 'password1': 'my_password', + 'password2': 'my_password', + 'agree_age': 'on', + 'agree_tos': 'on', + 'agree_privacy': 'on', + 'question1': '101', + 'question2': '', + }) + + self.assertEqual(response.status_code, 302) + + try: + pending = PendingUser.objects.get(username='a_new_user') + except PendingUser.DoesNotExist: + self.fail("PendingUser was not created") + + self.assertEqual(pending.email, 'test@example.com') + self.assertTrue(datetime.datetime.now() - pending.date_joined < + datetime.timedelta(minutes=1)) + self.assertTrue(check_password('my_password', pending.password)) diff -r e5d3552d4ad0 -r 6a265b5768ca gpp/antispam/rate_limit.py --- a/gpp/antispam/rate_limit.py Mon Feb 13 19:34:29 2012 -0600 +++ b/gpp/antispam/rate_limit.py Sun Mar 04 13:20:40 2012 -0600 @@ -6,7 +6,6 @@ import logging import redis -from django.conf import settings from core.services import get_redis_connection @@ -71,6 +70,22 @@ logger.info("Rate limiter blocked IP %s; %d / %s", ip, count, interval) +def unblock_ip(ip): + """ + This function removes the block for the given IP address. + + """ + key = _make_key(ip) + conn = _get_connection() + try: + conn.delete(key) + except redis.RedisError, e: + logger.error("rate limit (unblock_ip): %s" % e) + raise RateLimiterUnavailable + + logger.info("Rate limiter unblocked IP %s", ip) + + class RateLimiter(object): """ This class encapsulates the rate limiting logic for a given IP address.