Module ucam_wls.response
Expand source code
import base64
import random
from datetime import datetime
from urllib.parse import urlsplit, urlunsplit, parse_qsl, urlencode
from . import status
from .errors import SignatureNeeded
from .request import AuthRequest
from .util import datetime_to_protocol
def encode_response_part(part):
if part is None:
return ''
return str(part).replace('%', '%25').replace('!', '%21')
class AuthResponse:
@classmethod
def respond_to_request(cls, request, code, *args, **kwargs):
if not isinstance(request, AuthRequest):
raise TypeError("request must be an AuthRequest instance")
if not isinstance(code, int):
raise TypeError("status code must be an integer")
return cls(ver=request.ver, code=code, url=request.url,
params=request.params, *args, **kwargs)
PARAMS = {'code', 'principal', 'msg', 'issue', 'id', 'url', 'ptags', 'auth',
'sso', 'life', 'params', 'kid', 'signature'}
def __init__(self, ver, code, url, params, principal=None, msg=None,
issue=None, ptags=None, auth=None, sso=None, life=None):
if not isinstance(code, int):
raise TypeError("code %r should be an integer" % code)
if principal is None:
principal = ''
if msg is None:
msg = ''
if issue is None:
issue = datetime.utcnow()
if ptags is None:
ptags = []
if auth is None:
auth = ''
if sso is None:
sso = []
# Check for invalid combinations of values
if ((code == status.SUCCESS and principal == '') or
(code != status.SUCCESS and principal != '')):
raise ValueError("principal must only be given if "
"authentication was successful")
if code == status.SUCCESS and auth == '' and len(sso) == 0:
raise ValueError("sso must be given if auth is not given")
self.ver = ver
self.code = code
self.principal = principal
self.msg = msg
self.issue = issue
self.id = random.randint(100000, 999999)
self.url = url
self.ptags = ptags
self.auth = auth
self.sso = sso
self.life = life
self.params = params
self.kid = None
self.signature = None
@property
def as_dict(self):
return {k: getattr(self, k) for k in PARAMS}
@property
def signature_b64(self):
if self.signature is None:
return None
return base64.b64encode(self.signature).decode()\
.replace('+', '-').replace('/', '.').replace('=', '_')
@property
def requires_signature(self):
return self.code == status.SUCCESS
@property
def is_signed(self):
return self.signature is not None
@property
def message_to_sign(self):
parts = [self.ver, self.code, self.msg,
datetime_to_protocol(self.issue),
self.id, self.url, self.principal] + \
([','.join(self.ptags)] if self.ver == 3 else []) + \
[self.auth, ','.join(self.sso), self.life, self.params]
return '!'.join(map(encode_response_part, parts))
@property
def response_string(self):
if self.requires_signature and not self.is_signed:
raise SignatureNeeded("response code is %d" % self.code)
return '!'.join([
self.message_to_sign,
encode_response_part(self.kid),
encode_response_part(self.signature_b64),
])
@property
def redirect_url(self):
scheme, netloc, path, orig_query, _ = urlsplit(self.url)
if self.ver == 1:
# Ignore existing query string
qsl = []
else:
# Incorporate WLS-Response into existing query string
qsl = parse_qsl(orig_query)
qsl.append(('WLS-Response', self.response_string))
query = urlencode(qsl)
return urlunsplit((scheme, netloc, path, query, ''))
Functions
def encode_response_part(part)
-
Expand source code
def encode_response_part(part): if part is None: return '' return str(part).replace('%', '%25').replace('!', '%21')
Classes
class AuthResponse (ver, code, url, params, principal=None, msg=None, issue=None, ptags=None, auth=None, sso=None, life=None)
-
Expand source code
class AuthResponse: @classmethod def respond_to_request(cls, request, code, *args, **kwargs): if not isinstance(request, AuthRequest): raise TypeError("request must be an AuthRequest instance") if not isinstance(code, int): raise TypeError("status code must be an integer") return cls(ver=request.ver, code=code, url=request.url, params=request.params, *args, **kwargs) PARAMS = {'code', 'principal', 'msg', 'issue', 'id', 'url', 'ptags', 'auth', 'sso', 'life', 'params', 'kid', 'signature'} def __init__(self, ver, code, url, params, principal=None, msg=None, issue=None, ptags=None, auth=None, sso=None, life=None): if not isinstance(code, int): raise TypeError("code %r should be an integer" % code) if principal is None: principal = '' if msg is None: msg = '' if issue is None: issue = datetime.utcnow() if ptags is None: ptags = [] if auth is None: auth = '' if sso is None: sso = [] # Check for invalid combinations of values if ((code == status.SUCCESS and principal == '') or (code != status.SUCCESS and principal != '')): raise ValueError("principal must only be given if " "authentication was successful") if code == status.SUCCESS and auth == '' and len(sso) == 0: raise ValueError("sso must be given if auth is not given") self.ver = ver self.code = code self.principal = principal self.msg = msg self.issue = issue self.id = random.randint(100000, 999999) self.url = url self.ptags = ptags self.auth = auth self.sso = sso self.life = life self.params = params self.kid = None self.signature = None @property def as_dict(self): return {k: getattr(self, k) for k in PARAMS} @property def signature_b64(self): if self.signature is None: return None return base64.b64encode(self.signature).decode()\ .replace('+', '-').replace('/', '.').replace('=', '_') @property def requires_signature(self): return self.code == status.SUCCESS @property def is_signed(self): return self.signature is not None @property def message_to_sign(self): parts = [self.ver, self.code, self.msg, datetime_to_protocol(self.issue), self.id, self.url, self.principal] + \ ([','.join(self.ptags)] if self.ver == 3 else []) + \ [self.auth, ','.join(self.sso), self.life, self.params] return '!'.join(map(encode_response_part, parts)) @property def response_string(self): if self.requires_signature and not self.is_signed: raise SignatureNeeded("response code is %d" % self.code) return '!'.join([ self.message_to_sign, encode_response_part(self.kid), encode_response_part(self.signature_b64), ]) @property def redirect_url(self): scheme, netloc, path, orig_query, _ = urlsplit(self.url) if self.ver == 1: # Ignore existing query string qsl = [] else: # Incorporate WLS-Response into existing query string qsl = parse_qsl(orig_query) qsl.append(('WLS-Response', self.response_string)) query = urlencode(qsl) return urlunsplit((scheme, netloc, path, query, ''))
Class variables
var PARAMS
-
set() -> new empty set object set(iterable) -> new set object
Build an unordered collection of unique elements.
Static methods
def respond_to_request(request, code, *args, **kwargs)
-
Expand source code
@classmethod def respond_to_request(cls, request, code, *args, **kwargs): if not isinstance(request, AuthRequest): raise TypeError("request must be an AuthRequest instance") if not isinstance(code, int): raise TypeError("status code must be an integer") return cls(ver=request.ver, code=code, url=request.url, params=request.params, *args, **kwargs)
Instance variables
var as_dict
-
Expand source code
@property def as_dict(self): return {k: getattr(self, k) for k in PARAMS}
var is_signed
-
Expand source code
@property def is_signed(self): return self.signature is not None
var message_to_sign
-
Expand source code
@property def message_to_sign(self): parts = [self.ver, self.code, self.msg, datetime_to_protocol(self.issue), self.id, self.url, self.principal] + \ ([','.join(self.ptags)] if self.ver == 3 else []) + \ [self.auth, ','.join(self.sso), self.life, self.params] return '!'.join(map(encode_response_part, parts))
var redirect_url
-
Expand source code
@property def redirect_url(self): scheme, netloc, path, orig_query, _ = urlsplit(self.url) if self.ver == 1: # Ignore existing query string qsl = [] else: # Incorporate WLS-Response into existing query string qsl = parse_qsl(orig_query) qsl.append(('WLS-Response', self.response_string)) query = urlencode(qsl) return urlunsplit((scheme, netloc, path, query, ''))
var requires_signature
-
Expand source code
@property def requires_signature(self): return self.code == status.SUCCESS
var response_string
-
Expand source code
@property def response_string(self): if self.requires_signature and not self.is_signed: raise SignatureNeeded("response code is %d" % self.code) return '!'.join([ self.message_to_sign, encode_response_part(self.kid), encode_response_part(self.signature_b64), ])
var signature_b64
-
Expand source code
@property def signature_b64(self): if self.signature is None: return None return base64.b64encode(self.signature).decode()\ .replace('+', '-').replace('/', '.').replace('=', '_')