mirror of https://github.com/mitsuhiko/flask.git
David Lord
8 years ago
committed by
GitHub
7 changed files with 399 additions and 112 deletions
@ -1,18 +1,9 @@
|
||||
# -*- coding: utf-8 -*- |
||||
""" |
||||
flask.json |
||||
~~~~~~~~~~ |
||||
|
||||
Implementation helpers for the JSON support in Flask. |
||||
|
||||
:copyright: (c) 2015 by Armin Ronacher. |
||||
:license: BSD, see LICENSE for more details. |
||||
""" |
||||
import io |
||||
import uuid |
||||
from datetime import date |
||||
from .globals import current_app, request |
||||
from ._compat import text_type, PY2 |
||||
from flask.globals import current_app, request |
||||
from flask._compat import text_type, PY2 |
||||
|
||||
from werkzeug.http import http_date |
||||
from jinja2 import Markup |
@ -0,0 +1,297 @@
|
||||
""" |
||||
Tagged JSON |
||||
~~~~~~~~~~~ |
||||
|
||||
A compact representation for lossless serialization of non-standard JSON types. |
||||
:class:`~flask.sessions.SecureCookieSessionInterface` uses this to serialize |
||||
the session data, but it may be useful in other places. It can be extended to |
||||
support other types. |
||||
|
||||
.. autoclass:: TaggedJSONSerializer |
||||
:members: |
||||
|
||||
.. autoclass:: JSONTag |
||||
:members: |
||||
|
||||
Let's seen an example that adds support for :class:`~collections.OrderedDict`. |
||||
Dicts don't have an order in Python or JSON, so to handle this we will dump |
||||
the items as a list of ``[key, value]`` pairs. Subclass :class:`JSONTag` and |
||||
give it the new key ``' od'`` to identify the type. The session serializer |
||||
processes dicts first, so insert the new tag at the front of the order since |
||||
``OrderedDict`` must be processed before ``dict``. :: |
||||
|
||||
from flask.json.tag import JSONTag |
||||
|
||||
class TagOrderedDict(JSONTag): |
||||
__slots__ = ('serializer',) |
||||
key = ' od' |
||||
|
||||
def check(self, value): |
||||
return isinstance(value, OrderedDict) |
||||
|
||||
def to_json(self, value): |
||||
return [[k, self.serializer.tag(v)] for k, v in iteritems(value)] |
||||
|
||||
def to_python(self, value): |
||||
return OrderedDict(value) |
||||
|
||||
app.session_interface.serializer.register(TagOrderedDict, 0) |
||||
|
||||
""" |
||||
|
||||
from base64 import b64decode, b64encode |
||||
from datetime import datetime |
||||
from uuid import UUID |
||||
|
||||
from jinja2 import Markup |
||||
from werkzeug.http import http_date, parse_date |
||||
|
||||
from flask._compat import iteritems, text_type |
||||
from flask.json import dumps, loads |
||||
|
||||
|
||||
class JSONTag(object): |
||||
"""Base class for defining type tags for :class:`TaggedJSONSerializer`.""" |
||||
|
||||
__slots__ = ('serializer',) |
||||
|
||||
#: The tag to mark the serialized object with. If ``None``, this tag is |
||||
#: only used as an intermediate step during tagging. |
||||
key = None |
||||
|
||||
def __init__(self, serializer): |
||||
"""Create a tagger for the given serializer.""" |
||||
self.serializer = serializer |
||||
|
||||
def check(self, value): |
||||
"""Check if the given value should be tagged by this tag.""" |
||||
raise NotImplementedError |
||||
|
||||
def to_json(self, value): |
||||
"""Convert the Python object to an object that is a valid JSON type. |
||||
The tag will be added later.""" |
||||
raise NotImplementedError |
||||
|
||||
def to_python(self, value): |
||||
"""Convert the JSON representation back to the correct type. The tag |
||||
will already be removed.""" |
||||
raise NotImplementedError |
||||
|
||||
def tag(self, value): |
||||
"""Convert the value to a valid JSON type and add the tag structure |
||||
around it.""" |
||||
return {self.key: self.to_json(value)} |
||||
|
||||
|
||||
class TagDict(JSONTag): |
||||
"""Tag for 1-item dicts whose only key matches a registered tag. |
||||
|
||||
Internally, the dict key is suffixed with `__`, and the suffix is removed |
||||
when deserializing. |
||||
""" |
||||
|
||||
__slots__ = () |
||||
key = ' di' |
||||
|
||||
def check(self, value): |
||||
return ( |
||||
isinstance(value, dict) |
||||
and len(value) == 1 |
||||
and next(iter(value)) in self.serializer.tags |
||||
) |
||||
|
||||
def to_json(self, value): |
||||
key = next(iter(value)) |
||||
return {key + '__': self.serializer.tag(value[key])} |
||||
|
||||
def to_python(self, value): |
||||
key = next(iter(value)) |
||||
return {key[:-2]: value[key]} |
||||
|
||||
|
||||
class PassDict(JSONTag): |
||||
__slots__ = () |
||||
|
||||
def check(self, value): |
||||
return isinstance(value, dict) |
||||
|
||||
def to_json(self, value): |
||||
# JSON objects may only have string keys, so don't bother tagging the |
||||
# key here. |
||||
return dict((k, self.serializer.tag(v)) for k, v in iteritems(value)) |
||||
|
||||
tag = to_json |
||||
|
||||
|
||||
class TagTuple(JSONTag): |
||||
__slots__ = () |
||||
key = ' t' |
||||
|
||||
def check(self, value): |
||||
return isinstance(value, tuple) |
||||
|
||||
def to_json(self, value): |
||||
return [self.serializer.tag(item) for item in value] |
||||
|
||||
def to_python(self, value): |
||||
return tuple(value) |
||||
|
||||
|
||||
class PassList(JSONTag): |
||||
__slots__ = () |
||||
|
||||
def check(self, value): |
||||
return isinstance(value, list) |
||||
|
||||
def to_json(self, value): |
||||
return [self.serializer.tag(item) for item in value] |
||||
|
||||
tag = to_json |
||||
|
||||
|
||||
class TagBytes(JSONTag): |
||||
__slots__ = () |
||||
key = ' b' |
||||
|
||||
def check(self, value): |
||||
return isinstance(value, bytes) |
||||
|
||||
def to_json(self, value): |
||||
return b64encode(value).decode('ascii') |
||||
|
||||
def to_python(self, value): |
||||
return b64decode(value) |
||||
|
||||
|
||||
class TagMarkup(JSONTag): |
||||
"""Serialize anything matching the :class:`~flask.Markup` API by |
||||
having a ``__html__`` method to the result of that method. Always |
||||
deserializes to an instance of :class:`~flask.Markup`.""" |
||||
|
||||
__slots__ = () |
||||
key = ' m' |
||||
|
||||
def check(self, value): |
||||
return callable(getattr(value, '__html__', None)) |
||||
|
||||
def to_json(self, value): |
||||
return text_type(value.__html__()) |
||||
|
||||
def to_python(self, value): |
||||
return Markup(value) |
||||
|
||||
|
||||
class TagUUID(JSONTag): |
||||
__slots__ = () |
||||
key = ' u' |
||||
|
||||
def check(self, value): |
||||
return isinstance(value, UUID) |
||||
|
||||
def to_json(self, value): |
||||
return value.hex |
||||
|
||||
def to_python(self, value): |
||||
return UUID(value) |
||||
|
||||
|
||||
class TagDateTime(JSONTag): |
||||
__slots__ = () |
||||
key = ' d' |
||||
|
||||
def check(self, value): |
||||
return isinstance(value, datetime) |
||||
|
||||
def to_json(self, value): |
||||
return http_date(value) |
||||
|
||||
def to_python(self, value): |
||||
return parse_date(value) |
||||
|
||||
|
||||
class TaggedJSONSerializer(object): |
||||
"""Serializer that uses a tag system to compactly represent objects that |
||||
are not JSON types. Passed as the intermediate serializer to |
||||
:class:`itsdangerous.Serializer`. |
||||
|
||||
The following extra types are supported: |
||||
|
||||
* :class:`dict` |
||||
* :class:`tuple` |
||||
* :class:`bytes` |
||||
* :class:`~flask.Markup` |
||||
* :class:`~uuid.UUID` |
||||
* :class:`~datetime.datetime` |
||||
""" |
||||
|
||||
__slots__ = ('tags', 'order') |
||||
|
||||
#: Tag classes to bind when creating the serializer. Other tags can be |
||||
#: added later using :meth:`~register`. |
||||
default_tags = [ |
||||
TagDict, PassDict, TagTuple, PassList, TagBytes, TagMarkup, TagUUID, |
||||
TagDateTime, |
||||
] |
||||
|
||||
def __init__(self): |
||||
self.tags = {} |
||||
self.order = [] |
||||
|
||||
for cls in self.default_tags: |
||||
self.register(cls) |
||||
|
||||
def register(self, tag_class, force=False, index=-1): |
||||
"""Register a new tag with this serializer. |
||||
|
||||
:param tag_class: tag class to register. Will be instantiated with this |
||||
serializer instance. |
||||
:param force: overwrite an existing tag. If false (default), a |
||||
:exc:`KeyError` is raised. |
||||
:param index: index to insert the new tag in the tag order. Useful when |
||||
the new tag is a special case of an existing tag. If -1 (default), |
||||
the tag is appended to the end of the order. |
||||
|
||||
:raise KeyError: if the tag key is already registered and ``force`` is |
||||
not true. |
||||
""" |
||||
tag = tag_class(self) |
||||
key = tag.key |
||||
|
||||
if key is not None: |
||||
if not force and key in self.tags: |
||||
raise KeyError("Tag '{0}' is already registered.".format(key)) |
||||
|
||||
self.tags[key] = tag |
||||
|
||||
if index == -1: |
||||
self.order.append(tag) |
||||
else: |
||||
self.order.insert(index, tag) |
||||
|
||||
def tag(self, value): |
||||
"""Convert a value to a tagged representation if necessary.""" |
||||
for tag in self.order: |
||||
if tag.check(value): |
||||
return tag.tag(value) |
||||
|
||||
return value |
||||
|
||||
def untag(self, value): |
||||
"""Convert a tagged representation back to the original type.""" |
||||
if len(value) != 1: |
||||
return value |
||||
|
||||
key = next(iter(value)) |
||||
|
||||
if key not in self.tags: |
||||
return value |
||||
|
||||
return self.tags[key].to_python(value[key]) |
||||
|
||||
def dumps(self, value): |
||||
"""Tag the value and dump it to a compact JSON string.""" |
||||
return dumps(self.tag(value), separators=(',', ':')) |
||||
|
||||
def loads(self, value): |
||||
"""Load data from a JSON string and deserialized any tagged objects.""" |
||||
return loads(value, object_hook=self.untag) |
@ -0,0 +1,65 @@
|
||||
from datetime import datetime |
||||
from uuid import uuid4 |
||||
|
||||
import pytest |
||||
|
||||
from flask import Markup |
||||
from flask.json.tag import TaggedJSONSerializer, JSONTag |
||||
|
||||
|
||||
@pytest.mark.parametrize("data", ( |
||||
{' t': (1, 2, 3)}, |
||||
{' t__': b'a'}, |
||||
{' di': ' di'}, |
||||
{'x': (1, 2, 3), 'y': 4}, |
||||
(1, 2, 3), |
||||
[(1, 2, 3)], |
||||
b'\xff', |
||||
Markup('<html>'), |
||||
uuid4(), |
||||
datetime.utcnow().replace(microsecond=0), |
||||
)) |
||||
def test_dump_load_unchanged(data): |
||||
s = TaggedJSONSerializer() |
||||
assert s.loads(s.dumps(data)) == data |
||||
|
||||
|
||||
def test_duplicate_tag(): |
||||
class TagDict(JSONTag): |
||||
key = ' d' |
||||
|
||||
s = TaggedJSONSerializer() |
||||
pytest.raises(KeyError, s.register, TagDict) |
||||
s.register(TagDict, force=True, index=0) |
||||
assert isinstance(s.tags[' d'], TagDict) |
||||
assert isinstance(s.order[0], TagDict) |
||||
|
||||
|
||||
def test_custom_tag(): |
||||
class Foo(object): |
||||
def __init__(self, data): |
||||
self.data = data |
||||
|
||||
class TagFoo(JSONTag): |
||||
__slots__ = () |
||||
key = ' f' |
||||
|
||||
def check(self, value): |
||||
return isinstance(value, Foo) |
||||
|
||||
def to_json(self, value): |
||||
return self.serializer.tag(value.data) |
||||
|
||||
def to_python(self, value): |
||||
return Foo(value) |
||||
|
||||
s = TaggedJSONSerializer() |
||||
s.register(TagFoo) |
||||
assert s.loads(s.dumps(Foo('bar'))).data == 'bar' |
||||
|
||||
|
||||
def test_tag_interface(): |
||||
t = JSONTag(None) |
||||
pytest.raises(NotImplementedError, t.check, None) |
||||
pytest.raises(NotImplementedError, t.to_json, None) |
||||
pytest.raises(NotImplementedError, t.to_python, None) |
Loading…
Reference in new issue