Browse Source

Move object_hook outside loads method so class can be extend and reused

pull/1452/head
Josh Rowe 8 years ago
parent
commit
8a8a608152
  1. 147
      flask/sessions.py
  2. 6
      tests/test_basic.py

147
flask/sessions.py

@ -8,7 +8,6 @@
:copyright: (c) 2015 by Armin Ronacher. :copyright: (c) 2015 by Armin Ronacher.
:license: BSD, see LICENSE for more details. :license: BSD, see LICENSE for more details.
""" """
import uuid import uuid
import hashlib import hashlib
from base64 import b64encode, b64decode from base64 import b64encode, b64decode
@ -49,22 +48,82 @@ class SessionMixin(object):
modified = True modified = True
def _tag(value): class TaggedJSONSerializer(object):
if isinstance(value, tuple): """A customized JSON serializer that supports a few extra types that
return {' t': [_tag(x) for x in value]} we take for granted when serializing (tuples, markup objects, datetime).
elif isinstance(value, uuid.UUID): """
return {' u': value.hex}
elif isinstance(value, bytes): def __init__(self):
return {' b': b64encode(value).decode('ascii')} self.conversions = [
elif callable(getattr(value, '__html__', None)): {
return {' m': text_type(value.__html__())} 'check': lambda value: self._is_dict_with_used_key(value),
elif isinstance(value, list): 'tag': lambda value: self._tag_dict_used_with_key(value),
return [_tag(x) for x in value] 'untag': lambda value: self._untag_dict_used_with_key(value),
elif isinstance(value, datetime): 'key': ' di',
return {' d': http_date(value)} },
elif isinstance(value, dict): {
return dict((k, _tag(v)) for k, v in iteritems(value)) 'check': lambda value: isinstance(value, tuple),
elif isinstance(value, str): 'tag': lambda value: [self._tag(x) for x in value],
'untag': lambda value: tuple(value),
'key': ' t',
},
{
'check': lambda value: isinstance(value, uuid.UUID),
'tag': lambda value: value.hex,
'untag': lambda value: uuid.UUID(value),
'key': ' u',
},
{
'check': lambda value: isinstance(value, bytes),
'tag': lambda value: b64encode(value).decode('ascii'),
'untag': lambda value: b64decode(value),
'key': ' b',
},
{
'check': lambda value: callable(getattr(value, '__html__',
None)),
'tag': lambda value: text_type(value.__html__()),
'untag': lambda value: Markup(value),
'key': ' m',
},
{
'check': lambda value: isinstance(value, list),
'tag': lambda value: [self._tag(x) for x in value],
},
{
'check': lambda value: isinstance(value, datetime),
'tag': lambda value: http_date(value),
'untag': lambda value: parse_date(value),
'key': ' d',
},
{
'check': lambda value: isinstance(value, dict),
'tag': lambda value: dict((k, self._tag(v)) for k, v in
iteritems(value)),
},
{
'check': lambda value: isinstance(value, str),
'tag': lambda value: self._tag_string(value),
}
]
@property
def keys(self):
return [c['key'] for c in self.conversions if c.get('key')]
def _get_conversion_untag(self, key):
return next(
(c['untag'] for c in self.conversions if c.get('key') == key),
lambda v: None
)
def _is_dict_with_used_key(self, v):
return isinstance(v, dict) and len(v) == 1 and list(v)[0] in self.keys
def _was_dict_with_used_key(self, k):
return k.endswith('__') and k[:-2] in self.keys
def _tag_string(self, value):
try: try:
return text_type(value) return text_type(value)
except UnicodeError: except UnicodeError:
@ -73,38 +132,38 @@ def _tag(value):
u'non-ASCII data was passed to the session system ' u'non-ASCII data was passed to the session system '
u'which can only store unicode strings. Consider ' u'which can only store unicode strings. Consider '
u'base64 encoding your string (String was %r)' % value) u'base64 encoding your string (String was %r)' % value)
return value
class TaggedJSONSerializer(object): def _tag_dict_used_with_key(self, value):
"""A customized JSON serializer that supports a few extra types that k, v = next(iteritems(value))
we take for granted when serializing (tuples, markup objects, datetime). return {'%s__' % k: v}
"""
def _tag(self, value):
for tag_ops in self.conversions:
if tag_ops['check'](value):
tag = tag_ops.get('key')
if tag:
return {tag: tag_ops['tag'](value)}
return tag_ops['tag'](value)
return value
def _untag_dict_used_with_key(self, the_value):
k, v = next(iteritems(the_value))
if self._was_dict_with_used_key(k):
return {k[:-2]: self._untag(v)}
def _untag(self, obj):
if len(obj) != 1:
return obj
the_key, the_value = next(iteritems(obj))
untag = self._get_conversion_untag(the_key)
new_value = untag(the_value)
return new_value if new_value else obj
def dumps(self, value): def dumps(self, value):
return json.dumps(_tag(value), separators=(',', ':')) return json.dumps(self._tag(value), separators=(',', ':'))
LOADS_MAP = {
' t': tuple,
' u': uuid.UUID,
' b': b64decode,
' m': Markup,
' d': parse_date,
}
def loads(self, value): def loads(self, value):
def object_hook(obj): return json.loads(value, object_hook=self._untag)
if len(obj) != 1:
return obj
the_key, the_value = next(iteritems(obj))
# Check the key for a corresponding function
return_function = self.LOADS_MAP.get(the_key)
if return_function:
# Pass the value to the function
return return_function(the_value)
# Didn't find a function for this object
return obj
return json.loads(value, object_hook=object_hook)
session_json_serializer = TaggedJSONSerializer() session_json_serializer = TaggedJSONSerializer()

6
tests/test_basic.py

@ -383,6 +383,9 @@ def test_session_special_types():
flask.session['dt'] = now flask.session['dt'] = now
flask.session['b'] = b'\xff' flask.session['b'] = b'\xff'
flask.session['t'] = (1, 2, 3) flask.session['t'] = (1, 2, 3)
flask.session['spacefirst'] = {' t': 'not-a-tuple'}
flask.session['withunderscores'] = {' t__': 'not-a-tuple'}
flask.session['notadict'] = {' di': 'not-a-dict'}
return response return response
@app.route('/') @app.route('/')
@ -399,6 +402,9 @@ def test_session_special_types():
assert rv['b'] == b'\xff' assert rv['b'] == b'\xff'
assert type(rv['b']) == bytes assert type(rv['b']) == bytes
assert rv['t'] == (1, 2, 3) assert rv['t'] == (1, 2, 3)
assert rv['spacefirst'] == {' t': 'not-a-tuple'}
assert rv['withunderscores'] == {' t__': 'not-a-tuple'}
assert rv['notadict'] == {' di': 'not-a-dict'}
def test_session_cookie_setting(): def test_session_cookie_setting():

Loading…
Cancel
Save