[snowy] [django_openid_auth] Update to version 84 from the bzr upstream
- From: Jeff Schroeder <jschroeder src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [snowy] [django_openid_auth] Update to version 84 from the bzr upstream
- Date: Mon, 10 Oct 2011 01:28:38 +0000 (UTC)
commit 898153eff8ac1a89e4b0bd34198f5c1f4552949e
Author: Jeff Schroeder <jeffschroeder computer org>
Date: Sun Oct 9 18:11:05 2011 -0700
[django_openid_auth] Update to version 84 from the bzr upstream
This will need leonh's changes from: 5e69e963fd7511df1f re-applied
lib/django_openid_auth/LICENSE.txt | 2 +-
lib/django_openid_auth/auth.py | 235 +++++---
lib/django_openid_auth/models.py | 2 +-
lib/django_openid_auth/tests/__init__.py | 7 +-
lib/django_openid_auth/tests/test_store.py | 2 +-
lib/django_openid_auth/tests/test_views.py | 895 +++++++++++++++++++++++++++-
lib/django_openid_auth/tests/urls.py | 2 +-
lib/django_openid_auth/views.py | 68 ++-
8 files changed, 1097 insertions(+), 116 deletions(-)
---
diff --git a/lib/django_openid_auth/LICENSE.txt b/lib/django_openid_auth/LICENSE.txt
index 0e67faf..3abea62 100644
--- a/lib/django_openid_auth/LICENSE.txt
+++ b/lib/django_openid_auth/LICENSE.txt
@@ -1,5 +1,5 @@
Copyright (C) 2007 Simon Willison
-Copyright (C) 2008-2009 Canonical Ltd.
+Copyright (C) 2008-2010 Canonical Ltd.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
diff --git a/lib/django_openid_auth/auth.py b/lib/django_openid_auth/auth.py
index 2fc3131..9e16e36 100644
--- a/lib/django_openid_auth/auth.py
+++ b/lib/django_openid_auth/auth.py
@@ -33,15 +33,17 @@ __metaclass__ = type
from django.conf import settings
from django.contrib.auth.models import User, Group
from openid.consumer.consumer import SUCCESS
-from openid.extensions import ax, sreg
+from openid.extensions import ax, sreg, pape
from django_openid_auth import teams
from django_openid_auth.models import UserOpenID
-
-
-class IdentityAlreadyClaimed(Exception):
- pass
-
+from django_openid_auth.exceptions import (
+ IdentityAlreadyClaimed,
+ DuplicateUsernameViolation,
+ MissingUsernameViolation,
+ MissingPhysicalMultiFactor,
+ RequiredAttributeNotReturned,
+)
class OpenIDBackend:
"""A django.contrib.auth backend that authenticates the user based on
@@ -71,13 +73,8 @@ class OpenIDBackend:
user_openid = UserOpenID.objects.get(
claimed_id__exact=openid_response.identity_url)
except UserOpenID.DoesNotExist:
- if kwargs.get('create_user') and kwargs.get('username'):
- user = self.create_user_from_openid(openid_response,
- username=kwargs.get('username'))
- elif getattr(settings, 'OPENID_CREATE_USERS', False):
+ if getattr(settings, 'OPENID_CREATE_USERS', False):
user = self.create_user_from_openid(openid_response)
- else:
- return None
else:
user = user_openid.user
@@ -85,42 +82,158 @@ class OpenIDBackend:
return None
if getattr(settings, 'OPENID_UPDATE_DETAILS_FROM_SREG', False):
- details = _extract_user_details(openid_response)
- self.update_user_details(user, details)
+ details = self._extract_user_details(openid_response)
+ self.update_user_details(user, details, openid_response)
+
+ if getattr(settings, 'OPENID_PHYSICAL_MULTIFACTOR_REQUIRED', False):
+ pape_response = pape.Response.fromSuccessResponse(openid_response)
+ if pape_response is None or \
+ pape.AUTH_MULTI_FACTOR_PHYSICAL not in pape_response.auth_policies:
+ raise MissingPhysicalMultiFactor()
teams_response = teams.TeamsResponse.fromSuccessResponse(
openid_response)
if teams_response:
self.update_groups_from_teams(user, teams_response)
+ self.update_staff_status_from_teams(user, teams_response)
return user
- def create_user_from_openid(self, openid_response, username='snowyuser', email=''):
- nickname = username
- #email = details['email'] or ''
+ def _extract_user_details(self, openid_response):
+ email = fullname = first_name = last_name = nickname = None
+ sreg_response = sreg.SRegResponse.fromSuccessResponse(openid_response)
+ if sreg_response:
+ email = sreg_response.get('email')
+ fullname = sreg_response.get('fullname')
+ nickname = sreg_response.get('nickname')
+ # If any attributes are provided via Attribute Exchange, use
+ # them in preference.
+ fetch_response = ax.FetchResponse.fromSuccessResponse(openid_response)
+ if fetch_response:
+ # The myOpenID provider advertises AX support, but uses
+ # attribute names from an obsolete draft of the
+ # specification. We check for them first so the common
+ # names take precedence.
+ email = fetch_response.getSingle(
+ 'http://schema.openid.net/contact/email', email)
+ fullname = fetch_response.getSingle(
+ 'http://schema.openid.net/namePerson', fullname)
+ nickname = fetch_response.getSingle(
+ 'http://schema.openid.net/namePerson/friendly', nickname)
+
+ email = fetch_response.getSingle(
+ 'http://axschema.org/contact/email', email)
+ fullname = fetch_response.getSingle(
+ 'http://axschema.org/namePerson', fullname)
+ first_name = fetch_response.getSingle(
+ 'http://axschema.org/namePerson/first', first_name)
+ last_name = fetch_response.getSingle(
+ 'http://axschema.org/namePerson/last', last_name)
+ nickname = fetch_response.getSingle(
+ 'http://axschema.org/namePerson/friendly', nickname)
+
+ if fullname and not (first_name or last_name):
+ # Django wants to store first and last names separately,
+ # so we do our best to split the full name.
+ fullname = fullname.strip()
+ split_names = fullname.rsplit(None, 1)
+ if len(split_names) == 2:
+ first_name, last_name = split_names
+ else:
+ first_name = u''
+ last_name = fullname
+
+ return dict(email=email, nickname=nickname,
+ first_name=first_name, last_name=last_name)
+
+ def _get_available_username(self, nickname, identity_url):
+ # If we're being strict about usernames, throw an error if we didn't
+ # get one back from the provider
+ if getattr(settings, 'OPENID_STRICT_USERNAMES', False):
+ if nickname is None or nickname == '':
+ raise MissingUsernameViolation()
+
+ # If we don't have a nickname, and we're not being strict, use a default
+ nickname = nickname or 'openiduser'
+
+ # See if we already have this nickname assigned to a username
+ try:
+ user = User.objects.get(username__exact=nickname)
+ except User.DoesNotExist:
+ # No conflict, we can use this nickname
+ return nickname
+
+ # Check if we already have nickname+i for this identity_url
+ try:
+ user_openid = UserOpenID.objects.get(
+ claimed_id__exact=identity_url,
+ user__username__startswith=nickname)
+ # No exception means we have an existing user for this identity
+ # that starts with this nickname.
+
+ # If they are an exact match, the user already exists and hasn't
+ # changed their username, so continue to use it
+ if nickname == user_openid.user.username:
+ return nickname
+
+ # It is possible we've had to assign them to nickname+i already.
+ oid_username = user_openid.user.username
+ if len(oid_username) > len(nickname):
+ try:
+ # check that it ends with a number
+ int(oid_username[len(nickname):])
+ return oid_username
+ except ValueError:
+ # username starts with nickname, but isn't nickname+#
+ pass
+ except UserOpenID.DoesNotExist:
+ # No user associated with this identity_url
+ pass
+
+
+ if getattr(settings, 'OPENID_STRICT_USERNAMES', False):
+ if User.objects.filter(username__exact=nickname).count() > 0:
+ raise DuplicateUsernameViolation(
+ "The username (%s) with which you tried to log in is "
+ "already in use for a different account." % nickname)
# Pick a username for the user based on their nickname,
- # checking for conflicts.
- i = 1
+ # checking for conflicts. Start with number of existing users who's
+ # username starts with this nickname to avoid having to iterate over
+ # all of the existing ones.
+ i = User.objects.filter(username__startswith=nickname).count() + 1
while True:
username = nickname
if i > 1:
username += str(i)
try:
- User.objects.get(username__exact=username)
+ user = User.objects.get(username__exact=username)
except User.DoesNotExist:
break
i += 1
+ return username
- user = User.objects.create_user(username, email, password=None)
- user.get_profile().openid_user = True
- user.get_profile().save()
+ def create_user_from_openid(self, openid_response):
+ details = self._extract_user_details(openid_response)
+ required_attrs = getattr(settings, 'OPENID_SREG_REQUIRED_FIELDS', [])
+ if getattr(settings, 'OPENID_STRICT_USERNAMES', False):
+ required_attrs.append('nickname')
- if getattr(settings, 'OPENID_UPDATE_DETAILS_FROM_SREG', False):
- details = _extract_user_details(openid_response)
- self.update_user_details(user, details)
+ for required_attr in required_attrs:
+ if required_attr not in details or not details[required_attr]:
+ raise RequiredAttributeNotReturned(
+ "An attribute required for logging in was not "
+ "returned ({0}).".format(required_attr))
+ nickname = details['nickname'] or 'openiduser'
+ email = details['email'] or ''
+
+ username = self._get_available_username(details['nickname'], openid_response.identity_url)
+
+ user = User.objects.create_user(username, email, password=None)
self.associate_openid(user, openid_response)
+ self.update_user_details(user, details, openid_response)
+
return user
def associate_openid(self, user, openid_response):
@@ -143,17 +256,20 @@ class OpenIDBackend:
return user_openid
- def update_user_details(self, user, details):
+ def update_user_details(self, user, details, openid_response):
updated = False
if details['first_name']:
- user.first_name = details['first_name']
+ user.first_name = details['first_name'][:30]
updated = True
if details['last_name']:
- user.last_name = details['last_name']
+ user.last_name = details['last_name'][:30]
updated = True
if details['email']:
user.email = details['email']
updated = True
+ if getattr(settings, 'OPENID_FOLLOW_RENAMES', False):
+ user.username = self._get_available_username(details['nickname'], openid_response.identity_url)
+ updated = True
if updated:
user.save()
@@ -183,56 +299,17 @@ class OpenIDBackend:
for group in desired_groups - current_groups:
user.groups.add(group)
-# kept outside of the class to make function usable outside of the backend
-def _extract_user_details(openid_response):
- email = fullname = first_name = last_name = nickname = None
- sreg_response = sreg.SRegResponse.fromSuccessResponse(openid_response)
- if sreg_response:
- email = sreg_response.get('email')
- fullname = sreg_response.get('fullname')
- nickname = sreg_response.get('nickname')
-
- # If any attributes are provided via Attribute Exchange, use
- # them in preference.
- fetch_response = ax.FetchResponse.fromSuccessResponse(openid_response)
- if fetch_response:
- # The myOpenID provider advertises AX support, but uses
- # attribute names from an obsolete draft of the
- # specification. We check for them first so the common
- # names take precedence.
- email = fetch_response.getSingle(
- 'http://schema.openid.net/contact/email', email)
- fullname = fetch_response.getSingle(
- 'http://schema.openid.net/namePerson', fullname)
- nickname = fetch_response.getSingle(
- 'http://schema.openid.net/namePerson/friendly', nickname)
-
- email = fetch_response.getSingle(
- 'http://axschema.org/contact/email', email)
- fullname = fetch_response.getSingle(
- 'http://axschema.org/namePerson', fullname)
- first_name = fetch_response.getSingle(
- 'http://axschema.org/namePerson/first', first_name)
- last_name = fetch_response.getSingle(
- 'http://axschema.org/namePerson/last', last_name)
- nickname = fetch_response.getSingle(
- 'http://axschema.org/namePerson/friendly', nickname)
-
- if fullname and not (first_name or last_name):
- # Django wants to store first and last names separately,
- # so we do our best to split the full name.
- if ' ' in fullname:
- first_name, last_name = fullname.rsplit(None, 1)
- else:
- first_name = u''
- last_name = fullname
+ def update_staff_status_from_teams(self, user, teams_response):
+ if not hasattr(settings, 'OPENID_LAUNCHPAD_STAFF_TEAMS'):
+ return
+
+ staff_teams = getattr(settings, 'OPENID_LAUNCHPAD_STAFF_TEAMS', [])
+ user.is_staff = False
- if (first_name and last_name) and not fullname:
- fullname = last_name + " " + last_name
+ for lp_team in teams_response.is_member:
+ if lp_team in staff_teams:
+ user.is_staff = True
+ break
- if (first_name and last_name) and not nickname:
- nickname = (first_name + last_name).lower()
+ user.save()
- return dict(email=email, nickname=nickname,
- first_name=first_name, last_name=last_name,
- fullname=fullname)
diff --git a/lib/django_openid_auth/models.py b/lib/django_openid_auth/models.py
index 3638385..19cc871 100644
--- a/lib/django_openid_auth/models.py
+++ b/lib/django_openid_auth/models.py
@@ -54,5 +54,5 @@ class Association(models.Model):
class UserOpenID(models.Model):
user = models.ForeignKey(User)
- claimed_id = models.CharField(max_length=255, unique=True)
+ claimed_id = models.TextField(max_length=2047, unique=True)
display_id = models.TextField(max_length=2047)
diff --git a/lib/django_openid_auth/tests/__init__.py b/lib/django_openid_auth/tests/__init__.py
index a324e69..6a37269 100644
--- a/lib/django_openid_auth/tests/__init__.py
+++ b/lib/django_openid_auth/tests/__init__.py
@@ -1,6 +1,6 @@
# django-openid-auth - OpenID integration for django.contrib.auth
#
-# Copyright (C) 2009 Canonical Ltd.
+# Copyright (C) 2009-2010 Canonical Ltd.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
@@ -27,11 +27,14 @@
# POSSIBILITY OF SUCH DAMAGE.
import unittest
+from test_views import *
+from test_store import *
+from test_auth import *
def suite():
suite = unittest.TestSuite()
- for name in ['test_store', 'test_views']:
+ for name in ['test_auth', 'test_store', 'test_views']:
mod = __import__('%s.%s' % (__name__, name), {}, {}, ['suite'])
suite.addTest(mod.suite())
return suite
diff --git a/lib/django_openid_auth/tests/test_store.py b/lib/django_openid_auth/tests/test_store.py
index 588c8fa..6062502 100644
--- a/lib/django_openid_auth/tests/test_store.py
+++ b/lib/django_openid_auth/tests/test_store.py
@@ -1,6 +1,6 @@
# django-openid-auth - OpenID integration for django.contrib.auth
#
-# Copyright (C) 2009 Canonical Ltd.
+# Copyright (C) 2009-2010 Canonical Ltd.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
diff --git a/lib/django_openid_auth/tests/test_views.py b/lib/django_openid_auth/tests/test_views.py
index 26cd87c..67902c1 100644
--- a/lib/django_openid_auth/tests/test_views.py
+++ b/lib/django_openid_auth/tests/test_views.py
@@ -1,6 +1,6 @@
# django-openid-auth - OpenID integration for django.contrib.auth
#
-# Copyright (C) 2009 Canonical Ltd.
+# Copyright (C) 2009-2010 Canonical Ltd.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
@@ -27,28 +27,41 @@
# POSSIBILITY OF SUCH DAMAGE.
import cgi
-import re
-import time
import unittest
+from urllib import quote_plus
from django.conf import settings
from django.contrib.auth.models import User, Group
+from django.http import HttpRequest, HttpResponse
from django.test import TestCase
-from openid.extensions import ax, sreg
+from openid.consumer.consumer import Consumer, SuccessResponse
+from openid.consumer.discover import OpenIDServiceEndpoint
+from openid.extensions import ax, sreg, pape
from openid.fetchers import (
HTTPFetcher, HTTPFetchingError, HTTPResponse, setDefaultFetcher)
from openid.oidutil import importElementTree
-from openid.server.server import BROWSER_REQUEST_MODES, Server
+from openid.server.server import BROWSER_REQUEST_MODES, ENCODE_URL, Server
from openid.store.memstore import MemoryStore
+from openid.message import OPENID1_URL_LIMIT, IDENTIFIER_SELECT
from django_openid_auth import teams
from django_openid_auth.models import UserOpenID
-from django_openid_auth.views import sanitise_redirect_url
-
+from django_openid_auth.views import (
+ sanitise_redirect_url,
+ make_consumer,
+)
+from django_openid_auth.auth import OpenIDBackend
+from django_openid_auth.signals import openid_login_complete
+from django_openid_auth.store import DjangoOpenIDStore
+from django_openid_auth.exceptions import (
+ MissingUsernameViolation,
+ DuplicateUsernameViolation,
+ MissingPhysicalMultiFactor,
+ RequiredAttributeNotReturned,
+)
ET = importElementTree()
-
class StubOpenIDProvider(HTTPFetcher):
def __init__(self, base_url):
@@ -120,40 +133,94 @@ class StubOpenIDProvider(HTTPFetcher):
return self.last_request
+class DummyDjangoRequest(object):
+ def __init__(self, request_path):
+ self.request_path = request_path
+ self.META = {
+ 'HTTP_HOST': "localhost",
+ 'SCRIPT_NAME': "http://localhost",
+ 'SERVER_PROTOCOL': "http",
+ }
+ self.POST = {
+ 'openid_identifier': "http://example.com/identity",
+ }
+ self.GET = {}
+ self.session = {}
+
+ def get_full_path(self):
+ return self.META['SCRIPT_NAME'] + self.request_path
+
+ def build_absolute_uri(self):
+ return self.META['SCRIPT_NAME'] + self.request_path
+
+ def _combined_request(self):
+ request = {}
+ request.update(self.POST)
+ request.update(self.GET)
+ return request
+ REQUEST = property(_combined_request)
+
class RelyingPartyTests(TestCase):
urls = 'django_openid_auth.tests.urls'
def setUp(self):
super(RelyingPartyTests, self).setUp()
self.provider = StubOpenIDProvider('http://example.com/')
+ self.req = DummyDjangoRequest('http://localhost/')
+ self.endpoint = OpenIDServiceEndpoint()
+ self.endpoint.claimed_id = 'http://example.com/identity'
+ self.endpoint.server_url = 'http://example.com/'
+ self.consumer = make_consumer(self.req)
+ self.server = Server(DjangoOpenIDStore())
setDefaultFetcher(self.provider, wrap_exceptions=False)
self.old_login_redirect_url = getattr(settings, 'LOGIN_REDIRECT_URL', '/accounts/profile/')
self.old_create_users = getattr(settings, 'OPENID_CREATE_USERS', False)
+ self.old_strict_usernames = getattr(settings, 'OPENID_STRICT_USERNAMES', False)
self.old_update_details = getattr(settings, 'OPENID_UPDATE_DETAILS_FROM_SREG', False)
- self.old_sso_server_url = getattr(settings, 'OPENID_SSO_SERVER_URL')
+ self.old_sso_server_url = getattr(settings, 'OPENID_SSO_SERVER_URL', None)
self.old_teams_map = getattr(settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING', {})
self.old_use_as_admin_login = getattr(settings, 'OPENID_USE_AS_ADMIN_LOGIN', False)
+ self.old_follow_renames = getattr(settings, 'OPENID_FOLLOW_RENAMES', False)
+ self.old_physical_multifactor = getattr(settings, 'OPENID_PHYSICAL_MULTIFACTOR_REQUIRED', False)
+ self.old_login_render_failure = getattr(settings, 'OPENID_RENDER_FAILURE', None)
+ self.old_consumer_complete = Consumer.complete
+
+ self.old_required_fields = getattr(
+ settings, 'OPENID_SREG_REQUIRED_FIELDS', [])
settings.OPENID_CREATE_USERS = False
+ settings.OPENID_STRICT_USERNAMES = False
settings.OPENID_UPDATE_DETAILS_FROM_SREG = False
settings.OPENID_SSO_SERVER_URL = None
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = {}
settings.OPENID_USE_AS_ADMIN_LOGIN = False
+ settings.OPENID_FOLLOW_RENAMES = False
+ settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = False
+ settings.OPENID_SREG_REQUIRED_FIELDS = []
def tearDown(self):
settings.LOGIN_REDIRECT_URL = self.old_login_redirect_url
settings.OPENID_CREATE_USERS = self.old_create_users
+ settings.OPENID_STRICT_USERNAMES = self.old_strict_usernames
settings.OPENID_UPDATE_DETAILS_FROM_SREG = self.old_update_details
settings.OPENID_SSO_SERVER_URL = self.old_sso_server_url
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = self.old_teams_map
settings.OPENID_USE_AS_ADMIN_LOGIN = self.old_use_as_admin_login
+ settings.OPENID_FOLLOW_RENAMES = self.old_follow_renames
+ settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = self.old_physical_multifactor
+ settings.OPENID_RENDER_FAILURE = self.old_login_render_failure
+ Consumer.complete = self.old_consumer_complete
+ settings.OPENID_SREG_REQUIRED_FIELDS = self.old_required_fields
setDefaultFetcher(None)
super(RelyingPartyTests, self).tearDown()
def complete(self, openid_response):
"""Complete an OpenID authentication request."""
+ # The server can generate either a redirect or a form post
+ # here. For simplicity, force generation of a redirect.
+ openid_response.whichEncoding = lambda: ENCODE_URL
webresponse = self.provider.server.encodeResponse(openid_response)
self.assertEquals(webresponse.code, 302)
redirect_to = webresponse.headers['location']
@@ -282,8 +349,349 @@ class RelyingPartyTests(TestCase):
self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'foo example com')
- def test_login_update_details(self):
+ def _do_user_login(self, req_data, resp_data, use_sreg=True, use_pape=None):
+ openid_request = self._get_login_request(req_data)
+ openid_response = self._get_login_response(openid_request, resp_data, use_sreg, use_pape)
+ response = self.complete(openid_response)
+ self.assertRedirects(response, 'http://testserver/getuser/')
+ return response
+
+ def _get_login_request(self, req_data):
+ # Posting in an identity URL begins the authentication request:
+ response = self.client.post('/openid/login/', req_data)
+ self.assertContains(response, 'OpenID transaction in progress')
+
+ # Complete the request, passing back some simple registration
+ # data. The user is redirected to the next URL.
+ openid_request = self.provider.parseFormPost(response.content)
+ return openid_request
+
+ def _get_login_response(self, openid_request, resp_data, use_sreg, use_pape):
+ openid_response = openid_request.answer(True)
+
+ if use_sreg:
+ sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
+ sreg_response = sreg.SRegResponse.extractResponse(
+ sreg_request, resp_data)
+ openid_response.addExtension(sreg_response)
+ if use_pape is not None:
+ policies = [
+ use_pape
+ ]
+ pape_response = pape.Response(auth_policies=policies)
+ openid_response.addExtension(pape_response)
+ return openid_response
+
+ def parse_query_string(self, query_str):
+ query_items = map(tuple,
+ [item.split('=') for item in query_str.split('&')])
+ query = dict(query_items)
+ return query
+
+ def test_login_physical_multifactor_request(self):
+ settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = True
+ preferred_auth = pape.AUTH_MULTI_FACTOR_PHYSICAL
+ self.provider.type_uris.append(pape.ns_uri)
+
+ openid_req = {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'}
+ response = self.client.post('/openid/login/', openid_req)
+ openid_request = self.provider.parseFormPost(response.content)
+
+ request_auth = openid_request.message.getArg(
+ 'http://specs.openid.net/extensions/pape/1.0',
+ 'preferred_auth_policies',
+ )
+ self.assertEqual(request_auth, preferred_auth)
+
+ def test_login_physical_multifactor_response(self):
+ settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = True
+ preferred_auth = pape.AUTH_MULTI_FACTOR_PHYSICAL
+ self.provider.type_uris.append(pape.ns_uri)
+
+ def mock_complete(this, request_args, return_to):
+ request = {'openid.mode': 'checkid_setup',
+ 'openid.trust_root': 'http://localhost/',
+ 'openid.return_to': 'http://localhost/',
+ 'openid.identity': IDENTIFIER_SELECT,
+ 'openid.ns.pape' : pape.ns_uri,
+ 'openid.pape.auth_policies': request_args.get('openid.pape.auth_policies', pape.AUTH_NONE),
+ }
+ openid_server = self.provider.server
+ orequest = openid_server.decodeRequest(request)
+ response = SuccessResponse(
+ self.endpoint, orequest.message,
+ signed_fields=['openid.pape.auth_policies',])
+ return response
+ Consumer.complete = mock_complete
+
+ user = User.objects.create_user('testuser', 'test example com')
+ useropenid = UserOpenID(
+ user=user,
+ claimed_id='http://example.com/identity',
+ display_id='http://example.com/identity')
+ useropenid.save()
+
+ openid_req = {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'}
+ openid_resp = {'nickname': 'testuser', 'fullname': 'Openid User',
+ 'email': 'test example com'}
+
+ response = self._do_user_login(openid_req, openid_resp, use_pape=pape.AUTH_MULTI_FACTOR_PHYSICAL)
+
+ query = self.parse_query_string(response.request['QUERY_STRING'])
+ self.assertTrue('openid.pape.auth_policies' in query)
+ self.assertEqual(query['openid.pape.auth_policies'],
+ quote_plus(preferred_auth))
+
+ response = self.client.get('/getuser/')
+ self.assertEqual(response.content, 'testuser')
+
+
+ def test_login_physical_multifactor_not_provided(self):
+ settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = True
+ preferred_auth = pape.AUTH_MULTI_FACTOR_PHYSICAL
+ self.provider.type_uris.append(pape.ns_uri)
+
+ def mock_complete(this, request_args, return_to):
+ request = {'openid.mode': 'checkid_setup',
+ 'openid.trust_root': 'http://localhost/',
+ 'openid.return_to': 'http://localhost/',
+ 'openid.identity': IDENTIFIER_SELECT,
+ 'openid.ns.pape' : pape.ns_uri,
+ 'openid.pape.auth_policies': request_args.get('openid.pape.auth_policies', pape.AUTH_NONE),
+ }
+ openid_server = self.provider.server
+ orequest = openid_server.decodeRequest(request)
+ response = SuccessResponse(
+ self.endpoint, orequest.message,
+ signed_fields=['openid.pape.auth_policies',])
+ return response
+ Consumer.complete = mock_complete
+
+ user = User.objects.create_user('testuser', 'test example com')
+ useropenid = UserOpenID(
+ user=user,
+ claimed_id='http://example.com/identity',
+ display_id='http://example.com/identity')
+ useropenid.save()
+
+ openid_req = {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'}
+ openid_resp = {'nickname': 'testuser', 'fullname': 'Openid User',
+ 'email': 'test example com'}
+
+ openid_request = self._get_login_request(openid_req)
+ openid_response = self._get_login_response(openid_request, openid_req, openid_resp, use_pape=pape.AUTH_NONE)
+
+ response_auth = openid_request.message.getArg(
+ 'http://specs.openid.net/extensions/pape/1.0',
+ 'auth_policies',
+ )
+ self.assertNotEqual(response_auth, preferred_auth)
+
+ response = self.complete(openid_response)
+ self.assertEquals(403, response.status_code)
+ self.assertContains(response, '<h1>OpenID failed</h1>', status_code=403)
+ self.assertContains(response, '<p>Login requires physical multi-factor authentication.</p>', status_code=403)
+
+ def test_login_physical_multifactor_not_provided_override(self):
+ settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = True
+ preferred_auth = pape.AUTH_MULTI_FACTOR_PHYSICAL
+ self.provider.type_uris.append(pape.ns_uri)
+
+ # Override the login_failure handler
+ def mock_login_failure_handler(request, message, status=403,
+ template_name=None,
+ exception=None):
+ self.assertTrue(isinstance(exception, MissingPhysicalMultiFactor))
+ return HttpResponse('Test Failure Override', status=200)
+ settings.OPENID_RENDER_FAILURE = mock_login_failure_handler
+
+ def mock_complete(this, request_args, return_to):
+ request = {'openid.mode': 'checkid_setup',
+ 'openid.trust_root': 'http://localhost/',
+ 'openid.return_to': 'http://localhost/',
+ 'openid.identity': IDENTIFIER_SELECT,
+ 'openid.ns.pape' : pape.ns_uri,
+ 'openid.pape.auth_policies': request_args.get('openid.pape.auth_policies', pape.AUTH_NONE),
+ }
+ openid_server = self.provider.server
+ orequest = openid_server.decodeRequest(request)
+ response = SuccessResponse(
+ self.endpoint, orequest.message,
+ signed_fields=['openid.pape.auth_policies',])
+ return response
+ Consumer.complete = mock_complete
+
+ user = User.objects.create_user('testuser', 'test example com')
+ useropenid = UserOpenID(
+ user=user,
+ claimed_id='http://example.com/identity',
+ display_id='http://example.com/identity')
+ useropenid.save()
+
+ openid_req = {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'}
+ openid_resp = {'nickname': 'testuser', 'fullname': 'Openid User',
+ 'email': 'test example com'}
+
+ openid_request = self._get_login_request(openid_req)
+ openid_response = self._get_login_response(openid_request, openid_req, openid_resp, use_pape=pape.AUTH_NONE)
+
+ response_auth = openid_request.message.getArg(
+ 'http://specs.openid.net/extensions/pape/1.0',
+ 'auth_policies',
+ )
+ self.assertNotEqual(response_auth, preferred_auth)
+
+ # Status code should be 200, since we over-rode the login_failure handler
+ response = self.complete(openid_response)
+ self.assertEquals(200, response.status_code)
+ self.assertContains(response, 'Test Failure Override')
+
+ def test_login_without_nickname(self):
+ settings.OPENID_CREATE_USERS = True
+
+ openid_req = {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'}
+ openid_resp = {'nickname': '', 'fullname': 'Openid User',
+ 'email': 'foo example com'}
+ self._do_user_login(openid_req, openid_resp)
+ response = self.client.get('/getuser/')
+
+ # username defaults to 'openiduser'
+ self.assertEquals(response.content, 'openiduser')
+
+ # The user's full name and email have been updated.
+ user = User.objects.get(username=response.content)
+ self.assertEquals(user.first_name, 'Openid')
+ self.assertEquals(user.last_name, 'User')
+ self.assertEquals(user.email, 'foo example com')
+
+ def test_login_duplicate_username_numbering(self):
+ settings.OPENID_FOLLOW_RENAMES = False
+ settings.OPENID_CREATE_USERS = True
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
+ # Setup existing user who's name we're going to conflict with
+ user = User.objects.create_user('testuser', 'someone example com')
+
+ # identity url is for 'renameuser'
+ openid_req = {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'}
+ # but returned username is for 'testuser', which already exists for another identity
+ openid_resp = {'nickname': 'testuser', 'fullname': 'Test User',
+ 'email': 'test example com'}
+ self._do_user_login(openid_req, openid_resp)
+ response = self.client.get('/getuser/')
+
+ # Since this username is already taken by someone else, we go through
+ # the process of adding +i to it, and get testuser2.
+ self.assertEquals(response.content, 'testuser2')
+
+ def test_login_duplicate_username_numbering_with_conflicts(self):
+ settings.OPENID_FOLLOW_RENAMES = False
+ settings.OPENID_CREATE_USERS = True
+ settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
+ # Setup existing user who's name we're going to conflict with
+ user = User.objects.create_user('testuser', 'someone example com')
+ user = User.objects.create_user('testuser3', 'someone example com')
+
+ # identity url is for 'renameuser'
+ openid_req = {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'}
+ # but returned username is for 'testuser', which already exists for another identity
+ openid_resp = {'nickname': 'testuser', 'fullname': 'Test User',
+ 'email': 'test example com'}
+ self._do_user_login(openid_req, openid_resp)
+ response = self.client.get('/getuser/')
+
+ # Since this username is already taken by someone else, we go through
+ # the process of adding +i to it starting with the count of users with
+ # username starting with 'testuser', of which there are 2. i should
+ # start at 3, which already exists, so it should skip to 4.
+ self.assertEquals(response.content, 'testuser4')
+
+ def test_login_duplicate_username_numbering_with_holes(self):
+ settings.OPENID_FOLLOW_RENAMES = False
+ settings.OPENID_CREATE_USERS = True
+ settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
+ # Setup existing user who's name we're going to conflict with
+ user = User.objects.create_user('testuser', 'someone example com')
+ user = User.objects.create_user('testuser1', 'someone example com')
+ user = User.objects.create_user('testuser6', 'someone example com')
+ user = User.objects.create_user('testuser7', 'someone example com')
+ user = User.objects.create_user('testuser8', 'someone example com')
+
+ # identity url is for 'renameuser'
+ openid_req = {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'}
+ # but returned username is for 'testuser', which already exists for another identity
+ openid_resp = {'nickname': 'testuser', 'fullname': 'Test User',
+ 'email': 'test example com'}
+ self._do_user_login(openid_req, openid_resp)
+ response = self.client.get('/getuser/')
+
+ # Since this username is already taken by someone else, we go through
+ # the process of adding +i to it starting with the count of users with
+ # username starting with 'testuser', of which there are 5. i should
+ # start at 6, and increment until it reaches 9.
+ self.assertEquals(response.content, 'testuser9')
+
+ def test_login_duplicate_username_numbering_with_nonsequential_matches(self):
+ settings.OPENID_FOLLOW_RENAMES = False
+ settings.OPENID_CREATE_USERS = True
+ settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
+ # Setup existing user who's name we're going to conflict with
+ user = User.objects.create_user('testuser', 'someone example com')
+ user = User.objects.create_user('testuserfoo', 'someone example com')
+
+ # identity url is for 'renameuser'
+ openid_req = {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'}
+ # but returned username is for 'testuser', which already exists for another identity
+ openid_resp = {'nickname': 'testuser', 'fullname': 'Test User',
+ 'email': 'test example com'}
+ self._do_user_login(openid_req, openid_resp)
+ response = self.client.get('/getuser/')
+
+ # Since this username is already taken by someone else, we go through
+ # the process of adding +i to it starting with the count of users with
+ # username starting with 'testuser', of which there are 2. i should
+ # start at 3, which will be available.
+ self.assertEquals(response.content, 'testuser3')
+
+ def test_login_follow_rename(self):
+ settings.OPENID_FOLLOW_RENAMES = True
+ settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
+ user = User.objects.create_user('testuser', 'someone example com')
+ useropenid = UserOpenID(
+ user=user,
+ claimed_id='http://example.com/identity',
+ display_id='http://example.com/identity')
+ useropenid.save()
+
+ openid_req = {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'}
+ openid_resp = {'nickname': 'someuser', 'fullname': 'Some User',
+ 'email': 'foo example com'}
+ self._do_user_login(openid_req, openid_resp)
+ response = self.client.get('/getuser/')
+
+ # If OPENID_FOLLOW_RENAMES, they are logged in as
+ # someuser (the passed in nickname has changed the username)
+ self.assertEquals(response.content, 'someuser')
+
+ # The user's full name and email have been updated.
+ user = User.objects.get(username=response.content)
+ self.assertEquals(user.first_name, 'Some')
+ self.assertEquals(user.last_name, 'User')
+ self.assertEquals(user.email, 'foo example com')
+
+ def test_login_follow_rename_without_nickname_change(self):
+ settings.OPENID_FOLLOW_RENAMES = True
+ settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
+ settings.OPENID_STRICT_USERNAMES = True
user = User.objects.create_user('testuser', 'someone example com')
useropenid = UserOpenID(
user=user,
@@ -291,6 +699,242 @@ class RelyingPartyTests(TestCase):
display_id='http://example.com/identity')
useropenid.save()
+ openid_req = {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'}
+ openid_resp = {'nickname': 'testuser', 'fullname': 'Some User',
+ 'email': 'foo example com'}
+ self._do_user_login(openid_req, openid_resp)
+ response = self.client.get('/getuser/')
+
+ # Username should not have changed
+ self.assertEquals(response.content, 'testuser')
+
+ # The user's full name and email have been updated.
+ user = User.objects.get(username=response.content)
+ self.assertEquals(user.first_name, 'Some')
+ self.assertEquals(user.last_name, 'User')
+ self.assertEquals(user.email, 'foo example com')
+
+ def test_login_follow_rename_conflict(self):
+ settings.OPENID_FOLLOW_RENAMES = True
+ settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
+ # Setup existing user who's name we're going to switch to
+ user = User.objects.create_user('testuser', 'someone example com')
+ UserOpenID.objects.get_or_create(
+ user=user,
+ claimed_id='http://example.com/existing_identity',
+ display_id='http://example.com/existing_identity')
+
+ # Setup user who is going to try to change username to 'testuser'
+ renamed_user = User.objects.create_user('renameuser', 'someone example com')
+ UserOpenID.objects.get_or_create(
+ user=renamed_user,
+ claimed_id='http://example.com/identity',
+ display_id='http://example.com/identity')
+
+ # identity url is for 'renameuser'
+ openid_req = {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'}
+ # but returned username is for 'testuser', which already exists for another identity
+ openid_resp = {'nickname': 'testuser', 'fullname': 'Rename User',
+ 'email': 'rename example com'}
+ self._do_user_login(openid_req, openid_resp)
+ response = self.client.get('/getuser/')
+
+ # If OPENID_FOLLOW_RENAMES, attempt to change username to 'testuser'
+ # but since that username is already taken by someone else, we go through
+ # the process of adding +i to it, and get testuser2.
+ self.assertEquals(response.content, 'testuser2')
+
+ # The user's full name and email have been updated.
+ user = User.objects.get(username=response.content)
+ self.assertEquals(user.first_name, 'Rename')
+ self.assertEquals(user.last_name, 'User')
+ self.assertEquals(user.email, 'rename example com')
+
+ def test_login_follow_rename_false_onlyonce(self):
+ settings.OPENID_FOLLOW_RENAMES = True
+ settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
+ # Setup existing user who's name we're going to switch to
+ user = User.objects.create_user('testuser', 'someone example com')
+ UserOpenID.objects.get_or_create(
+ user=user,
+ claimed_id='http://example.com/existing_identity',
+ display_id='http://example.com/existing_identity')
+
+ # Setup user who is going to try to change username to 'testuser'
+ renamed_user = User.objects.create_user('testuser2000eight', 'someone example com')
+ UserOpenID.objects.get_or_create(
+ user=renamed_user,
+ claimed_id='http://example.com/identity',
+ display_id='http://example.com/identity')
+
+ # identity url is for 'testuser2000eight'
+ openid_req = {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'}
+ # but returned username is for 'testuser', which already exists for another identity
+ openid_resp = {'nickname': 'testuser2', 'fullname': 'Rename User',
+ 'email': 'rename example com'}
+ self._do_user_login(openid_req, openid_resp)
+ response = self.client.get('/getuser/')
+
+ # If OPENID_FOLLOW_RENAMES, attempt to change username to 'testuser'
+ # but since that username is already taken by someone else, we go through
+ # the process of adding +i to it. Even though it looks like the username
+ # follows the nickname+i scheme, it has non-numbers in the suffix, so
+ # it's not an auto-generated one. The regular process of renaming to
+ # 'testuser' has a conflict, so we get +2 at the end.
+ self.assertEquals(response.content, 'testuser2')
+
+ # The user's full name and email have been updated.
+ user = User.objects.get(username=response.content)
+ self.assertEquals(user.first_name, 'Rename')
+ self.assertEquals(user.last_name, 'User')
+ self.assertEquals(user.email, 'rename example com')
+
+ def test_login_follow_rename_conflict_onlyonce(self):
+ settings.OPENID_FOLLOW_RENAMES = True
+ settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
+ # Setup existing user who's name we're going to switch to
+ user = User.objects.create_user('testuser', 'someone example com')
+ UserOpenID.objects.get_or_create(
+ user=user,
+ claimed_id='http://example.com/existing_identity',
+ display_id='http://example.com/existing_identity')
+
+ # Setup user who is going to try to change username to 'testuser'
+ renamed_user = User.objects.create_user('testuser2000', 'someone example com')
+ UserOpenID.objects.get_or_create(
+ user=renamed_user,
+ claimed_id='http://example.com/identity',
+ display_id='http://example.com/identity')
+
+ # identity url is for 'testuser2000'
+ openid_req = {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'}
+ # but returned username is for 'testuser', which already exists for another identity
+ openid_resp = {'nickname': 'testuser', 'fullname': 'Rename User',
+ 'email': 'rename example com'}
+ self._do_user_login(openid_req, openid_resp)
+ response = self.client.get('/getuser/')
+
+ # If OPENID_FOLLOW_RENAMES, attempt to change username to 'testuser'
+ # but since that username is already taken by someone else, we go through
+ # the process of adding +i to it. Since the user for this identity url
+ # already has a name matching that pattern, check if first.
+ self.assertEquals(response.content, 'testuser2000')
+
+ # The user's full name and email have been updated.
+ user = User.objects.get(username=response.content)
+ self.assertEquals(user.first_name, 'Rename')
+ self.assertEquals(user.last_name, 'User')
+ self.assertEquals(user.email, 'rename example com')
+
+ def test_login_follow_rename_false_conflict(self):
+ settings.OPENID_FOLLOW_RENAMES = True
+ settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
+ # Setup existing user who's username matches the name+i pattern
+ user = User.objects.create_user('testuser2', 'someone example com')
+ UserOpenID.objects.get_or_create(
+ user=user,
+ claimed_id='http://example.com/identity',
+ display_id='http://example.com/identity')
+
+ # identity url is for 'testuser2'
+ openid_req = {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'}
+ # but returned username is for 'testuser', which looks like we've done
+ # a username+1 for them already, but 'testuser' isn't actually taken
+ openid_resp = {'nickname': 'testuser', 'fullname': 'Same User',
+ 'email': 'same example com'}
+ self._do_user_login(openid_req, openid_resp)
+ response = self.client.get('/getuser/')
+
+ # If OPENID_FOLLOW_RENAMES, username should be changed to 'testuser'
+ # because it wasn't currently taken
+ self.assertEquals(response.content, 'testuser')
+
+ # The user's full name and email have been updated.
+ user = User.objects.get(username=response.content)
+ self.assertEquals(user.first_name, 'Same')
+ self.assertEquals(user.last_name, 'User')
+ self.assertEquals(user.email, 'same example com')
+
+ def test_strict_username_no_nickname(self):
+ settings.OPENID_CREATE_USERS = True
+ settings.OPENID_STRICT_USERNAMES = True
+ settings.OPENID_SREG_REQUIRED_FIELDS = []
+
+ # Posting in an identity URL begins the authentication request:
+ response = self.client.post('/openid/login/',
+ {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'})
+ self.assertContains(response, 'OpenID transaction in progress')
+
+ # Complete the request, passing back some simple registration
+ # data. The user is redirected to the next URL.
+ openid_request = self.provider.parseFormPost(response.content)
+ sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
+ openid_response = openid_request.answer(True)
+ sreg_response = sreg.SRegResponse.extractResponse(
+ sreg_request, {'nickname': '', # No nickname
+ 'fullname': 'Some User',
+ 'email': 'foo example com'})
+ openid_response.addExtension(sreg_response)
+ response = self.complete(openid_response)
+
+ # Status code should be 403: Forbidden
+ self.assertEquals(403, response.status_code)
+ self.assertContains(response, '<h1>OpenID failed</h1>', status_code=403)
+ self.assertContains(response, "An attribute required for logging in was not returned "
+ "(nickname)", status_code=403)
+
+ def test_strict_username_no_nickname_override(self):
+ settings.OPENID_CREATE_USERS = True
+ settings.OPENID_STRICT_USERNAMES = True
+ settings.OPENID_SREG_REQUIRED_FIELDS = []
+
+ # Override the login_failure handler
+ def mock_login_failure_handler(request, message, status=403,
+ template_name=None,
+ exception=None):
+ self.assertTrue(isinstance(exception, (RequiredAttributeNotReturned, MissingUsernameViolation)))
+ return HttpResponse('Test Failure Override', status=200)
+ settings.OPENID_RENDER_FAILURE = mock_login_failure_handler
+
+ # Posting in an identity URL begins the authentication request:
+ response = self.client.post('/openid/login/',
+ {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'})
+ self.assertContains(response, 'OpenID transaction in progress')
+
+ # Complete the request, passing back some simple registration
+ # data. The user is redirected to the next URL.
+ openid_request = self.provider.parseFormPost(response.content)
+ sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
+ openid_response = openid_request.answer(True)
+ sreg_response = sreg.SRegResponse.extractResponse(
+ sreg_request, {'nickname': '', # No nickname
+ 'fullname': 'Some User',
+ 'email': 'foo example com'})
+ openid_response.addExtension(sreg_response)
+ response = self.complete(openid_response)
+
+ # Status code should be 200, since we over-rode the login_failure handler
+ self.assertEquals(200, response.status_code)
+ self.assertContains(response, 'Test Failure Override')
+
+ def test_strict_username_duplicate_user(self):
+ settings.OPENID_CREATE_USERS = True
+ settings.OPENID_STRICT_USERNAMES = True
+ # Create a user with the same name as we'll pass back via sreg.
+ user = User.objects.create_user('someuser', 'someone example com')
+ useropenid = UserOpenID(
+ user=user,
+ claimed_id='http://example.com/different_identity',
+ display_id='http://example.com/different_identity')
+ useropenid.save()
+
# Posting in an identity URL begins the authentication request:
response = self.client.post('/openid/login/',
{'openid_identifier': 'http://example.com/identity',
@@ -307,19 +951,150 @@ class RelyingPartyTests(TestCase):
'email': 'foo example com'})
openid_response.addExtension(sreg_response)
response = self.complete(openid_response)
- self.assertRedirects(response, 'http://testserver/getuser/')
- # And they are now logged in as testuser (the passed in
- # nickname has not caused the username to change).
+ # Status code should be 403: Forbidden
+ self.assertEquals(403, response.status_code)
+ self.assertContains(response, '<h1>OpenID failed</h1>', status_code=403)
+ self.assertContains(response,
+ "The username (someuser) with which you tried to log in is "
+ "already in use for a different account.",
+ status_code=403)
+
+ def test_strict_username_duplicate_user_override(self):
+ settings.OPENID_CREATE_USERS = True
+ settings.OPENID_STRICT_USERNAMES = True
+
+ # Override the login_failure handler
+ def mock_login_failure_handler(request, message, status=403,
+ template_name=None,
+ exception=None):
+ self.assertTrue(isinstance(exception, DuplicateUsernameViolation))
+ return HttpResponse('Test Failure Override', status=200)
+ settings.OPENID_RENDER_FAILURE = mock_login_failure_handler
+
+ # Create a user with the same name as we'll pass back via sreg.
+ user = User.objects.create_user('someuser', 'someone example com')
+ useropenid = UserOpenID(
+ user=user,
+ claimed_id='http://example.com/different_identity',
+ display_id='http://example.com/different_identity')
+ useropenid.save()
+
+ # Posting in an identity URL begins the authentication request:
+ response = self.client.post('/openid/login/',
+ {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'})
+ self.assertContains(response, 'OpenID transaction in progress')
+
+ # Complete the request, passing back some simple registration
+ # data. The user is redirected to the next URL.
+ openid_request = self.provider.parseFormPost(response.content)
+ sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
+ openid_response = openid_request.answer(True)
+ sreg_response = sreg.SRegResponse.extractResponse(
+ sreg_request, {'nickname': 'someuser', 'fullname': 'Some User',
+ 'email': 'foo example com'})
+ openid_response.addExtension(sreg_response)
+ response = self.complete(openid_response)
+
+ # Status code should be 200, since we over-rode the login_failure handler
+ self.assertEquals(200, response.status_code)
+ self.assertContains(response, 'Test Failure Override')
+
+ def test_login_requires_sreg_required_fields(self):
+ # If any required attributes are not included in the response,
+ # we fail with a forbidden.
+ settings.OPENID_CREATE_USERS = True
+ settings.OPENID_SREG_REQUIRED_FIELDS = ('email', 'language')
+ # Posting in an identity URL begins the authentication request:
+ response = self.client.post('/openid/login/',
+ {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'})
+ self.assertContains(response, 'OpenID transaction in progress')
+
+ # Complete the request, passing back some simple registration
+ # data. The user is redirected to the next URL.
+ openid_request = self.provider.parseFormPost(response.content)
+ sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
+ openid_response = openid_request.answer(True)
+ sreg_response = sreg.SRegResponse.extractResponse(
+ sreg_request, {'nickname': 'foo',
+ 'fullname': 'Some User',
+ 'email': 'foo example com'})
+ openid_response.addExtension(sreg_response)
+ response = self.complete(openid_response)
+
+ # Status code should be 403: Forbidden as we didn't include
+ # a required field - language.
+ self.assertContains(response,
+ "An attribute required for logging in was not returned "
+ "(language)", status_code=403)
+
+ def test_login_update_details(self):
+ settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
+ user = User.objects.create_user('testuser', 'someone example com')
+ useropenid = UserOpenID(
+ user=user,
+ claimed_id='http://example.com/identity',
+ display_id='http://example.com/identity')
+ useropenid.save()
+
+ openid_req = {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'}
+ openid_resp = {'nickname': 'testuser', 'fullname': 'Some User',
+ 'email': 'foo example com'}
+ self._do_user_login(openid_req, openid_resp)
response = self.client.get('/getuser/')
+
self.assertEquals(response.content, 'testuser')
# The user's full name and email have been updated.
- user = User.objects.get(username='testuser')
+ user = User.objects.get(username=response.content)
self.assertEquals(user.first_name, 'Some')
self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'foo example com')
+ def test_login_uses_sreg_extra_fields(self):
+ # The configurable sreg attributes are used in the request.
+ settings.OPENID_SREG_EXTRA_FIELDS = ('language',)
+ user = User.objects.create_user('testuser', 'someone example com')
+ useropenid = UserOpenID(
+ user=user,
+ claimed_id='http://example.com/identity',
+ display_id='http://example.com/identity')
+ useropenid.save()
+
+ # Posting in an identity URL begins the authentication request:
+ response = self.client.post('/openid/login/',
+ {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'})
+
+ openid_request = self.provider.parseFormPost(response.content)
+ sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
+ for field in ('email', 'fullname', 'nickname', 'language'):
+ self.assertTrue(field in sreg_request)
+
+ def test_login_uses_sreg_required_fields(self):
+ # The configurable sreg attributes are used in the request.
+ settings.OPENID_SREG_REQUIRED_FIELDS = ('email', 'language')
+ user = User.objects.create_user('testuser', 'someone example com')
+ useropenid = UserOpenID(
+ user=user,
+ claimed_id='http://example.com/identity',
+ display_id='http://example.com/identity')
+ useropenid.save()
+
+ # Posting in an identity URL begins the authentication request:
+ response = self.client.post('/openid/login/',
+ {'openid_identifier': 'http://example.com/identity',
+ 'next': '/getuser/'})
+
+ openid_request = self.provider.parseFormPost(response.content)
+ sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
+
+ self.assertEqual(['email', 'language'], sreg_request.required)
+ self.assertEqual(['fullname', 'nickname'], sreg_request.optional)
+
def test_login_attribute_exchange(self):
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
user = User.objects.create_user('testuser', 'someone example com')
@@ -355,6 +1130,13 @@ class RelyingPartyTests(TestCase):
'http://axschema.org/namePerson/last'))
self.assertTrue(fetch_request.has_key(
'http://axschema.org/namePerson/friendly'))
+ # myOpenID compatibilty attributes:
+ self.assertTrue(fetch_request.has_key(
+ 'http://schema.openid.net/contact/email'))
+ self.assertTrue(fetch_request.has_key(
+ 'http://schema.openid.net/namePerson'))
+ self.assertTrue(fetch_request.has_key(
+ 'http://schema.openid.net/namePerson/friendly'))
# Build up a response including AX data.
openid_response = openid_request.answer(True)
@@ -383,6 +1165,7 @@ class RelyingPartyTests(TestCase):
self.assertEquals(user.email, 'foo example com')
def test_login_teams(self):
+ settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO = False
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = {'teamname': 'groupname',
'otherteam': 'othergroup'}
user = User.objects.create_user('testuser', 'someone example com')
@@ -452,11 +1235,91 @@ class RelyingPartyTests(TestCase):
openid_request = self.provider.parseFormPost(response.content)
openid_response = openid_request.answer(True)
teams_request = teams.TeamsRequest.fromOpenIDRequest(openid_request)
-
+
self.assertEqual(group1 in user.groups.all(), False)
self.assertEqual(group2 in user.groups.all(), False)
self.assertTrue(group3 not in user.groups.all())
+ def test_login_teams_staff_not_defined(self):
+ delattr(settings, 'OPENID_LAUNCHPAD_STAFF_TEAMS')
+ user = User.objects.create_user('testuser', 'someone example com')
+ user.is_staff = True
+ user.save()
+ self.assertTrue(user.is_staff)
+
+ user = self.get_openid_authed_user_with_teams(user, 'teamname,some-other-team')
+ self.assertTrue(user.is_staff)
+
+ def test_login_teams_staff_assignment(self):
+ settings.OPENID_LAUNCHPAD_STAFF_TEAMS = ('teamname',)
+ user = User.objects.create_user('testuser', 'someone example com')
+ user.is_staff = False
+ user.save()
+ self.assertFalse(user.is_staff)
+
+ user = self.get_openid_authed_user_with_teams(user, 'teamname,some-other-team')
+ self.assertTrue(user.is_staff)
+
+ def test_login_teams_staff_unassignment(self):
+ settings.OPENID_LAUNCHPAD_STAFF_TEAMS = ('different-teamname',)
+ user = User.objects.create_user('testuser', 'someone example com')
+ user.is_staff = True
+ user.save()
+ self.assertTrue(user.is_staff)
+
+ user = self.get_openid_authed_user_with_teams(user, 'teamname,some-other-team')
+ self.assertFalse(user.is_staff)
+
+ def get_openid_authed_user_with_teams(self, user, teams_str):
+ useropenid = UserOpenID(
+ user=user,
+ claimed_id='http://example.com/identity',
+ display_id='http://example.com/identity')
+ useropenid.save()
+
+ # Posting in an identity URL begins the authentication request:
+ response = self.client.post('/openid/login/',
+ {'openid_identifier': 'http://example.com/identity'})
+
+ # Complete the request
+ openid_request = self.provider.parseFormPost(response.content)
+ openid_response = openid_request.answer(True)
+ teams_request = teams.TeamsRequest.fromOpenIDRequest(openid_request)
+ teams_response = teams.TeamsResponse.extractResponse(
+ teams_request, teams_str)
+ openid_response.addExtension(teams_response)
+ response = self.complete(openid_response)
+ return User.objects.get(username=user.username)
+
+ def test_login_complete_signals_login(self):
+ # An oauth_login_complete signal is emitted including the
+ # request and sreg_response.
+ user = User.objects.create_user('someuser', 'someone example com')
+ useropenid = UserOpenID(
+ user=user,
+ claimed_id='http://example.com/identity',
+ display_id='http://example.com/identity')
+ useropenid.save()
+ response = self.client.post('/openid/login/',
+ {'openid_identifier': 'http://example.com/identity'})
+ openid_request = self.provider.parseFormPost(response.content)
+ openid_response = openid_request.answer(True)
+ # Use a closure to test whether the signal handler was called.
+ self.signal_handler_called = False
+ def login_callback(sender, **kwargs):
+ self.assertTrue(isinstance(
+ kwargs.get('request', None), HttpRequest))
+ self.assertTrue(isinstance(
+ kwargs.get('openid_response', None), SuccessResponse))
+ self.signal_handler_called = True
+ openid_login_complete.connect(login_callback)
+
+ response = self.complete(openid_response)
+
+ self.assertTrue(self.signal_handler_called)
+ openid_login_complete.disconnect(login_callback)
+
+
class HelperFunctionsTest(TestCase):
def test_sanitise_redirect_url(self):
settings.ALLOWED_EXTERNAL_OPENID_REDIRECT_DOMAINS = [
@@ -482,6 +1345,6 @@ class HelperFunctionsTest(TestCase):
self.assertEqual(url, sanitised)
else:
self.assertEqual(settings.LOGIN_REDIRECT_URL, sanitised)
-
+
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)
diff --git a/lib/django_openid_auth/tests/urls.py b/lib/django_openid_auth/tests/urls.py
index f241550..9403be5 100644
--- a/lib/django_openid_auth/tests/urls.py
+++ b/lib/django_openid_auth/tests/urls.py
@@ -1,6 +1,6 @@
# django-openid-auth - OpenID integration for django.contrib.auth
#
-# Copyright (C) 2009 Canonical Ltd.
+# Copyright (C) 2009-2010 Canonical Ltd.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
diff --git a/lib/django_openid_auth/views.py b/lib/django_openid_auth/views.py
index 90e39ee..a70c282 100644
--- a/lib/django_openid_auth/views.py
+++ b/lib/django_openid_auth/views.py
@@ -40,16 +40,25 @@ from django.http import HttpResponse, HttpResponseRedirect
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.template.loader import render_to_string
-from django.views.decorators.csrf import csrf_exempt
+try:
+ from django.views.decorators.csrf import csrf_exempt
+except ImportError:
+ from django.contrib.csrf.middleware import csrf_exempt
from openid.consumer.consumer import (
Consumer, SUCCESS, CANCEL, FAILURE)
from openid.consumer.discover import DiscoveryFailure
-from openid.extensions import sreg, ax
+from openid.extensions import sreg, ax, pape
from django_openid_auth import teams
from django_openid_auth.forms import OpenIDLoginForm
+from django_openid_auth.models import UserOpenID
+from django_openid_auth.signals import openid_login_complete
from django_openid_auth.store import DjangoOpenIDStore
+from django_openid_auth.exceptions import (
+ RequiredAttributeNotReturned,
+ DjangoOpenIDException,
+)
next_url_re = re.compile('^/[-\w/]+$')
@@ -57,7 +66,7 @@ next_url_re = re.compile('^/[-\w/]+$')
def is_valid_next_url(next):
# When we allow this:
# /openid/?next=/welcome/
- # For security reasons we want to restrict the next= bit to being a local
+ # For security reasons we want to restrict the next= bit to being a local
# path, not a complete URL.
return bool(next_url_re.match(next))
@@ -70,7 +79,7 @@ def sanitise_redirect_url(redirect_to):
is_valid = False
elif '//' in redirect_to:
# Allow the redirect URL to be external if it's a permitted domain
- allowed_domains = getattr(settings,
+ allowed_domains = getattr(settings,
"ALLOWED_EXTERNAL_OPENID_REDIRECT_DOMAINS", [])
s, netloc, p, q, f = urlsplit(redirect_to)
# allow it if netloc is blank or if the domain is allowed
@@ -113,10 +122,11 @@ def render_openid_request(request, openid_request, return_to, trust_root=None):
def default_render_failure(request, message, status=403,
- template_name='openid/failure.html'):
+ template_name='openid/failure.html',
+ exception=None):
"""Render an error page to the user."""
data = render_to_string(
- template_name, dict(message=message),
+ template_name, dict(message=message, exception=exception),
context_instance=RequestContext(request))
return HttpResponse(data, status=status)
@@ -166,7 +176,8 @@ def login_begin(request, template_name='openid/login.html',
openid_request = consumer.begin(openid_url)
except DiscoveryFailure, exc:
return render_failure(
- request, "OpenID discovery error: %s" % (str(exc),), status=500)
+ request, "OpenID discovery error: %s" % (str(exc),), status=500,
+ exception=exc)
# Request some user details. If the provider advertises support
# for attribute exchange, use that.
@@ -191,8 +202,25 @@ def login_begin(request, template_name='openid/login.html',
fetch_request.add(ax.AttrInfo(attr, alias=alias, required=True))
openid_request.addExtension(fetch_request)
else:
+ sreg_required_fields = []
+ sreg_required_fields.extend(
+ getattr(settings, 'OPENID_SREG_REQUIRED_FIELDS', []))
+ sreg_optional_fields = ['email', 'fullname', 'nickname']
+ sreg_optional_fields.extend(
+ getattr(settings, 'OPENID_SREG_EXTRA_FIELDS', []))
+ sreg_optional_fields = [
+ field for field in sreg_optional_fields if (
+ not field in sreg_required_fields)]
openid_request.addExtension(
- sreg.SRegRequest(optional=['email', 'fullname', 'nickname']))
+ sreg.SRegRequest(optional=sreg_optional_fields,
+ required=sreg_required_fields))
+
+ if getattr(settings, 'OPENID_PHYSICAL_MULTIFACTOR_REQUIRED', False):
+ preferred_auth = [
+ pape.AUTH_MULTI_FACTOR_PHYSICAL,
+ ]
+ pape_request = pape.Request(preferred_auth_policies=preferred_auth)
+ openid_request.addExtension(pape_request)
# Request team info
teams_mapping_auto = getattr(settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO', False)
@@ -223,8 +251,11 @@ def login_begin(request, template_name='openid/login.html',
@csrf_exempt
def login_complete(request, redirect_field_name=REDIRECT_FIELD_NAME,
- render_failure=default_render_failure):
+ render_failure=None):
redirect_to = request.REQUEST.get(redirect_field_name, '')
+ render_failure = render_failure or \
+ getattr(settings, 'OPENID_RENDER_FAILURE', None) or \
+ default_render_failure
openid_response = parse_openid_response(request)
if not openid_response:
@@ -232,18 +263,25 @@ def login_complete(request, redirect_field_name=REDIRECT_FIELD_NAME,
request, 'This is an OpenID relying party endpoint.')
if openid_response.status == SUCCESS:
- user = authenticate(openid_response=openid_response)
+ try:
+ user = authenticate(openid_response=openid_response)
+ except DjangoOpenIDException, e:
+ return render_failure(request, e.message, exception=e)
+
if user is not None:
if user.is_active:
auth_login(request, user)
- return HttpResponseRedirect(sanitise_redirect_url(redirect_to))
+ response = HttpResponseRedirect(sanitise_redirect_url(redirect_to))
+
+ # Notify any listeners that we successfully logged in.
+ openid_login_complete.send(sender=UserOpenID, request=request,
+ openid_response=openid_response)
+
+ return response
else:
return render_failure(request, 'Disabled account')
else:
- # save openid reponse in the session to create the user later
- request.session['openid_response'] = openid_response
- return HttpResponseRedirect(reverse('openid_registration'))
- #return render_failure(request, 'Unknown user')
+ return render_failure(request, 'Unknown user')
elif openid_response.status == FAILURE:
return render_failure(
request, 'OpenID authentication failed: %s' %
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]