Module ucam_wls

ucam_wls top-level module. The entire public API is made available here.

Expand source code
"ucam_wls top-level module.  The entire public API is made available here."

from . import context, request, response, signing

from .context import AuthPrincipal, LoginService
from .request import AuthRequest
from .response import AuthResponse
from .signing import Key

from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.backends import default_backend


__all__ = ["context", "request", "response", "signing",
           "AuthPrincipal", "LoginService",
           "AuthRequest", "AuthResponse", "Key",
           "load_private_key"]


def load_private_key(path, kid, password=None):
    """
    Load a PEM-encoded private key from a given path, assigning it a specified
    key ID ('kid').  A password, if needed, can optionally be specified.

    :type path: Path-like object (can be :class:`str`)
    :param path:        The filesystem path to the private key file.

    :type kid: :class:`int`
    :param kid:         The key ID to assign to this private key.

    :type password: :class:`bytes` or :class:`str`
    :param password:    The password, if needed, to decrypt the private key.
                        Should be :const:`None` if the key is not encrypted.
                        If password is given as a :class:`str`, it will be
                        encoded as UTF-8.

    :raises TypeError:  If ``kid`` is not an integer.

    Returns a :class:`ucam_wls.signing.Key` instance.
    """
    if not isinstance(kid, int):
        raise TypeError("kid must be an integer")
    if isinstance(password, str):
        # The PEM loader takes the password as bytes.  ``str`` has no
        # ``decode`` method in Python 3, so the text must be *encoded*.
        password = password.encode("utf-8")
    with open(path, 'rb') as f:
        key = load_pem_private_key(f.read(), password, default_backend())
    return Key(key, kid)

Sub-modules

ucam_wls.context
ucam_wls.errors
ucam_wls.request
ucam_wls.response
ucam_wls.signing
ucam_wls.status
ucam_wls.util

Functions

def load_private_key(path, kid, password=None)

Load a PEM-encoded private key from a given path, assigning it a specified key ID ('kid'). A password, if needed, can optionally be specified.

:type path: Path-like object (can be :class:str) :param path: The filesystem path to the private key file.

:type kid: :class:int :param kid: The key ID to assign to this private key.

:type password: :class:bytes or :class:str :param password: The password, if needed, to decrypt the private key. Should be :const:None if the key is not encrypted. If password is given as a :class:str, it will be encoded as UTF-8.

Returns a :class:Key instance.

Expand source code
def load_private_key(path, kid, password=None):
    """
    Load a PEM-encoded private key from a given path, assigning it a specified
    key ID ('kid').  A password, if needed, can optionally be specified.

    :type path: Path-like object (can be :class:`str`)
    :param path:        The filesystem path to the private key file.

    :type kid: :class:`int`
    :param kid:         The key ID to assign to this private key.

    :type password: :class:`bytes` or :class:`str`
    :param password:    The password, if needed, to decrypt the private key.
                        Should be :const:`None` if the key is not encrypted.
                        If password is given as a :class:`str`, it will be
                        encoded as UTF-8.

    :raises TypeError:  If ``kid`` is not an integer.

    Returns a :class:`ucam_wls.signing.Key` instance.
    """
    if not isinstance(kid, int):
        raise TypeError("kid must be an integer")
    if isinstance(password, str):
        # The PEM loader takes the password as bytes.  ``str`` has no
        # ``decode`` method in Python 3, so the text must be *encoded*.
        password = password.encode("utf-8")
    with open(path, 'rb') as f:
        key = load_pem_private_key(f.read(), password, default_backend())
    return Key(key, kid)

Classes

class AuthPrincipal (userid, auth_methods, ptags=None, session_expiry=None)
Expand source code
class AuthPrincipal:
    """A principal (user) as handled by the login service.

    Plain attribute container: ``userid``, ``auth_methods`` and
    ``session_expiry`` are stored exactly as given.  ``ptags`` is stored as
    given too, except that an omitted (``None``) value becomes a new empty
    list.
    """

    def __init__(self, userid, auth_methods, ptags=None, session_expiry=None):
        self.userid, self.auth_methods = userid, auth_methods
        # A fresh list per instance, so instances never share a default.
        self.ptags = [] if ptags is None else ptags
        self.session_expiry = session_expiry
class AuthRequest (ver, url, desc='', aauth='', iact='', msg='', params='', date='', fail='', skew=None)

An authentication request sent by a WAA.

Expand source code
class AuthRequest:
    """An authentication request sent by a WAA.

    Attributes:
        ver (int): protocol version, converted with ``int()``.
        url (str): the result of passing the given URL through ``clean_url``.
        desc, msg, params, date: stored exactly as given.
        aauth (list): a list is stored as-is, a string is split on commas
            and ``None`` becomes an empty list.
        iact (bool or None): a bool is stored as-is, ``'yes'``/``'no'`` map
            to ``True``/``False`` and anything else becomes ``None``.
        fail (bool): a bool is stored as-is; otherwise ``True`` only for
            the string ``'yes'``.

    The ``skew`` parameter is accepted but not stored.
    """

    REQUIRED_PARAMS = {'ver', 'url'}
    OPTIONAL_PARAMS = {'desc', 'aauth', 'iact', 'msg', 'params', 'date', 'skew', 'fail'}
    VALID_PARAMS = REQUIRED_PARAMS | OPTIONAL_PARAMS
    IGNORED_PARAMS = {'skew'}

    def __init__(self, ver, url, desc='', aauth='', iact='', msg='', params='',
                 date='', fail='', skew=None):
        self.ver = int(ver)
        self.url = clean_url(url)
        self.desc = desc

        if isinstance(aauth, list):
            self.aauth = aauth
        elif isinstance(aauth, str):
            self.aauth = aauth.split(',')
        elif aauth is None:
            self.aauth = []
        else:
            raise ValueError("Unparseable aauth value %r" % aauth)

        if isinstance(iact, bool):
            self.iact = iact
        elif iact == 'yes':
            self.iact = True
        elif iact == 'no':
            self.iact = False
        else:
            self.iact = None

        self.msg = msg
        self.params = params

        self.date = date

        if isinstance(fail, bool):
            self.fail = fail
        else:
            self.fail = (fail == 'yes')

    @property
    def params_dict(self):
        """The request as a dict of wire-format parameter values.

        Covers every name in ``VALID_PARAMS`` except ``IGNORED_PARAMS``.
        ``iact`` and ``fail`` are rendered as ``'yes'``/``'no'``/``''`` and
        ``aauth`` as a comma-separated string.
        """
        d = {k: getattr(self, k) for k in self.VALID_PARAMS - self.IGNORED_PARAMS}
        if self.iact == True:
            d['iact'] = 'yes'
        elif self.iact == False:
            d['iact'] = 'no'
        else:
            d['iact'] = ''
        d['fail'] = 'yes' if self.fail else ''
        d['aauth'] = ','.join(self.aauth)
        return d

    @classmethod
    def from_params_dict(cls, params_dict, check_supported=True, ignore_unknown=False):
        """Build a request from a mapping of parameter names to values.

        Values supplied as lists are reduced to their first element.

        Args:
            params_dict: the parameters; anything ``dict()`` accepts.
            check_supported (bool): if true, reject protocol versions for
                which ``version_supported`` is false.
            ignore_unknown (bool): if true, silently drop parameters that
                are not in ``VALID_PARAMS`` instead of rejecting them.

        Raises:
            InvalidAuthRequest: a required parameter is missing, an unknown
                parameter is present (unless ``ignore_unknown``), ``ver``
                is not an integer, or the request fails ``data_valid``.
            ProtocolVersionUnsupported: ``check_supported`` is true and the
                version is not supported.
        """
        d = dict(params_dict)

        # Go from 1-long lists of values to just the values
        for k, v in d.items():
            if isinstance(v, list):
                d[k] = v[0]

        missing_params = cls.REQUIRED_PARAMS - set(d)
        if missing_params:
            # Sorted so that the message is deterministic.
            raise InvalidAuthRequest(
                "Missing required parameter(s): %s" %
                ', '.join(sorted(missing_params))
            )

        extra_params = set(d) - cls.VALID_PARAMS
        if extra_params:
            if ignore_unknown:
                for k in extra_params:
                    del d[k]
            else:
                raise InvalidAuthRequest("Unknown parameter(s): %s" %
                                         ', '.join(sorted(extra_params)))

        try:
            d['ver'] = int(d['ver'])
        except (TypeError, ValueError) as e:
            # int() raises TypeError (not ValueError) for values such as
            # None; report those as an invalid request as well.
            raise InvalidAuthRequest("ver parameter is not an integer") from e

        req = cls(**d)
        if not req.data_valid:
            raise InvalidAuthRequest("Authentication request failed validation")

        if check_supported and not req.version_supported:
            raise ProtocolVersionUnsupported(d['ver'])

        return req

    @property
    def as_query_string(self):
        """The request parameters encoded as a URL query string."""
        # ``params_dict`` is a property, so it is read, not called; calling
        # it raised "TypeError: 'dict' object is not callable".
        return urlencode(self.params_dict, doseq=True)

    @classmethod
    def from_query_string(cls, query_string, *args, **kwargs):
        """Build a request from a URL query string.

        Extra positional and keyword arguments are forwarded to
        ``from_params_dict``.

        Raises:
            InvalidAuthRequest: a parameter appears more than once (in
                addition to anything ``from_params_dict`` raises).
        """
        params_dict = parse_qs(query_string)

        for k, values in params_dict.items():
            if len(values) > 1:
                raise InvalidAuthRequest("Repeated parameter %s" % k)

        return cls.from_params_dict(params_dict, *args, **kwargs)

    @property
    def data_valid(self):
        """Whether every field has an acceptable type and content.

        ``desc`` and ``msg`` must additionally contain only printable ASCII
        characters (0x20 to 0x7e).
        """
        result = all([
            isinstance(self.ver, int),
            isinstance(self.url, str),
            isinstance(self.desc, str),
            isinstance(self.aauth, list),
            isinstance(self.iact, bool) or self.iact is None,
            isinstance(self.msg, str) or self.msg is None,
            isinstance(self.params, str) or self.params is None,
            isinstance(self.fail, bool) or self.fail is None,
        ])
        # ``result and ...`` short-circuits, so the character scans only run
        # once the type checks above have passed.
        if self.desc is not None:
            result = result and all([0x20 <= ord(c) <= 0x7e for c in self.desc])
        if self.msg is not None:
            result = result and all([0x20 <= ord(c) <= 0x7e for c in self.msg])
        return result

    @property
    def version_supported(self):
        """Whether ``ver`` is a supported protocol version (1 to 3)."""
        return 1 <= self.ver <= 3

Class variables

var IGNORED_PARAMS

set() -> new empty set object set(iterable) -> new set object

Build an unordered collection of unique elements.

var OPTIONAL_PARAMS

set() -> new empty set object set(iterable) -> new set object

Build an unordered collection of unique elements.

var REQUIRED_PARAMS

set() -> new empty set object set(iterable) -> new set object

Build an unordered collection of unique elements.

var VALID_PARAMS

set() -> new empty set object set(iterable) -> new set object

Build an unordered collection of unique elements.

Static methods

def from_params_dict(params_dict, check_supported=True, ignore_unknown=False)
Expand source code
@classmethod
def from_params_dict(cls, params_dict, check_supported=True, ignore_unknown=False):
    """Build a request from a mapping of parameter names to values.

    Values supplied as lists are reduced to their first element.

    Args:
        params_dict: the parameters; anything ``dict()`` accepts.
        check_supported (bool): if true, reject protocol versions for which
            ``version_supported`` is false.
        ignore_unknown (bool): if true, silently drop parameters that are
            not in ``VALID_PARAMS`` instead of rejecting them.

    Raises:
        InvalidAuthRequest: a required parameter is missing, an unknown
            parameter is present (unless ``ignore_unknown``), ``ver`` is
            not an integer, or the request fails ``data_valid``.
        ProtocolVersionUnsupported: ``check_supported`` is true and the
            version is not supported.
    """
    d = dict(params_dict)

    # Go from 1-long lists of values to just the values
    for k, v in d.items():
        if isinstance(v, list):
            d[k] = v[0]

    missing_params = cls.REQUIRED_PARAMS - set(d)
    if missing_params:
        # Sorted so that the message is deterministic.
        raise InvalidAuthRequest(
            "Missing required parameter(s): %s" %
            ', '.join(sorted(missing_params))
        )

    extra_params = set(d) - cls.VALID_PARAMS
    if extra_params:
        if ignore_unknown:
            for k in extra_params:
                del d[k]
        else:
            raise InvalidAuthRequest("Unknown parameter(s): %s" %
                                     ', '.join(sorted(extra_params)))

    try:
        d['ver'] = int(d['ver'])
    except (TypeError, ValueError) as e:
        # int() raises TypeError (not ValueError) for values such as None;
        # report those as an invalid request as well.
        raise InvalidAuthRequest("ver parameter is not an integer") from e

    req = cls(**d)
    if not req.data_valid:
        raise InvalidAuthRequest("Authentication request failed validation")

    if check_supported and not req.version_supported:
        raise ProtocolVersionUnsupported(d['ver'])

    return req
def from_query_string(query_string, *args, **kwargs)
Expand source code
@classmethod
def from_query_string(cls, query_string, *args, **kwargs):
    """Build a request from a URL query string.

    The string is parsed with ``parse_qs``; a parameter that occurs more
    than once is rejected with ``InvalidAuthRequest``.  All remaining
    positional and keyword arguments are handed to ``from_params_dict``.
    """
    parsed = parse_qs(query_string)

    for name in parsed:
        if len(parsed[name]) > 1:
            raise InvalidAuthRequest("Repeated parameter %s" % name)

    return cls.from_params_dict(parsed, *args, **kwargs)

Instance variables

var as_query_string
Expand source code
@property
def as_query_string(self):
    """The request parameters encoded as a URL query string."""
    # ``params_dict`` is a property, so it is read, not called; calling it
    # raised "TypeError: 'dict' object is not callable".
    return urlencode(self.params_dict, doseq=True)
var data_valid
Expand source code
@property
def data_valid(self):
    """Whether every field has an acceptable type and content.

    ``ver`` must be an int, ``url`` and ``desc`` strings and ``aauth`` a
    list; ``iact`` and ``fail`` must be bools or ``None``; ``msg`` and
    ``params`` must be strings or ``None``.  ``desc`` and ``msg`` must also
    contain only printable ASCII characters (0x20 to 0x7e).
    """
    def _printable_ascii(text):
        return all([0x20 <= ord(ch) <= 0x7e for ch in text])

    type_checks = (
        isinstance(self.ver, int),
        isinstance(self.url, str),
        isinstance(self.desc, str),
        isinstance(self.aauth, list),
        isinstance(self.iact, bool) or self.iact is None,
        isinstance(self.msg, str) or self.msg is None,
        isinstance(self.params, str) or self.params is None,
        isinstance(self.fail, bool) or self.fail is None,
    )
    if not all(type_checks):
        # Wrong types: fail without scanning any characters.
        return False

    for text in (self.desc, self.msg):
        if text is not None and not _printable_ascii(text):
            return False
    return True
var params_dict
Expand source code
@property
def params_dict(self):
    """The request as a dict of wire-format parameter values.

    Includes every name in ``VALID_PARAMS`` that is not in
    ``IGNORED_PARAMS``.  ``iact`` becomes ``'yes'``, ``'no'`` or ``''``,
    ``fail`` becomes ``'yes'`` or ``''`` and ``aauth`` is joined with
    commas.
    """
    wanted = self.VALID_PARAMS - self.IGNORED_PARAMS
    d = {}
    for name in wanted:
        d[name] = getattr(self, name)

    d['iact'] = 'yes' if self.iact == True else ('no' if self.iact == False else '')
    d['fail'] = 'yes' if self.fail else ''
    d['aauth'] = ','.join(self.aauth)
    return d
var version_supported
Expand source code
@property
def version_supported(self):
    """Whether ``ver`` is a supported protocol version (1 to 3)."""
    ver = self.ver
    return 1 <= ver and ver <= 3
class AuthResponse (ver, code, url, params, principal=None, msg=None, issue=None, ptags=None, auth=None, sso=None, life=None)
Expand source code
class AuthResponse:
    """An authentication response generated by the WLS for a WAA.

    A response can be signed by assigning ``kid`` and ``signature`` (both
    start out as ``None``).  ``response_string`` and ``redirect_url``
    refuse to produce output for a success response that is not signed.
    """

    @classmethod
    def respond_to_request(cls, request, code, *args, **kwargs):
        """Create a response whose ``ver``, ``url`` and ``params`` are taken
        from *request*.

        Extra positional and keyword arguments are forwarded to the
        constructor.

        Raises:
            TypeError: *request* is not an ``AuthRequest`` or *code* is not
                an integer.
        """
        if not isinstance(request, AuthRequest):
            raise TypeError("request must be an AuthRequest instance")
        if not isinstance(code, int):
            raise TypeError("status code must be an integer")

        return cls(ver=request.ver, code=code, url=request.url,
                   params=request.params, *args, **kwargs)

    PARAMS = {'code', 'principal', 'msg', 'issue', 'id', 'url', 'ptags', 'auth',
              'sso', 'life', 'params', 'kid', 'signature'}

    def __init__(self, ver, code, url, params, principal=None, msg=None,
                 issue=None, ptags=None, auth=None, sso=None, life=None):
        """Initialise the response.

        Omitted values default to: ``''`` for ``principal``, ``msg`` and
        ``auth``; the current UTC time for ``issue``; an empty list for
        ``ptags`` and ``sso``.  ``id`` is a random six-digit integer.

        Raises:
            TypeError: *code* is not an integer.
            ValueError: ``principal`` is given for a non-success code or
                missing for a success code, or a success response has
                neither ``auth`` nor ``sso``.
        """
        if not isinstance(code, int):
            raise TypeError("code %r should be an integer" % code)

        if principal is None:
            principal = ''
        if msg is None:
            msg = ''
        if issue is None:
            issue = datetime.utcnow()
        if ptags is None:
            ptags = []
        if auth is None:
            auth = ''
        if sso is None:
            sso = []

        # Check for invalid combinations of values
        if ((code == status.SUCCESS and principal == '') or
            (code != status.SUCCESS and principal != '')):
            raise ValueError("principal must only be given if "
                             "authentication was successful")
        if code == status.SUCCESS and auth == '' and len(sso) == 0:
            raise ValueError("sso must be given if auth is not given")

        self.ver = ver
        self.code = code
        self.principal = principal
        self.msg = msg
        self.issue = issue
        self.id = random.randint(100000, 999999)
        self.url = url
        self.ptags = ptags
        self.auth = auth
        self.sso = sso
        self.life = life
        self.params = params

        self.kid = None
        self.signature = None

    @property
    def as_dict(self):
        """A dict mapping each name in ``PARAMS`` to its current value."""
        # PARAMS is a class attribute and is not visible as a bare name from
        # inside a method; the unqualified lookup raised NameError.
        return {k: getattr(self, k) for k in self.PARAMS}

    @property
    def signature_b64(self):
        """The signature as base64 text with ``+``, ``/`` and ``=`` replaced
        by ``-``, ``.`` and ``_``; ``None`` if the response is unsigned."""
        if self.signature is None:
            return None
        return base64.b64encode(self.signature).decode()\
               .replace('+', '-').replace('/', '.').replace('=', '_')

    @property
    def requires_signature(self):
        """Whether the response must be signed (its code is SUCCESS)."""
        return self.code == status.SUCCESS

    @property
    def is_signed(self):
        """Whether a signature has been attached."""
        return self.signature is not None

    @property
    def message_to_sign(self):
        """The ``!``-separated, encoded fields covered by the signature.

        ``ptags`` is only included for protocol version 3.
        """
        parts = [self.ver, self.code, self.msg,
                 datetime_to_protocol(self.issue),
                 self.id, self.url, self.principal] + \
                ([','.join(self.ptags)] if self.ver == 3 else []) + \
                [self.auth, ','.join(self.sso), self.life, self.params]
        return '!'.join(map(encode_response_part, parts))

    @property
    def response_string(self):
        """The full response: ``message_to_sign`` followed by the encoded
        ``kid`` and ``signature_b64``, all joined with ``!``.

        Raises:
            SignatureNeeded: the response requires a signature but has
                not been signed.
        """
        if self.requires_signature and not self.is_signed:
            raise SignatureNeeded("response code is %d" % self.code)

        return '!'.join([
            self.message_to_sign,
            encode_response_part(self.kid),
            encode_response_part(self.signature_b64),
        ])

    @property
    def redirect_url(self):
        """``url`` with the response added as a ``WLS-Response`` query
        parameter and any fragment removed."""
        scheme, netloc, path, orig_query, _ = urlsplit(self.url)

        if self.ver == 1:
            # Ignore existing query string
            qsl = []
        else:
            # Incorporate WLS-Response into existing query string
            qsl = parse_qsl(orig_query)

        qsl.append(('WLS-Response', self.response_string))
        query = urlencode(qsl)
        return urlunsplit((scheme, netloc, path, query, ''))

Class variables

var PARAMS

set() -> new empty set object set(iterable) -> new set object

Build an unordered collection of unique elements.

Static methods

def respond_to_request(request, code, *args, **kwargs)
Expand source code
@classmethod
def respond_to_request(cls, request, code, *args, **kwargs):
    """Create a response whose ``ver``, ``url`` and ``params`` are copied
    from *request*.

    Any further positional and keyword arguments are passed through to the
    constructor.  Raises ``TypeError`` when *request* is not an
    ``AuthRequest`` or *code* is not an integer.
    """
    request_ok = isinstance(request, AuthRequest)
    if not request_ok:
        raise TypeError("request must be an AuthRequest instance")

    code_ok = isinstance(code, int)
    if not code_ok:
        raise TypeError("status code must be an integer")

    return cls(*args, ver=request.ver, code=code, url=request.url,
               params=request.params, **kwargs)

Instance variables

var as_dict
Expand source code
@property
def as_dict(self):
    """A dict mapping each name in ``PARAMS`` to its current value."""
    # PARAMS is a class attribute and is not visible as a bare name from
    # inside a method; the unqualified lookup raised NameError.
    return {k: getattr(self, k) for k in self.PARAMS}
var is_signed
Expand source code
@property
def is_signed(self):
    """Whether a signature has been attached to this response."""
    return not (self.signature is None)
var message_to_sign
Expand source code
@property
def message_to_sign(self):
    """The ``!``-separated, encoded fields covered by the signature.

    The ``ptags`` field is only present for protocol version 3.
    """
    fields = [self.ver, self.code, self.msg,
              datetime_to_protocol(self.issue),
              self.id, self.url, self.principal]
    if self.ver == 3:
        fields.append(','.join(self.ptags))
    fields.extend([self.auth, ','.join(self.sso), self.life, self.params])
    return '!'.join(encode_response_part(field) for field in fields)
var redirect_url
Expand source code
@property
def redirect_url(self):
    """``url`` with the response added as a ``WLS-Response`` query parameter.

    For protocol version 1 the URL's existing query string is discarded;
    otherwise it is kept and the response is appended to it.  Any fragment
    is dropped.
    """
    target = urlsplit(self.url)

    # Version 1 ignores the existing query string; later versions extend it.
    pairs = [] if self.ver == 1 else parse_qsl(target.query)
    pairs.append(('WLS-Response', self.response_string))

    return urlunsplit(
        (target.scheme, target.netloc, target.path, urlencode(pairs), '')
    )
var requires_signature
Expand source code
@property
def requires_signature(self):
    """Whether the response must be signed, i.e. its code is SUCCESS."""
    code = self.code
    return code == status.SUCCESS
var response_string
Expand source code
@property
def response_string(self):
    """The complete response: ``message_to_sign``, then the encoded ``kid``
    and ``signature_b64``, joined with ``!``.

    Raises ``SignatureNeeded`` if the response requires a signature but
    has not been signed.
    """
    if self.requires_signature:
        if not self.is_signed:
            raise SignatureNeeded("response code is %d" % self.code)

    pieces = [self.message_to_sign]
    pieces.append(encode_response_part(self.kid))
    pieces.append(encode_response_part(self.signature_b64))
    return '!'.join(pieces)
var signature_b64
Expand source code
@property
def signature_b64(self):
    """The signature as base64 text using ``-``, ``.`` and ``_`` in place of
    ``+``, ``/`` and ``=``; ``None`` when there is no signature."""
    raw = self.signature
    if raw is None:
        return None
    encoded = base64.b64encode(raw).decode()
    # One pass over the text instead of three chained replacements.
    return encoded.translate(str.maketrans('+/=', '-._'))
class Key (private_key, kid)
Expand source code
class Key:
    """A private key together with the key ID ('kid') it is known by.

    Both values are fixed at construction time and exposed through
    read-only properties.
    """

    def __init__(self, private_key, kid):
        """Store *private_key* under the integer ID *kid*.

        Raises ``TypeError`` if *kid* is not an integer and ``ValueError``
        if it lies outside 1 to 99999999.
        """
        kid_is_int = isinstance(kid, int)
        if not kid_is_int:
            raise TypeError("kid must be an integer")
        if kid < 1 or kid > 99999999:
            raise ValueError("kid must be in the range 1 to 99999999")
        self._private_key, self._kid = private_key, kid

    @property
    def private_key(self):
        """The private key object passed to the constructor."""
        return self._private_key

    @property
    def kid(self):
        """The integer key ID passed to the constructor."""
        return self._kid

    def sign(self, response):
        """Sign *response* in place.

        Sets ``response.kid`` to this key's ID, then sets
        ``response.signature`` to the signature of the encoded
        ``response.message_to_sign``, made with PKCS#1 v1.5 padding and
        SHA-1.
        """
        response.kid = self.kid
        signer = self.private_key.sign
        payload = response.message_to_sign.encode()
        response.signature = signer(payload, padding.PKCS1v15(), hashes.SHA1())

Instance variables

var kid
Expand source code
@property
def kid(self):
    """The key ID stored in ``_kid``."""
    return self._kid
var private_key
Expand source code
@property
def private_key(self):
    """The private key object stored in ``_private_key``."""
    return self._private_key

Methods

def sign(self, response)
Expand source code
def sign(self, response):
    """Sign *response* in place.

    Sets ``response.kid`` to this key's ID, then sets ``response.signature``
    to the signature of the encoded ``response.message_to_sign``, made with
    PKCS#1 v1.5 padding and SHA-1.
    """
    response.kid = self.kid
    signer = self.private_key.sign
    payload = response.message_to_sign.encode()
    response.signature = signer(payload, padding.PKCS1v15(), hashes.SHA1())
class LoginService (key, auth_methods)

High-level interface to implement a web login service (WLS).

This class provides a convenient interface for implementing a WLS with any authentication backend. It is intended to be instantiated with a single private key, which is used to sign the responses it generates.

Mechanisms deemed useful for WLS implementation are provided: - storing the list of supported authentication methods, and checking whether the WLS and a WAA's request have a method in common - checking whether the protocol version specified in the WAA request is supported by ucam_wls

These mechanisms can optionally be turned off.

Attributes

key : Key
a private key to be used to sign responses
auth_methods : list
a list of supported authentication methods
Expand source code
class LoginService:
    """High-level interface to implement a web login service (WLS).

    This class provides a convenient interface for implementing a WLS with any
    authentication backend.  It is intended to be instantiated with a single
    private key, which is used to sign the responses it generates.

    Mechanisms deemed useful for WLS implementation are provided:
      - storing the list of supported authentication methods, and checking
        whether the WLS and a WAA's request have a method in common
      - checking whether the protocol version specified in the WAA request is
        supported by `ucam_wls`

    These mechanisms can optionally be turned off.

    Attributes:
        key (ucam_wls.signing.Key): a private key to be used to sign responses
        auth_methods (list): a list of supported authentication methods
    """
    def __init__(self, key, auth_methods):
        if not isinstance(key, Key):
            raise TypeError("key must be a ucam_wls.signing.Key instance")
        self.key = key
        self.auth_methods = auth_methods

    def have_mutual_auth_type(self, request):
        """Return whether the request and this WLS share an auth method.

        A request whose `aauth` list is empty, or contains only empty
        strings, is treated as accepting any method.
        """
        if request.aauth and any(request.aauth):
            return set(request.aauth) & set(self.auth_methods) != set()
        else:
            return True

    def _pre_response(self, request, skip_handling_check, check_auth_types=True):
        """Validate `request` before a response is generated for it.

        Does nothing when `skip_handling_check` is true.

        Raises:
            InvalidAuthRequest: the request fails its own `data_valid` check.
            NoMutualAuthType: `check_auth_types` is true and there is no
                authentication method in common.
            ProtocolVersionUnsupported: the request's version is unsupported.
        """
        if not skip_handling_check:
            if not request.data_valid:
                raise InvalidAuthRequest
            if check_auth_types and not self.have_mutual_auth_type(request):
                raise NoMutualAuthType(
                    "WLS supports %s; WAA wants one of %s" % (
                        self.auth_methods, request.aauth
                    )
                )
            if not request.version_supported:
                raise ProtocolVersionUnsupported(request.ver)

    def _finish_response(self, response, sign=True, force_signature=False):
        """Sign `response` with this service's key if needed and return it.

        Signing happens when `sign` is true or the response requires a
        signature, and only if the response is not already signed (unless
        `force_signature` is true).
        """
        if sign or response.requires_signature:
            if not response.is_signed or force_signature:
                self.key.sign(response)
        return response

    def authenticate_active(self, request, principal, auth, life=None,
                            sign=True, skip_handling_check=False, *args, **kwargs):
        """Generate a WLS 'success' response based on interaction with the user

        This function creates a WLS response specifying that the principal was
        authenticated based on 'fresh' interaction with the user (e.g. input of
        a username and password).

        Args:
            request (AuthRequest): the original WAA request
            principal (AuthPrincipal): the principal authenticated by the WLS
            auth (str): the authentication method used by the principal.
            life (int): if specified, the validity (in seconds) of the
                        principal's session with the WLS.
            sign (bool): whether to sign the response or not.  Recommended to
                leave this at the default value of `True` (see warning below).
            skip_handling_check (bool): if true, skip the request validation
                normally done before responding.

            *args: passed to `AuthResponse.respond_to_request`
            **kwargs: passed to `AuthResponse.respond_to_request`

        Returns:
            An `AuthResponse` instance matching the given arguments.

        Raises:
            ValueError: the request demanded passive authentication.

        Warning:
            Responses indicating successful authentication *MUST* be signed by
            the WLS.  It is recommended that you leave `sign` set to `True`, or
            make sure to sign the response manually afterwards.
        """
        self._pre_response(request, skip_handling_check)

        if request.iact == False:
            raise ValueError("WAA demanded passive authentication (iact == 'no')")

        if life is None and principal.session_expiry is not None:
            life = int((principal.session_expiry - datetime.datetime.utcnow()).total_seconds())

        response = AuthResponse.respond_to_request(
            request=request, code=status.SUCCESS, principal=principal.userid,
            auth=auth, ptags=principal.ptags, life=life, *args, **kwargs
        )
        return self._finish_response(response=response, sign=sign)

    def authenticate_passive(self, request, principal, sso=[], sign=True,
                             skip_handling_check=False, *args, **kwargs):
        """Generate a WLS 'success' response based on a pre-existing identity

        This function creates a WLS response specifying that the principal was
        authenticated based on previous successful authentication (e.g. an
        existing WLS session cookie).

        Args:
            request (AuthRequest): the original WAA request
            principal (AuthPrincipal): the principal authenticated by the WLS
            sso (list): a list of strings indicating the authentication methods
                previously used for authentication by the principal.  If an
                empty list is passed, `principal.auth_methods` will be used.
            sign (bool): whether to sign the response or not.  Recommended to
                leave this at the default value of `True` (see warning below).
            skip_handling_check (bool): if true, skip the request validation
                normally done before responding.

            *args: passed to `AuthResponse.respond_to_request`
            **kwargs: passed to `AuthResponse.respond_to_request`

        Returns:
            An `AuthResponse` instance matching the given arguments.

        Raises:
            ValueError: the request demanded active authentication, or no
                authentication methods are available for `sso`.

        Warning:
            Responses indicating successful authentication *MUST* be signed by
            the WLS.  It is recommended that you leave `sign` set to `True`, or
            make sure to sign the response manually afterwards.
        """
        self._pre_response(request, skip_handling_check)

        if request.iact == True:
            raise ValueError("WAA demanded active authentication (iact == 'yes')")

        # The shared default list is never mutated or passed on: an empty
        # `sso` is replaced here or rejected just below.
        if len(sso) == 0:
            sso = principal.auth_methods

        if len(sso) == 0:
            raise ValueError("no authentication methods specified for `sso`")

        if principal.session_expiry is not None:
            life = int((principal.session_expiry - datetime.datetime.utcnow()).total_seconds())
        else:
            life = None

        response = AuthResponse.respond_to_request(
            request=request, code=status.SUCCESS, principal=principal.userid,
            sso=sso, ptags=principal.ptags, life=life, *args, **kwargs
        )
        return self._finish_response(response=response, sign=sign)

    def generate_failure(self, code, request, msg='', sign=True,
                         skip_handling_check=False, *args, **kwargs):
        """Generate a response indicating failure.

        This is to be used in all cases where the outcome of user interaction
        is not success.  This function will refuse to handle a request where
        the 'fail' parameter is 'yes' (in which case the WLS must not redirect
        back to the WAA).

        Args:
            code (int): the response status code.  Values specified in the
                protocol are available as constants under `ucam_wls.status`.
            request (AuthRequest): the original WAA request
            msg (str): an optional message that could be shown to the end user
                by the WAA
            sign (bool): whether to sign the response or not.
            skip_handling_check (bool): if true, skip the request validation
                normally done before responding.

            *args: passed to `AuthResponse.respond_to_request`
            **kwargs: passed to `AuthResponse.respond_to_request`

        Returns:
            An `AuthResponse` instance matching the given arguments.

        Raises:
            ValueError: the request has `fail` set, or `code` is the
                success status.

        Note:
            WLS responses indicating a non-success can optionally be
            signed.   In the interests of security, the default in this
            function is to go ahead and sign anyway, but this can be turned off
            if really desired.
        """
        self._pre_response(request, skip_handling_check, check_auth_types=False)

        if request.fail:
            raise ValueError("WAA specified that WLS must not redirect "
                             "back to it on failure")

        if code == status.SUCCESS:
            raise ValueError("Failure responses must not have success status")

        # `msg` must be forwarded explicitly; it used to be accepted and
        # then silently dropped, so the response never carried it.
        response = AuthResponse.respond_to_request(
            request=request, code=code, msg=msg, *args, **kwargs
        )
        return self._finish_response(response=response, sign=sign)

Methods

def authenticate_active(self, request, principal, auth, life=None, sign=True, skip_handling_check=False, *args, **kwargs)

Generate a WLS 'success' response based on interaction with the user

This function creates a WLS response specifying that the principal was authenticated based on 'fresh' interaction with the user (e.g. input of a username and password).

Args

request : AuthRequest
the original WAA request
principal : AuthPrincipal
the principal authenticated by the WLS
auth : str
the authentication method used by the principal.
life : int
if specified, the validity (in seconds) of the principal's session with the WLS.
sign : bool
whether to sign the response or not. Recommended to leave this at the default value of True (see warning below).
*args
passed to AuthResponse.respond_to_request()
**kwargs
passed to AuthResponse.respond_to_request()

Returns

An AuthResponse instance matching the given arguments.

Warning

Responses indicating successful authentication MUST be signed by the WLS. It is recommended that you leave sign set to True, or make sure to sign the response manually afterwards.

Expand source code
def authenticate_active(self, request, principal, auth, life=None,
                        sign=True, skip_handling_check=False, *args, **kwargs):
    """Build a 'success' response for a principal who has just interacted
    with the WLS.

    Use this when the principal was authenticated through 'fresh'
    interaction (e.g. by entering a username and password).

    Args:
        request (AuthRequest): the WAA request being answered
        principal (AuthPrincipal): the principal the WLS authenticated
        auth (str): the authentication method the principal used
        life (int): remaining validity, in seconds, of the principal's
            session with the WLS.  When omitted it is derived from
            `principal.session_expiry`, if that is set.
        sign (bool): whether to sign the response.  Leaving this at `True`
            is recommended (see warning below).
        skip_handling_check (bool): passed to the pre-response check.

        *args: passed to `AuthResponse.respond_to_request`
        **kwargs: passed to `AuthResponse.respond_to_request`

    Returns:
        An `AuthResponse` instance matching the given arguments.

    Raises:
        ValueError: the request demanded passive authentication.

    Warning:
        Responses indicating successful authentication *MUST* be signed by
        the WLS.  Either leave `sign` set to `True` or sign the response
        yourself afterwards.
    """
    self._pre_response(request, skip_handling_check)

    if request.iact == False:
        raise ValueError("WAA demanded passive authentication (iact == 'no')")

    if life is None:
        expiry = principal.session_expiry
        if expiry is not None:
            # Seconds from now until the session expires, truncated to int.
            remaining = expiry - datetime.datetime.utcnow()
            life = int(remaining.total_seconds())

    built = AuthResponse.respond_to_request(
        *args, request=request, code=status.SUCCESS,
        principal=principal.userid, auth=auth, ptags=principal.ptags,
        life=life, **kwargs
    )
    return self._finish_response(response=built, sign=sign)
def authenticate_passive(self, request, principal, sso=[], sign=True, skip_handling_check=False, *args, **kwargs)

Generate a WLS 'success' response based on a pre-existing identity

This function creates a WLS response specifying that the principal was authenticated based on previous successful authentication (e.g. an existing WLS session cookie).

Args

request : AuthRequest
the original WAA request
principal : AuthPrincipal
the principal authenticated by the WLS
sso : list
a list of strings indicating the authentication methods previously used for authentication by the principal. If an empty list is passed, principal.auth_methods will be used.
sign : bool
whether to sign the response or not. Recommended to leave this at the default value of True (see warning below).
*args
passed to AuthResponse.respond_to_request()
**kwargs
passed to AuthResponse.respond_to_request()

Returns

An AuthResponse instance matching the given arguments.

Warning

Responses indicating successful authentication MUST be signed by the WLS. It is recommended that you leave sign set to True, or make sure to sign the response manually afterwards.

Expand source code
def authenticate_passive(self, request, principal, sso=None, sign=True,
                         skip_handling_check=False, *args, **kwargs):
    """Generate a WLS 'success' response based on a pre-existing identity

    This function creates a WLS response specifying that the principal was
    authenticated based on previous successful authentication (e.g. an
    existing WLS session cookie).

    Args:
        request (AuthRequest): the original WAA request
        principal (AuthPrincipal): the principal authenticated by the WLS
        sso (list): a list of strings indicating the authentication methods
            previously used for authentication by the principal.  If it is
            omitted, `None` or empty, `principal.auth_methods` will be
            used instead.
        sign (bool): whether to sign the response or not.  Recommended to
            leave this at the default value of `True` (see warning below).
        skip_handling_check: passed unchanged to `self._pre_response`.

        *args: passed to `AuthResponse.respond_to_request`
        **kwargs: passed to `AuthResponse.respond_to_request`

    Returns:
        An `AuthResponse` instance matching the given arguments.

    Raises:
        ValueError: if the request's `iact` equals `True` (the WAA demanded
            active authentication), or if neither `sso` nor
            `principal.auth_methods` supplies any authentication method.

    Warning:
        Responses indicating successful authentication *MUST* be signed by
        the WLS.  It is recommended that you leave `sign` set to `True`, or
        make sure to sign the response manually afterwards.
    """
    self._pre_response(request, skip_handling_check)

    if request.iact == True:
        raise ValueError("WAA demanded active authentication (iact == 'yes')")

    # Truthiness rather than len() so that None counts as "not supplied"
    # instead of raising TypeError.
    if not sso:
        sso = principal.auth_methods

    if not sso:
        raise ValueError("no authentication methods specified for `sso`")

    # The response lifetime is the time left on the principal's session, when
    # an expiry is recorded.
    # NOTE(review): subtracting utcnow() presumes session_expiry is a naive
    # UTC datetime -- confirm against AuthPrincipal.
    if principal.session_expiry is not None:
        remaining = principal.session_expiry - datetime.datetime.utcnow()
        life = int(remaining.total_seconds())
    else:
        life = None

    response = AuthResponse.respond_to_request(
        request=request, code=status.SUCCESS, principal=principal.userid,
        sso=sso, ptags=principal.ptags, life=life, *args, **kwargs
    )
    return self._finish_response(response=response, sign=sign)
def generate_failure(self, code, request, msg='', sign=True, skip_handling_check=False, *args, **kwargs)

Generate a response indicating failure.

This is to be used in all cases where the outcome of user interaction is not success. This function will refuse to handle a request where the 'fail' parameter is 'yes' (in which case the WLS must not redirect back to the WAA).

Args

code : int
the response status code. Values specified in the protocol are available as constants under ucam_wls.status.
request : AuthRequest
the original WAA request
msg : str
an optional message that could be shown to the end user by the WAA
sign : bool
whether to sign the response or not.
*args
passed to AuthResponse.respond_to_request()
**kwargs
passed to AuthResponse.respond_to_request()

Returns

An AuthResponse instance matching the given arguments.

Note

WLS responses indicating a non-success can optionally be signed. In the interests of security, the default in this function is to go ahead and sign anyway, but this can be turned off if really desired.

Expand source code
def generate_failure(self, code, request, msg='', sign=True,
                     skip_handling_check=False, *args, **kwargs):
    """Generate a response indicating failure.

    This is to be used in all cases where the outcome of user interaction
    is not success.  This function will refuse to handle a request where
    the 'fail' parameter is 'yes' (in which case the WLS must not redirect
    back to the WAA).

    Args:
        code (int): the response status code.  Values specified in the
            protocol are available as constants under `ucam_wls.status`.
        request (AuthRequest): the original WAA request
        msg (str): an optional message that could be shown to the end user
            by the WAA.  It is forwarded to
            `AuthResponse.respond_to_request` as the `msg` keyword.
        sign (bool): whether to sign the response or not.
        skip_handling_check: passed unchanged to `self._pre_response`.

        *args: passed to `AuthResponse.respond_to_request`
        **kwargs: passed to `AuthResponse.respond_to_request`

    Returns:
        An `AuthResponse` instance matching the given arguments.

    Raises:
        ValueError: if `request.fail` is truthy, or if `code` equals
            `status.SUCCESS`.

    Note:
        WLS responses indicating a non-success can optionally be signed.
        In the interests of security, the default in this function is to
        go ahead and sign anyway, but this can be turned off if really
        desired.
    """
    self._pre_response(request, skip_handling_check, check_auth_types=False)

    if request.fail:
        raise ValueError("WAA specified that WLS must not redirect "
                         "back to it on failure")

    if code == status.SUCCESS:
        raise ValueError("Failure responses must not have success status")

    # `msg` is a named parameter here, so it can never arrive via **kwargs;
    # it has to be forwarded explicitly or it is silently dropped.
    response = AuthResponse.respond_to_request(
        request=request, code=code, msg=msg, *args, **kwargs
    )
    return self._finish_response(response=response, sign=sign)
def have_mutual_auth_type(self, request)
Expand source code
def have_mutual_auth_type(self, request):
    """Tell whether the request and this service share an auth method.

    Args:
        request: the WAA request; only its `aauth` attribute is read.

    Returns:
        `True` when `request.aauth` is falsy or contains no truthy entry
        (the request imposes no restriction), or when at least one of its
        entries also appears in `self.auth_methods`.  `False` otherwise.
    """
    wanted = request.aauth

    # No usable restriction from the WAA: nothing to reconcile.
    if not (wanted and any(wanted)):
        return True

    shared = set(wanted).intersection(set(self.auth_methods))
    return len(shared) > 0