From 8a8a60815294eb9d5db0c2d451c6c9d959c7c499 Mon Sep 17 00:00:00 2001 From: Josh Rowe Date: Thu, 23 Feb 2017 15:23:44 +0000 Subject: [PATCH] Move object_hook outside loads method so class can be extend and reused --- flask/sessions.py | 147 +++++++++++++++++++++++++++++++------------- tests/test_basic.py | 6 ++ 2 files changed, 109 insertions(+), 44 deletions(-) diff --git a/flask/sessions.py b/flask/sessions.py index 525ff246..4335eeaa 100644 --- a/flask/sessions.py +++ b/flask/sessions.py @@ -8,7 +8,6 @@ :copyright: (c) 2015 by Armin Ronacher. :license: BSD, see LICENSE for more details. """ - import uuid import hashlib from base64 import b64encode, b64decode @@ -49,22 +48,82 @@ class SessionMixin(object): modified = True -def _tag(value): - if isinstance(value, tuple): - return {' t': [_tag(x) for x in value]} - elif isinstance(value, uuid.UUID): - return {' u': value.hex} - elif isinstance(value, bytes): - return {' b': b64encode(value).decode('ascii')} - elif callable(getattr(value, '__html__', None)): - return {' m': text_type(value.__html__())} - elif isinstance(value, list): - return [_tag(x) for x in value] - elif isinstance(value, datetime): - return {' d': http_date(value)} - elif isinstance(value, dict): - return dict((k, _tag(v)) for k, v in iteritems(value)) - elif isinstance(value, str): +class TaggedJSONSerializer(object): + """A customized JSON serializer that supports a few extra types that + we take for granted when serializing (tuples, markup objects, datetime). + """ + + def __init__(self): + self.conversions = [ + { + 'check': lambda value: self._is_dict_with_used_key(value), + 'tag': lambda value: self._tag_dict_used_with_key(value), + 'untag': lambda value: self._untag_dict_used_with_key(value), + 'key': ' di', + }, + { + 'check': lambda value: isinstance(value, tuple), + 'tag': lambda value: [self._tag(x) for x in value], + 'untag': lambda value: tuple(value), + 'key': ' t', + }, + { + 'check': lambda value: isinstance(value, uuid.UUID), + 'tag': lambda value: value.hex, + 'untag': lambda value: uuid.UUID(value), + 'key': ' u', + }, + { + 'check': lambda value: isinstance(value, bytes), + 'tag': lambda value: b64encode(value).decode('ascii'), + 'untag': lambda value: b64decode(value), + 'key': ' b', + }, + { + 'check': lambda value: callable(getattr(value, '__html__', + None)), + 'tag': lambda value: text_type(value.__html__()), + 'untag': lambda value: Markup(value), + 'key': ' m', + }, + { + 'check': lambda value: isinstance(value, list), + 'tag': lambda value: [self._tag(x) for x in value], + }, + { + 'check': lambda value: isinstance(value, datetime), + 'tag': lambda value: http_date(value), + 'untag': lambda value: parse_date(value), + 'key': ' d', + }, + { + 'check': lambda value: isinstance(value, dict), + 'tag': lambda value: dict((k, self._tag(v)) for k, v in + iteritems(value)), + }, + { + 'check': lambda value: isinstance(value, str), + 'tag': lambda value: self._tag_string(value), + } + ] + + @property + def keys(self): + return [c['key'] for c in self.conversions if c.get('key')] + + def _get_conversion_untag(self, key): + return next( + (c['untag'] for c in self.conversions if c.get('key') == key), + lambda v: None + ) + + def _is_dict_with_used_key(self, v): + return isinstance(v, dict) and len(v) == 1 and list(v)[0] in self.keys + + def _was_dict_with_used_key(self, k): + return k.endswith('__') and k[:-2] in self.keys + + def _tag_string(self, value): try: return text_type(value) except UnicodeError: @@ -73,38 +132,38 @@ def _tag(value): u'non-ASCII data was passed to the session system ' u'which can only store unicode strings. Consider ' u'base64 encoding your string (String was %r)' % value) - return value - -class TaggedJSONSerializer(object): - """A customized JSON serializer that supports a few extra types that - we take for granted when serializing (tuples, markup objects, datetime). - """ + def _tag_dict_used_with_key(self, value): + k, v = next(iteritems(value)) + return {'%s__' % k: v} + + def _tag(self, value): + for tag_ops in self.conversions: + if tag_ops['check'](value): + tag = tag_ops.get('key') + if tag: + return {tag: tag_ops['tag'](value)} + return tag_ops['tag'](value) + return value + + def _untag_dict_used_with_key(self, the_value): + k, v = next(iteritems(the_value)) + if self._was_dict_with_used_key(k): + return {k[:-2]: self._untag(v)} + + def _untag(self, obj): + if len(obj) != 1: + return obj + the_key, the_value = next(iteritems(obj)) + untag = self._get_conversion_untag(the_key) + new_value = untag(the_value) + return new_value if new_value else obj def dumps(self, value): - return json.dumps(_tag(value), separators=(',', ':')) - - LOADS_MAP = { - ' t': tuple, - ' u': uuid.UUID, - ' b': b64decode, - ' m': Markup, - ' d': parse_date, - } + return json.dumps(self._tag(value), separators=(',', ':')) def loads(self, value): - def object_hook(obj): - if len(obj) != 1: - return obj - the_key, the_value = next(iteritems(obj)) - # Check the key for a corresponding function - return_function = self.LOADS_MAP.get(the_key) - if return_function: - # Pass the value to the function - return return_function(the_value) - # Didn't find a function for this object - return obj - return json.loads(value, object_hook=object_hook) + return json.loads(value, object_hook=self._untag) session_json_serializer = TaggedJSONSerializer() diff --git a/tests/test_basic.py b/tests/test_basic.py index 6341234b..62a90cf6 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -383,6 +383,9 @@ def test_session_special_types(): flask.session['dt'] = now flask.session['b'] = b'\xff' flask.session['t'] = (1, 2, 3) + flask.session['spacefirst'] = {' t': 'not-a-tuple'} + flask.session['withunderscores'] = {' t__': 'not-a-tuple'} + flask.session['notadict'] = {' di': 'not-a-dict'} return response @app.route('/') @@ -399,6 +402,9 @@ def test_session_special_types(): assert rv['b'] == b'\xff' assert type(rv['b']) == bytes assert rv['t'] == (1, 2, 3) + assert rv['spacefirst'] == {' t': 'not-a-tuple'} + assert rv['withunderscores'] == {' t__': 'not-a-tuple'} + assert rv['notadict'] == {' di': 'not-a-dict'} def test_session_cookie_setting():