"""
Request
=======
When a page is requested, automatically created a :class:`Request` object that
contains metadata about the request.
Since this object is global within the thread,
you can freely import from anywhere and retrieve request information.
"""
import base64
import fnmatch
import hashlib
import hmac
import threading
import cgi
import json
import pickle
from urllib.parse import SplitResult
from http.cookies import SimpleCookie
##################################################################################
# Request Object #################################################################
##################################################################################
[docs]class Request:
""" A wrapper for WSGI environment dictionaries.
"""
__slots__ = ('environ', '_body', '_forms')
def __init__(self, environ=None):
self.environ = {} if environ is None else environ
self.environ['kobin.request'] = self
self._body = None
self._forms = None
def get(self, value, default=None):
return self.environ.get(value, default)
@property
def path(self):
""" The value of ``PATH_INFO`` with exactly one prefixed slash (to fix
broken clients and avoid the "empty path" edge case). """
return '/' + self.environ.get('PATH_INFO', '').lstrip('/')
@property
def method(self):
""" The ``REQUEST_METHOD`` value as an uppercase string. """
return self.environ.get('REQUEST_METHOD', 'GET').upper()
@property
def headers(self):
return {k[len('HTTP_'):]: v
for k, v in self.environ.items()
if k.startswith('HTTP_')}
@property
def query(self):
params = cgi.FieldStorage(
environ=self.environ,
keep_blank_values=True,
)
p = {k: params[k].value for k in params}
return p
@property
def forms(self):
if self._forms is None:
form = cgi.FieldStorage(
fp=self.environ['wsgi.input'],
environ=self.environ,
keep_blank_values=True,
)
self._forms = {k: form[k].value for k in form}
return self._forms
@property
def raw_body(self):
if self._body is None:
self._body = self.environ['wsgi.input'].read(
int(self.environ.get('CONTENT_LENGTH', 0)))
return self._body
@property
def body(self):
return self.raw_body.decode('utf-8')
@property
def json(self):
return json.loads(self.body)
@property
def url(self):
protocol = self.get('HTTP_X_FORWARDED_PROTO') or self.get('wsgi.url_scheme', 'http')
host = self.get('HTTP_X_FORWARDED_HOST') or self.get('HTTP_HOST')
query_params = self.get("QUERY_STRING")
url_split_result = SplitResult(protocol, host, self.path, query_params, '')
return url_split_result.geturl()
@property
def cookies(self):
cookies = SimpleCookie(self.environ.get('HTTP_COOKIE', '')).values()
return {c.key: c.value for c in cookies}
def get_cookie(self, key, default=None, secret=None, digestmod=hashlib.sha256):
from kobin.app import current_config
if secret is None:
secret = current_config('SECRET_KEY')
value = self.cookies.get(key)
if secret and value and value.startswith('!') and '?' in value:
# See BaseResponse.set_cookie for details.
if isinstance(secret, str):
secret = secret.encode('utf-8')
sig, msg = map(lambda x: x.encode('utf-8'), value[1:].split('?', 1))
hash_string = hmac.new(secret, msg, digestmod=digestmod).digest()
if sig == base64.b64encode(hash_string):
key_and_value = pickle.loads(base64.b64decode(msg))
if key_and_value and key_and_value[0] == key:
return key_and_value[1]
return value or default
def __getitem__(self, key):
return self.environ[key]
def __delitem__(self, key):
self[key] = ""
del (self.environ[key])
def __setitem__(self, key, value):
""" Change an environ value and clear all caches that depend on it. """
self.environ[key] = value
todelete = ()
if key == 'wsgi.input':
todelete = ('body', 'forms', 'files', 'params', 'post', 'json')
elif key == 'QUERY_STRING':
todelete = ('query', 'params')
elif key.startswith('HTTP_'):
todelete = ('headers', 'cookies')
for key in todelete:
self.environ.pop('kobin.request.' + key, None)
def __len__(self):
return len(self.environ)
def __repr__(self):
return '<{cls}: {method} {url}>'.format(
cls=self.__class__.__name__, method=self.method, url=self.path
)
# for Accept header.
def _split_into_mimetype_and_priority(x):
"""Split an accept header item into mimetype and priority.
>>> _split_into_mimetype_and_priority('text/*')
('text/*', 1.0)
>>> _split_into_mimetype_and_priority('application/json;q=0.5')
('application/json', 0.5)
"""
if ';' in x:
content_type, priority = x.split(';')
casted_priority = float(priority.split('=')[1])
else:
content_type, casted_priority = x, 1.0
content_type = content_type.lstrip().rstrip() # Replace ' text/html' to 'text/html'
return content_type, casted_priority
def _parse_and_sort_accept_header(accept_header):
"""Parse and sort the accept header items.
>>> _parse_and_sort_accept_header('application/json;q=0.5, text/*')
[('text/*', 1.0), ('application/json', 0.5)]
"""
return sorted([_split_into_mimetype_and_priority(x) for x in accept_header.split(',')],
key=lambda x: x[1], reverse=True)
[docs]def accept_best_match(accept_header, mimetypes):
"""Return a mimetype best matched the accept headers.
>>> accept_best_match('application/json, text/html', ['application/json', 'text/plain'])
'application/json'
>>> accept_best_match('application/json;q=0.5, text/*', ['application/json', 'text/plain'])
'text/plain'
"""
for mimetype_pattern, _ in _parse_and_sort_accept_header(accept_header):
matched_types = fnmatch.filter(mimetypes, mimetype_pattern)
if matched_types:
return matched_types[0]
return mimetypes[0]
def _local_property():
ls = threading.local()
def fget(_):
try:
return ls.var
except AttributeError:
raise RuntimeError("Request context not initialized.")
def fset(_, value):
ls.var = value
def fdel(_):
del ls.var
return property(fget, fset, fdel, 'Thread-local property')
[docs]class LocalRequest(Request):
bind = Request.__init__
environ = _local_property()
_body = _local_property()
_forms = _local_property()
request = LocalRequest()