Module ucam_wls.request

Expand source code
from urllib.parse import parse_qs, urlsplit, urlunsplit, urlencode

from .errors import InvalidAuthRequest, ProtocolVersionUnsupported

def clean_url(url):
    parts = urlsplit(url)
    if parts.scheme not in ['http', 'https']:
        raise InvalidAuthRequest("Invalid URL scheme")
    return url

class AuthRequest:
    "An authentication request sent by a WAA."

    REQUIRED_PARAMS = {'ver', 'url'}
    OPTIONAL_PARAMS = {'desc', 'aauth', 'iact', 'msg', 'params', 'date', 'skew', 'fail'}
    VALID_PARAMS = REQUIRED_PARAMS | OPTIONAL_PARAMS
    IGNORED_PARAMS = {'skew'}

    def __init__(self, ver, url, desc='', aauth='', iact='', msg='', params='',
                 date='', fail='', skew=None):
        self.ver = int(ver)
        self.url = clean_url(url)
        self.desc = desc

        if isinstance(aauth, list):
            self.aauth = aauth
        elif isinstance(aauth, str):
            self.aauth = aauth.split(',')
        elif aauth is None:
            self.aauth = []
        else:
            raise ValueError("Unparseable aauth value %r" % aauth)

        if isinstance(iact, bool):
            self.iact = iact
        elif iact == 'yes':
            self.iact = True
        elif iact == 'no':
            self.iact = False
        else:
            self.iact = None

        self.msg = msg
        self.params = params

        self.date = date

        if isinstance(fail, bool):
            self.fail = fail
        else:
            self.fail = (fail == 'yes')

    @property
    def params_dict(self):
        d = {k: getattr(self, k) for k in self.VALID_PARAMS - self.IGNORED_PARAMS}
        if self.iact == True:
            d['iact'] = 'yes'
        elif self.iact == False:
            d['iact'] = 'no'
        else:
            d['iact'] = ''
        d['fail'] = 'yes' if self.fail else ''
        d['aauth'] = ','.join(self.aauth)
        return d

    @classmethod
    def from_params_dict(cls, params_dict, check_supported=True, ignore_unknown=False):
        d = dict(params_dict)

        # Go from 1-long lists of values to just the values
        for k, v in d.items():
            if isinstance(v, list):
                d[k] = v[0]

        if cls.REQUIRED_PARAMS - set(d.keys()) != set():
            raise InvalidAuthRequest(
                "Missing required parameter(s): %s" %
                ', '.join(cls.REQUIRED_PARAMS - set(d.keys()))
            )

        extra_params = set(d.keys()) - (cls.VALID_PARAMS)
        if extra_params:
            if ignore_unknown:
                for k in extra_params:
                    del d[k]
            else:
                raise InvalidAuthRequest("Unknown parameter(s): %s" %
                                         ', '.join(extra_params))

        try:
            d['ver'] = int(d['ver'])
        except ValueError as e:
            raise InvalidAuthRequest("ver parameter is not an integer") from e

        req = cls(**d)
        if not req.data_valid:
            raise InvalidAuthRequest("Authentication request failed validation")

        if check_supported and not req.version_supported:
            raise ProtocolVersionUnsupported(d['ver'])

        return req

    @property
    def as_query_string(self):
        return urlencode(self.params_dict(), doseq=True)

    @classmethod
    def from_query_string(cls, query_string, *args, **kwargs):
        params_dict = parse_qs(query_string)

        for k, values in params_dict.items():
            if len(values) > 1:
                raise InvalidAuthRequest("Repeated parameter %s" % k)

        return cls.from_params_dict(params_dict, *args, **kwargs)

    @property
    def data_valid(self):
        result = all([
            isinstance(self.ver, int),
            isinstance(self.url, str),
            isinstance(self.desc, str),
            isinstance(self.aauth, list),
            isinstance(self.iact, bool) or self.iact is None,
            isinstance(self.msg, str) or self.msg is None,
            isinstance(self.params, str) or self.params is None,
            isinstance(self.fail, bool) or self.fail is None,
        ])
        if self.desc is not None:
            result = result and all([0x20 <= ord(c) <= 0x7e for c in self.desc])
        if self.msg is not None:
            result = result and all([0x20 <= ord(c) <= 0x7e for c in self.msg])
        return result

    @property
    def version_supported(self):
        return 1 <= self.ver <= 3

Functions

def clean_url(url)
Expand source code
def clean_url(url):
    parts = urlsplit(url)
    if parts.scheme not in ['http', 'https']:
        raise InvalidAuthRequest("Invalid URL scheme")
    return url

Classes

class AuthRequest (ver, url, desc='', aauth='', iact='', msg='', params='', date='', fail='', skew=None)

An authentication request sent by a WAA.

Expand source code
class AuthRequest:
    "An authentication request sent by a WAA."

    REQUIRED_PARAMS = {'ver', 'url'}
    OPTIONAL_PARAMS = {'desc', 'aauth', 'iact', 'msg', 'params', 'date', 'skew', 'fail'}
    VALID_PARAMS = REQUIRED_PARAMS | OPTIONAL_PARAMS
    IGNORED_PARAMS = {'skew'}

    def __init__(self, ver, url, desc='', aauth='', iact='', msg='', params='',
                 date='', fail='', skew=None):
        self.ver = int(ver)
        self.url = clean_url(url)
        self.desc = desc

        if isinstance(aauth, list):
            self.aauth = aauth
        elif isinstance(aauth, str):
            self.aauth = aauth.split(',')
        elif aauth is None:
            self.aauth = []
        else:
            raise ValueError("Unparseable aauth value %r" % aauth)

        if isinstance(iact, bool):
            self.iact = iact
        elif iact == 'yes':
            self.iact = True
        elif iact == 'no':
            self.iact = False
        else:
            self.iact = None

        self.msg = msg
        self.params = params

        self.date = date

        if isinstance(fail, bool):
            self.fail = fail
        else:
            self.fail = (fail == 'yes')

    @property
    def params_dict(self):
        d = {k: getattr(self, k) for k in self.VALID_PARAMS - self.IGNORED_PARAMS}
        if self.iact == True:
            d['iact'] = 'yes'
        elif self.iact == False:
            d['iact'] = 'no'
        else:
            d['iact'] = ''
        d['fail'] = 'yes' if self.fail else ''
        d['aauth'] = ','.join(self.aauth)
        return d

    @classmethod
    def from_params_dict(cls, params_dict, check_supported=True, ignore_unknown=False):
        d = dict(params_dict)

        # Go from 1-long lists of values to just the values
        for k, v in d.items():
            if isinstance(v, list):
                d[k] = v[0]

        if cls.REQUIRED_PARAMS - set(d.keys()) != set():
            raise InvalidAuthRequest(
                "Missing required parameter(s): %s" %
                ', '.join(cls.REQUIRED_PARAMS - set(d.keys()))
            )

        extra_params = set(d.keys()) - (cls.VALID_PARAMS)
        if extra_params:
            if ignore_unknown:
                for k in extra_params:
                    del d[k]
            else:
                raise InvalidAuthRequest("Unknown parameter(s): %s" %
                                         ', '.join(extra_params))

        try:
            d['ver'] = int(d['ver'])
        except ValueError as e:
            raise InvalidAuthRequest("ver parameter is not an integer") from e

        req = cls(**d)
        if not req.data_valid:
            raise InvalidAuthRequest("Authentication request failed validation")

        if check_supported and not req.version_supported:
            raise ProtocolVersionUnsupported(d['ver'])

        return req

    @property
    def as_query_string(self):
        return urlencode(self.params_dict(), doseq=True)

    @classmethod
    def from_query_string(cls, query_string, *args, **kwargs):
        params_dict = parse_qs(query_string)

        for k, values in params_dict.items():
            if len(values) > 1:
                raise InvalidAuthRequest("Repeated parameter %s" % k)

        return cls.from_params_dict(params_dict, *args, **kwargs)

    @property
    def data_valid(self):
        result = all([
            isinstance(self.ver, int),
            isinstance(self.url, str),
            isinstance(self.desc, str),
            isinstance(self.aauth, list),
            isinstance(self.iact, bool) or self.iact is None,
            isinstance(self.msg, str) or self.msg is None,
            isinstance(self.params, str) or self.params is None,
            isinstance(self.fail, bool) or self.fail is None,
        ])
        if self.desc is not None:
            result = result and all([0x20 <= ord(c) <= 0x7e for c in self.desc])
        if self.msg is not None:
            result = result and all([0x20 <= ord(c) <= 0x7e for c in self.msg])
        return result

    @property
    def version_supported(self):
        return 1 <= self.ver <= 3

Class variables

var IGNORED_PARAMS

set() -> new empty set object set(iterable) -> new set object

Build an unordered collection of unique elements.

var OPTIONAL_PARAMS

set() -> new empty set object set(iterable) -> new set object

Build an unordered collection of unique elements.

var REQUIRED_PARAMS

set() -> new empty set object set(iterable) -> new set object

Build an unordered collection of unique elements.

var VALID_PARAMS

set() -> new empty set object set(iterable) -> new set object

Build an unordered collection of unique elements.

Static methods

def from_params_dict(params_dict, check_supported=True, ignore_unknown=False)
Expand source code
@classmethod
def from_params_dict(cls, params_dict, check_supported=True, ignore_unknown=False):
    d = dict(params_dict)

    # Go from 1-long lists of values to just the values
    for k, v in d.items():
        if isinstance(v, list):
            d[k] = v[0]

    if cls.REQUIRED_PARAMS - set(d.keys()) != set():
        raise InvalidAuthRequest(
            "Missing required parameter(s): %s" %
            ', '.join(cls.REQUIRED_PARAMS - set(d.keys()))
        )

    extra_params = set(d.keys()) - (cls.VALID_PARAMS)
    if extra_params:
        if ignore_unknown:
            for k in extra_params:
                del d[k]
        else:
            raise InvalidAuthRequest("Unknown parameter(s): %s" %
                                     ', '.join(extra_params))

    try:
        d['ver'] = int(d['ver'])
    except ValueError as e:
        raise InvalidAuthRequest("ver parameter is not an integer") from e

    req = cls(**d)
    if not req.data_valid:
        raise InvalidAuthRequest("Authentication request failed validation")

    if check_supported and not req.version_supported:
        raise ProtocolVersionUnsupported(d['ver'])

    return req
def from_query_string(query_string, *args, **kwargs)
Expand source code
@classmethod
def from_query_string(cls, query_string, *args, **kwargs):
    params_dict = parse_qs(query_string)

    for k, values in params_dict.items():
        if len(values) > 1:
            raise InvalidAuthRequest("Repeated parameter %s" % k)

    return cls.from_params_dict(params_dict, *args, **kwargs)

Instance variables

var as_query_string
Expand source code
@property
def as_query_string(self):
    return urlencode(self.params_dict(), doseq=True)
var data_valid
Expand source code
@property
def data_valid(self):
    result = all([
        isinstance(self.ver, int),
        isinstance(self.url, str),
        isinstance(self.desc, str),
        isinstance(self.aauth, list),
        isinstance(self.iact, bool) or self.iact is None,
        isinstance(self.msg, str) or self.msg is None,
        isinstance(self.params, str) or self.params is None,
        isinstance(self.fail, bool) or self.fail is None,
    ])
    if self.desc is not None:
        result = result and all([0x20 <= ord(c) <= 0x7e for c in self.desc])
    if self.msg is not None:
        result = result and all([0x20 <= ord(c) <= 0x7e for c in self.msg])
    return result
var params_dict
Expand source code
@property
def params_dict(self):
    d = {k: getattr(self, k) for k in self.VALID_PARAMS - self.IGNORED_PARAMS}
    if self.iact == True:
        d['iact'] = 'yes'
    elif self.iact == False:
        d['iact'] = 'no'
    else:
        d['iact'] = ''
    d['fail'] = 'yes' if self.fail else ''
    d['aauth'] = ','.join(self.aauth)
    return d
var version_supported
Expand source code
@property
def version_supported(self):
    return 1 <= self.ver <= 3