mirror of https://github.com/mitsuhiko/flask.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1867 lines
49 KiB
1867 lines
49 KiB
# -*- coding: utf-8 -*- |
|
""" |
|
tests.basic |
|
~~~~~~~~~~~~~~~~~~~~~ |
|
|
|
The basic functionality. |
|
|
|
:copyright: (c) 2015 by Armin Ronacher. |
|
:license: BSD, see LICENSE for more details. |
|
""" |
|
|
|
import re |
|
import time |
|
import uuid |
|
from datetime import datetime |
|
from threading import Thread |
|
|
|
import pytest |
|
import werkzeug.serving |
|
from werkzeug.exceptions import BadRequest, Forbidden, NotFound |
|
from werkzeug.http import parse_date |
|
from werkzeug.routing import BuildError |
|
|
|
import flask |
|
from flask._compat import text_type |
|
|
|
|
|
def test_options_work(app, client): |
|
@app.route('/', methods=['GET', 'POST']) |
|
def index(): |
|
return 'Hello World' |
|
|
|
rv = client.open('/', method='OPTIONS') |
|
assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS', 'POST'] |
|
assert rv.data == b'' |
|
|
|
|
|
def test_options_on_multiple_rules(app, client): |
|
@app.route('/', methods=['GET', 'POST']) |
|
def index(): |
|
return 'Hello World' |
|
|
|
@app.route('/', methods=['PUT']) |
|
def index_put(): |
|
return 'Aha!' |
|
|
|
rv = client.open('/', method='OPTIONS') |
|
assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS', 'POST', 'PUT'] |
|
|
|
|
|
def test_provide_automatic_options_attr(): |
|
app = flask.Flask(__name__) |
|
|
|
def index(): |
|
return 'Hello World!' |
|
|
|
index.provide_automatic_options = False |
|
app.route('/')(index) |
|
rv = app.test_client().open('/', method='OPTIONS') |
|
assert rv.status_code == 405 |
|
|
|
app = flask.Flask(__name__) |
|
|
|
def index2(): |
|
return 'Hello World!' |
|
|
|
index2.provide_automatic_options = True |
|
app.route('/', methods=['OPTIONS'])(index2) |
|
rv = app.test_client().open('/', method='OPTIONS') |
|
assert sorted(rv.allow) == ['OPTIONS'] |
|
|
|
|
|
def test_provide_automatic_options_kwarg(app, client): |
|
def index(): |
|
return flask.request.method |
|
|
|
def more(): |
|
return flask.request.method |
|
|
|
app.add_url_rule('/', view_func=index, provide_automatic_options=False) |
|
app.add_url_rule( |
|
'/more', view_func=more, methods=['GET', 'POST'], |
|
provide_automatic_options=False |
|
) |
|
assert client.get('/').data == b'GET' |
|
|
|
rv = client.post('/') |
|
assert rv.status_code == 405 |
|
assert sorted(rv.allow) == ['GET', 'HEAD'] |
|
|
|
# Older versions of Werkzeug.test.Client don't have an options method |
|
if hasattr(client, 'options'): |
|
rv = client.options('/') |
|
else: |
|
rv = client.open('/', method='OPTIONS') |
|
|
|
assert rv.status_code == 405 |
|
|
|
rv = client.head('/') |
|
assert rv.status_code == 200 |
|
assert not rv.data # head truncates |
|
assert client.post('/more').data == b'POST' |
|
assert client.get('/more').data == b'GET' |
|
|
|
rv = client.delete('/more') |
|
assert rv.status_code == 405 |
|
assert sorted(rv.allow) == ['GET', 'HEAD', 'POST'] |
|
|
|
if hasattr(client, 'options'): |
|
rv = client.options('/more') |
|
else: |
|
rv = client.open('/more', method='OPTIONS') |
|
|
|
assert rv.status_code == 405 |
|
|
|
|
|
def test_request_dispatching(app, client): |
|
@app.route('/') |
|
def index(): |
|
return flask.request.method |
|
|
|
@app.route('/more', methods=['GET', 'POST']) |
|
def more(): |
|
return flask.request.method |
|
|
|
assert client.get('/').data == b'GET' |
|
rv = client.post('/') |
|
assert rv.status_code == 405 |
|
assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS'] |
|
rv = client.head('/') |
|
assert rv.status_code == 200 |
|
assert not rv.data # head truncates |
|
assert client.post('/more').data == b'POST' |
|
assert client.get('/more').data == b'GET' |
|
rv = client.delete('/more') |
|
assert rv.status_code == 405 |
|
assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS', 'POST'] |
|
|
|
|
|
def test_disallow_string_for_allowed_methods(app): |
|
with pytest.raises(TypeError): |
|
@app.route('/', methods='GET POST') |
|
def index(): |
|
return "Hey" |
|
|
|
|
|
def test_url_mapping(app, client): |
|
random_uuid4 = "7eb41166-9ebf-4d26-b771-ea3f54f8b383" |
|
|
|
def index(): |
|
return flask.request.method |
|
|
|
def more(): |
|
return flask.request.method |
|
|
|
def options(): |
|
return random_uuid4 |
|
|
|
app.add_url_rule('/', 'index', index) |
|
app.add_url_rule('/more', 'more', more, methods=['GET', 'POST']) |
|
|
|
# Issue 1288: Test that automatic options are not added when non-uppercase 'options' in methods |
|
app.add_url_rule('/options', 'options', options, methods=['options']) |
|
|
|
assert client.get('/').data == b'GET' |
|
rv = client.post('/') |
|
assert rv.status_code == 405 |
|
assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS'] |
|
rv = client.head('/') |
|
assert rv.status_code == 200 |
|
assert not rv.data # head truncates |
|
assert client.post('/more').data == b'POST' |
|
assert client.get('/more').data == b'GET' |
|
rv = client.delete('/more') |
|
assert rv.status_code == 405 |
|
assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS', 'POST'] |
|
rv = client.open('/options', method='OPTIONS') |
|
assert rv.status_code == 200 |
|
assert random_uuid4 in rv.data.decode("utf-8") |
|
|
|
|
|
def test_werkzeug_routing(app, client): |
|
from werkzeug.routing import Submount, Rule |
|
app.url_map.add(Submount('/foo', [ |
|
Rule('/bar', endpoint='bar'), |
|
Rule('/', endpoint='index') |
|
])) |
|
|
|
def bar(): |
|
return 'bar' |
|
|
|
def index(): |
|
return 'index' |
|
|
|
app.view_functions['bar'] = bar |
|
app.view_functions['index'] = index |
|
|
|
assert client.get('/foo/').data == b'index' |
|
assert client.get('/foo/bar').data == b'bar' |
|
|
|
|
|
def test_endpoint_decorator(app, client): |
|
from werkzeug.routing import Submount, Rule |
|
app.url_map.add(Submount('/foo', [ |
|
Rule('/bar', endpoint='bar'), |
|
Rule('/', endpoint='index') |
|
])) |
|
|
|
@app.endpoint('bar') |
|
def bar(): |
|
return 'bar' |
|
|
|
@app.endpoint('index') |
|
def index(): |
|
return 'index' |
|
|
|
assert client.get('/foo/').data == b'index' |
|
assert client.get('/foo/bar').data == b'bar' |
|
|
|
|
|
def test_session(app, client): |
|
@app.route('/set', methods=['POST']) |
|
def set(): |
|
flask.session['value'] = flask.request.form['value'] |
|
return 'value set' |
|
|
|
@app.route('/get') |
|
def get(): |
|
return flask.session['value'] |
|
|
|
assert client.post('/set', data={'value': '42'}).data == b'value set' |
|
assert client.get('/get').data == b'42' |
|
|
|
|
|
def test_session_using_server_name(app, client): |
|
app.config.update( |
|
SERVER_NAME='example.com' |
|
) |
|
|
|
@app.route('/') |
|
def index(): |
|
flask.session['testing'] = 42 |
|
return 'Hello World' |
|
|
|
rv = client.get('/', 'http://example.com/') |
|
assert 'domain=.example.com' in rv.headers['set-cookie'].lower() |
|
assert 'httponly' in rv.headers['set-cookie'].lower() |
|
|
|
|
|
def test_session_using_server_name_and_port(app, client): |
|
app.config.update( |
|
SERVER_NAME='example.com:8080' |
|
) |
|
|
|
@app.route('/') |
|
def index(): |
|
flask.session['testing'] = 42 |
|
return 'Hello World' |
|
|
|
rv = client.get('/', 'http://example.com:8080/') |
|
assert 'domain=.example.com' in rv.headers['set-cookie'].lower() |
|
assert 'httponly' in rv.headers['set-cookie'].lower() |
|
|
|
|
|
def test_session_using_server_name_port_and_path(app, client): |
|
app.config.update( |
|
SERVER_NAME='example.com:8080', |
|
APPLICATION_ROOT='/foo' |
|
) |
|
|
|
@app.route('/') |
|
def index(): |
|
flask.session['testing'] = 42 |
|
return 'Hello World' |
|
|
|
rv = client.get('/', 'http://example.com:8080/foo') |
|
assert 'domain=example.com' in rv.headers['set-cookie'].lower() |
|
assert 'path=/foo' in rv.headers['set-cookie'].lower() |
|
assert 'httponly' in rv.headers['set-cookie'].lower() |
|
|
|
|
|
def test_session_using_application_root(app, client): |
|
class PrefixPathMiddleware(object): |
|
def __init__(self, app, prefix): |
|
self.app = app |
|
self.prefix = prefix |
|
|
|
def __call__(self, environ, start_response): |
|
environ['SCRIPT_NAME'] = self.prefix |
|
return self.app(environ, start_response) |
|
|
|
app.wsgi_app = PrefixPathMiddleware(app.wsgi_app, '/bar') |
|
app.config.update( |
|
APPLICATION_ROOT='/bar' |
|
) |
|
|
|
@app.route('/') |
|
def index(): |
|
flask.session['testing'] = 42 |
|
return 'Hello World' |
|
|
|
rv = client.get('/', 'http://example.com:8080/') |
|
assert 'path=/bar' in rv.headers['set-cookie'].lower() |
|
|
|
|
|
def test_session_using_session_settings(app, client): |
|
app.config.update( |
|
SERVER_NAME='www.example.com:8080', |
|
APPLICATION_ROOT='/test', |
|
SESSION_COOKIE_DOMAIN='.example.com', |
|
SESSION_COOKIE_HTTPONLY=False, |
|
SESSION_COOKIE_SECURE=True, |
|
SESSION_COOKIE_PATH='/' |
|
) |
|
|
|
@app.route('/') |
|
def index(): |
|
flask.session['testing'] = 42 |
|
return 'Hello World' |
|
|
|
rv = client.get('/', 'http://www.example.com:8080/test/') |
|
cookie = rv.headers['set-cookie'].lower() |
|
assert 'domain=.example.com' in cookie |
|
assert 'path=/' in cookie |
|
assert 'secure' in cookie |
|
assert 'httponly' not in cookie |
|
|
|
|
|
def test_session_localhost_warning(recwarn, app, client): |
|
app.config.update( |
|
SERVER_NAME='localhost:5000', |
|
) |
|
|
|
@app.route('/') |
|
def index(): |
|
flask.session['testing'] = 42 |
|
return 'testing' |
|
|
|
rv = client.get('/', 'http://localhost:5000/') |
|
assert 'domain' not in rv.headers['set-cookie'].lower() |
|
w = recwarn.pop(UserWarning) |
|
assert '"localhost" is not a valid cookie domain' in str(w.message) |
|
|
|
|
|
def test_session_ip_warning(recwarn, app, client): |
|
app.config.update( |
|
SERVER_NAME='127.0.0.1:5000', |
|
) |
|
|
|
@app.route('/') |
|
def index(): |
|
flask.session['testing'] = 42 |
|
return 'testing' |
|
|
|
rv = client.get('/', 'http://127.0.0.1:5000/') |
|
assert 'domain=127.0.0.1' in rv.headers['set-cookie'].lower() |
|
w = recwarn.pop(UserWarning) |
|
assert 'cookie domain is an IP' in str(w.message) |
|
|
|
|
|
def test_missing_session(app): |
|
app.secret_key = None |
|
|
|
def expect_exception(f, *args, **kwargs): |
|
e = pytest.raises(RuntimeError, f, *args, **kwargs) |
|
assert e.value.args and 'session is unavailable' in e.value.args[0] |
|
|
|
with app.test_request_context(): |
|
assert flask.session.get('missing_key') is None |
|
expect_exception(flask.session.__setitem__, 'foo', 42) |
|
expect_exception(flask.session.pop, 'foo') |
|
|
|
|
|
def test_session_expiration(app, client): |
|
permanent = True |
|
|
|
@app.route('/') |
|
def index(): |
|
flask.session['test'] = 42 |
|
flask.session.permanent = permanent |
|
return '' |
|
|
|
@app.route('/test') |
|
def test(): |
|
return text_type(flask.session.permanent) |
|
|
|
rv = client.get('/') |
|
assert 'set-cookie' in rv.headers |
|
match = re.search(r'(?i)\bexpires=([^;]+)', rv.headers['set-cookie']) |
|
expires = parse_date(match.group()) |
|
expected = datetime.utcnow() + app.permanent_session_lifetime |
|
assert expires.year == expected.year |
|
assert expires.month == expected.month |
|
assert expires.day == expected.day |
|
|
|
rv = client.get('/test') |
|
assert rv.data == b'True' |
|
|
|
permanent = False |
|
rv = client.get('/') |
|
assert 'set-cookie' in rv.headers |
|
match = re.search(r'\bexpires=([^;]+)', rv.headers['set-cookie']) |
|
assert match is None |
|
|
|
|
|
def test_session_stored_last(app, client): |
|
@app.after_request |
|
def modify_session(response): |
|
flask.session['foo'] = 42 |
|
return response |
|
|
|
@app.route('/') |
|
def dump_session_contents(): |
|
return repr(flask.session.get('foo')) |
|
|
|
assert client.get('/').data == b'None' |
|
assert client.get('/').data == b'42' |
|
|
|
|
|
def test_session_special_types(app, client): |
|
now = datetime.utcnow().replace(microsecond=0) |
|
the_uuid = uuid.uuid4() |
|
|
|
@app.route('/') |
|
def dump_session_contents(): |
|
flask.session['t'] = (1, 2, 3) |
|
flask.session['b'] = b'\xff' |
|
flask.session['m'] = flask.Markup('<html>') |
|
flask.session['u'] = the_uuid |
|
flask.session['d'] = now |
|
flask.session['t_tag'] = {' t': 'not-a-tuple'} |
|
flask.session['di_t_tag'] = {' t__': 'not-a-tuple'} |
|
flask.session['di_tag'] = {' di': 'not-a-dict'} |
|
return '', 204 |
|
|
|
with client: |
|
client.get('/') |
|
s = flask.session |
|
assert s['t'] == (1, 2, 3) |
|
assert type(s['b']) == bytes |
|
assert s['b'] == b'\xff' |
|
assert type(s['m']) == flask.Markup |
|
assert s['m'] == flask.Markup('<html>') |
|
assert s['u'] == the_uuid |
|
assert s['d'] == now |
|
assert s['t_tag'] == {' t': 'not-a-tuple'} |
|
assert s['di_t_tag'] == {' t__': 'not-a-tuple'} |
|
assert s['di_tag'] == {' di': 'not-a-dict'} |
|
|
|
|
|
def test_session_cookie_setting(app): |
|
is_permanent = True |
|
|
|
@app.route('/bump') |
|
def bump(): |
|
rv = flask.session['foo'] = flask.session.get('foo', 0) + 1 |
|
flask.session.permanent = is_permanent |
|
return str(rv) |
|
|
|
@app.route('/read') |
|
def read(): |
|
return str(flask.session.get('foo', 0)) |
|
|
|
def run_test(expect_header): |
|
with app.test_client() as c: |
|
assert c.get('/bump').data == b'1' |
|
assert c.get('/bump').data == b'2' |
|
assert c.get('/bump').data == b'3' |
|
|
|
rv = c.get('/read') |
|
set_cookie = rv.headers.get('set-cookie') |
|
assert (set_cookie is not None) == expect_header |
|
assert rv.data == b'3' |
|
|
|
is_permanent = True |
|
app.config['SESSION_REFRESH_EACH_REQUEST'] = True |
|
run_test(expect_header=True) |
|
|
|
is_permanent = True |
|
app.config['SESSION_REFRESH_EACH_REQUEST'] = False |
|
run_test(expect_header=False) |
|
|
|
is_permanent = False |
|
app.config['SESSION_REFRESH_EACH_REQUEST'] = True |
|
run_test(expect_header=False) |
|
|
|
is_permanent = False |
|
app.config['SESSION_REFRESH_EACH_REQUEST'] = False |
|
run_test(expect_header=False) |
|
|
|
|
|
def test_session_vary_cookie(app, client): |
|
@app.route('/set') |
|
def set_session(): |
|
flask.session['test'] = 'test' |
|
return '' |
|
|
|
@app.route('/get') |
|
def get(): |
|
return flask.session.get('test') |
|
|
|
@app.route('/getitem') |
|
def getitem(): |
|
return flask.session['test'] |
|
|
|
@app.route('/setdefault') |
|
def setdefault(): |
|
return flask.session.setdefault('test', 'default') |
|
|
|
@app.route('/vary-cookie-header-set') |
|
def vary_cookie_header_set(): |
|
response = flask.Response() |
|
response.vary.add('Cookie') |
|
flask.session['test'] = 'test' |
|
return response |
|
|
|
@app.route('/vary-header-set') |
|
def vary_header_set(): |
|
response = flask.Response() |
|
response.vary.update(('Accept-Encoding', 'Accept-Language')) |
|
flask.session['test'] = 'test' |
|
return response |
|
|
|
@app.route('/no-vary-header') |
|
def no_vary_header(): |
|
return '' |
|
|
|
def expect(path, header_value='Cookie'): |
|
rv = client.get(path) |
|
|
|
if header_value: |
|
# The 'Vary' key should exist in the headers only once. |
|
assert len(rv.headers.get_all('Vary')) == 1 |
|
assert rv.headers['Vary'] == header_value |
|
else: |
|
assert 'Vary' not in rv.headers |
|
|
|
expect('/set') |
|
expect('/get') |
|
expect('/getitem') |
|
expect('/setdefault') |
|
expect('/vary-cookie-header-set') |
|
expect('/vary-header-set', 'Accept-Encoding, Accept-Language, Cookie') |
|
expect('/no-vary-header', None) |
|
|
|
|
|
def test_flashes(app, req_ctx): |
|
assert not flask.session.modified |
|
flask.flash('Zap') |
|
flask.session.modified = False |
|
flask.flash('Zip') |
|
assert flask.session.modified |
|
assert list(flask.get_flashed_messages()) == ['Zap', 'Zip'] |
|
|
|
|
|
def test_extended_flashing(app): |
|
# Be sure app.testing=True below, else tests can fail silently. |
|
# |
|
# Specifically, if app.testing is not set to True, the AssertionErrors |
|
# in the view functions will cause a 500 response to the test client |
|
# instead of propagating exceptions. |
|
|
|
@app.route('/') |
|
def index(): |
|
flask.flash(u'Hello World') |
|
flask.flash(u'Hello World', 'error') |
|
flask.flash(flask.Markup(u'<em>Testing</em>'), 'warning') |
|
return '' |
|
|
|
@app.route('/test/') |
|
def test(): |
|
messages = flask.get_flashed_messages() |
|
assert list(messages) == [ |
|
u'Hello World', |
|
u'Hello World', |
|
flask.Markup(u'<em>Testing</em>') |
|
] |
|
return '' |
|
|
|
@app.route('/test_with_categories/') |
|
def test_with_categories(): |
|
messages = flask.get_flashed_messages(with_categories=True) |
|
assert len(messages) == 3 |
|
assert list(messages) == [ |
|
('message', u'Hello World'), |
|
('error', u'Hello World'), |
|
('warning', flask.Markup(u'<em>Testing</em>')) |
|
] |
|
return '' |
|
|
|
@app.route('/test_filter/') |
|
def test_filter(): |
|
messages = flask.get_flashed_messages( |
|
category_filter=['message'], with_categories=True) |
|
assert list(messages) == [('message', u'Hello World')] |
|
return '' |
|
|
|
@app.route('/test_filters/') |
|
def test_filters(): |
|
messages = flask.get_flashed_messages( |
|
category_filter=['message', 'warning'], with_categories=True) |
|
assert list(messages) == [ |
|
('message', u'Hello World'), |
|
('warning', flask.Markup(u'<em>Testing</em>')) |
|
] |
|
return '' |
|
|
|
@app.route('/test_filters_without_returning_categories/') |
|
def test_filters2(): |
|
messages = flask.get_flashed_messages( |
|
category_filter=['message', 'warning']) |
|
assert len(messages) == 2 |
|
assert messages[0] == u'Hello World' |
|
assert messages[1] == flask.Markup(u'<em>Testing</em>') |
|
return '' |
|
|
|
# Create new test client on each test to clean flashed messages. |
|
|
|
client = app.test_client() |
|
client.get('/') |
|
client.get('/test_with_categories/') |
|
|
|
client = app.test_client() |
|
client.get('/') |
|
client.get('/test_filter/') |
|
|
|
client = app.test_client() |
|
client.get('/') |
|
client.get('/test_filters/') |
|
|
|
client = app.test_client() |
|
client.get('/') |
|
client.get('/test_filters_without_returning_categories/') |
|
|
|
|
|
def test_request_processing(app, client): |
|
evts = [] |
|
|
|
@app.before_request |
|
def before_request(): |
|
evts.append('before') |
|
|
|
@app.after_request |
|
def after_request(response): |
|
response.data += b'|after' |
|
evts.append('after') |
|
return response |
|
|
|
@app.route('/') |
|
def index(): |
|
assert 'before' in evts |
|
assert 'after' not in evts |
|
return 'request' |
|
|
|
assert 'after' not in evts |
|
rv = client.get('/').data |
|
assert 'after' in evts |
|
assert rv == b'request|after' |
|
|
|
|
|
def test_request_preprocessing_early_return(app, client): |
|
evts = [] |
|
|
|
@app.before_request |
|
def before_request1(): |
|
evts.append(1) |
|
|
|
@app.before_request |
|
def before_request2(): |
|
evts.append(2) |
|
return "hello" |
|
|
|
@app.before_request |
|
def before_request3(): |
|
evts.append(3) |
|
return "bye" |
|
|
|
@app.route('/') |
|
def index(): |
|
evts.append('index') |
|
return "damnit" |
|
|
|
rv = client.get('/').data.strip() |
|
assert rv == b'hello' |
|
assert evts == [1, 2] |
|
|
|
|
|
def test_after_request_processing(app, client): |
|
@app.route('/') |
|
def index(): |
|
@flask.after_this_request |
|
def foo(response): |
|
response.headers['X-Foo'] = 'a header' |
|
return response |
|
|
|
return 'Test' |
|
|
|
resp = client.get('/') |
|
assert resp.status_code == 200 |
|
assert resp.headers['X-Foo'] == 'a header' |
|
|
|
|
|
def test_teardown_request_handler(app, client): |
|
called = [] |
|
|
|
@app.teardown_request |
|
def teardown_request(exc): |
|
called.append(True) |
|
return "Ignored" |
|
|
|
@app.route('/') |
|
def root(): |
|
return "Response" |
|
|
|
rv = client.get('/') |
|
assert rv.status_code == 200 |
|
assert b'Response' in rv.data |
|
assert len(called) == 1 |
|
|
|
|
|
def test_teardown_request_handler_debug_mode(app, client): |
|
called = [] |
|
|
|
@app.teardown_request |
|
def teardown_request(exc): |
|
called.append(True) |
|
return "Ignored" |
|
|
|
@app.route('/') |
|
def root(): |
|
return "Response" |
|
|
|
rv = client.get('/') |
|
assert rv.status_code == 200 |
|
assert b'Response' in rv.data |
|
assert len(called) == 1 |
|
|
|
|
|
def test_teardown_request_handler_error(app, client): |
|
called = [] |
|
app.testing = False |
|
|
|
@app.teardown_request |
|
def teardown_request1(exc): |
|
assert type(exc) == ZeroDivisionError |
|
called.append(True) |
|
# This raises a new error and blows away sys.exc_info(), so we can |
|
# test that all teardown_requests get passed the same original |
|
# exception. |
|
try: |
|
raise TypeError() |
|
except: |
|
pass |
|
|
|
@app.teardown_request |
|
def teardown_request2(exc): |
|
assert type(exc) == ZeroDivisionError |
|
called.append(True) |
|
# This raises a new error and blows away sys.exc_info(), so we can |
|
# test that all teardown_requests get passed the same original |
|
# exception. |
|
try: |
|
raise TypeError() |
|
except: |
|
pass |
|
|
|
@app.route('/') |
|
def fails(): |
|
1 // 0 |
|
|
|
rv = client.get('/') |
|
assert rv.status_code == 500 |
|
assert b'Internal Server Error' in rv.data |
|
assert len(called) == 2 |
|
|
|
|
|
def test_teardown_app_handler(app, client): |
|
called = [] |
|
|
|
@app.teardown_app |
|
def teardown_app(): |
|
called.append(True) |
|
return "Ignored" |
|
|
|
app.close() |
|
app.close() |
|
assert len(called) == 1 |
|
|
|
|
|
def test_before_after_request_order(app, client): |
|
called = [] |
|
|
|
@app.before_request |
|
def before1(): |
|
called.append(1) |
|
|
|
@app.before_request |
|
def before2(): |
|
called.append(2) |
|
|
|
@app.after_request |
|
def after1(response): |
|
called.append(4) |
|
return response |
|
|
|
@app.after_request |
|
def after2(response): |
|
called.append(3) |
|
return response |
|
|
|
@app.teardown_request |
|
def finish1(exc): |
|
called.append(6) |
|
|
|
@app.teardown_request |
|
def finish2(exc): |
|
called.append(5) |
|
|
|
@app.route('/') |
|
def index(): |
|
return '42' |
|
|
|
rv = client.get('/') |
|
assert rv.data == b'42' |
|
assert called == [1, 2, 3, 4, 5, 6] |
|
|
|
|
|
def test_error_handling(app, client): |
|
app.testing = False |
|
|
|
@app.errorhandler(404) |
|
def not_found(e): |
|
return 'not found', 404 |
|
|
|
@app.errorhandler(500) |
|
def internal_server_error(e): |
|
return 'internal server error', 500 |
|
|
|
@app.errorhandler(Forbidden) |
|
def forbidden(e): |
|
return 'forbidden', 403 |
|
|
|
@app.route('/') |
|
def index(): |
|
flask.abort(404) |
|
|
|
@app.route('/error') |
|
def error(): |
|
1 // 0 |
|
|
|
@app.route('/forbidden') |
|
def error2(): |
|
flask.abort(403) |
|
|
|
rv = client.get('/') |
|
assert rv.status_code == 404 |
|
assert rv.data == b'not found' |
|
rv = client.get('/error') |
|
assert rv.status_code == 500 |
|
assert b'internal server error' == rv.data |
|
rv = client.get('/forbidden') |
|
assert rv.status_code == 403 |
|
assert b'forbidden' == rv.data |
|
|
|
|
|
def test_error_handler_unknown_code(app): |
|
with pytest.raises(KeyError) as exc_info: |
|
app.register_error_handler(999, lambda e: ('999', 999)) |
|
|
|
assert 'Use a subclass' in exc_info.value.args[0] |
|
|
|
|
|
def test_error_handling_processing(app, client): |
|
app.testing = False |
|
|
|
@app.errorhandler(500) |
|
def internal_server_error(e): |
|
return 'internal server error', 500 |
|
|
|
@app.route('/') |
|
def broken_func(): |
|
1 // 0 |
|
|
|
@app.after_request |
|
def after_request(resp): |
|
resp.mimetype = 'text/x-special' |
|
return resp |
|
|
|
resp = client.get('/') |
|
assert resp.mimetype == 'text/x-special' |
|
assert resp.data == b'internal server error' |
|
|
|
|
|
def test_baseexception_error_handling(app, client): |
|
app.testing = False |
|
|
|
@app.route('/') |
|
def broken_func(): |
|
raise KeyboardInterrupt() |
|
|
|
with pytest.raises(KeyboardInterrupt): |
|
client.get('/') |
|
|
|
ctx = flask._request_ctx_stack.top |
|
assert ctx.preserved |
|
assert type(ctx._preserved_exc) is KeyboardInterrupt |
|
|
|
|
|
def test_before_request_and_routing_errors(app, client): |
|
@app.before_request |
|
def attach_something(): |
|
flask.g.something = 'value' |
|
|
|
@app.errorhandler(404) |
|
def return_something(error): |
|
return flask.g.something, 404 |
|
|
|
rv = client.get('/') |
|
assert rv.status_code == 404 |
|
assert rv.data == b'value' |
|
|
|
|
|
def test_user_error_handling(app, client): |
|
class MyException(Exception): |
|
pass |
|
|
|
@app.errorhandler(MyException) |
|
def handle_my_exception(e): |
|
assert isinstance(e, MyException) |
|
return '42' |
|
|
|
@app.route('/') |
|
def index(): |
|
raise MyException() |
|
|
|
assert client.get('/').data == b'42' |
|
|
|
|
|
def test_http_error_subclass_handling(app, client): |
|
class ForbiddenSubclass(Forbidden): |
|
pass |
|
|
|
@app.errorhandler(ForbiddenSubclass) |
|
def handle_forbidden_subclass(e): |
|
assert isinstance(e, ForbiddenSubclass) |
|
return 'banana' |
|
|
|
@app.errorhandler(403) |
|
def handle_forbidden_subclass(e): |
|
assert not isinstance(e, ForbiddenSubclass) |
|
assert isinstance(e, Forbidden) |
|
return 'apple' |
|
|
|
@app.route('/1') |
|
def index1(): |
|
raise ForbiddenSubclass() |
|
|
|
@app.route('/2') |
|
def index2(): |
|
flask.abort(403) |
|
|
|
@app.route('/3') |
|
def index3(): |
|
raise Forbidden() |
|
|
|
assert client.get('/1').data == b'banana' |
|
assert client.get('/2').data == b'apple' |
|
assert client.get('/3').data == b'apple' |
|
|
|
|
|
def test_errorhandler_precedence(app, client): |
|
class E1(Exception): |
|
pass |
|
|
|
class E2(Exception): |
|
pass |
|
|
|
class E3(E1, E2): |
|
pass |
|
|
|
@app.errorhandler(E2) |
|
def handle_e2(e): |
|
return 'E2' |
|
|
|
@app.errorhandler(Exception) |
|
def handle_exception(e): |
|
return 'Exception' |
|
|
|
@app.route('/E1') |
|
def raise_e1(): |
|
raise E1 |
|
|
|
@app.route('/E3') |
|
def raise_e3(): |
|
raise E3 |
|
|
|
rv = client.get('/E1') |
|
assert rv.data == b'Exception' |
|
|
|
rv = client.get('/E3') |
|
assert rv.data == b'E2' |
|
|
|
|
|
def test_trapping_of_bad_request_key_errors(app, client): |
|
@app.route('/fail') |
|
def fail(): |
|
flask.request.form['missing_key'] |
|
|
|
rv = client.get('/fail') |
|
assert rv.status_code == 400 |
|
assert b'missing_key' not in rv.data |
|
|
|
app.config['TRAP_BAD_REQUEST_ERRORS'] = True |
|
|
|
with pytest.raises(KeyError) as e: |
|
client.get("/fail") |
|
|
|
assert e.errisinstance(BadRequest) |
|
assert 'missing_key' in e.value.description |
|
|
|
|
|
def test_trapping_of_all_http_exceptions(app, client): |
|
app.config['TRAP_HTTP_EXCEPTIONS'] = True |
|
|
|
@app.route('/fail') |
|
def fail(): |
|
flask.abort(404) |
|
|
|
with pytest.raises(NotFound): |
|
client.get('/fail') |
|
|
|
|
|
def test_error_handler_after_processor_error(app, client): |
|
app.testing = False |
|
|
|
@app.before_request |
|
def before_request(): |
|
if trigger == 'before': |
|
1 // 0 |
|
|
|
@app.after_request |
|
def after_request(response): |
|
if trigger == 'after': |
|
1 // 0 |
|
return response |
|
|
|
@app.route('/') |
|
def index(): |
|
return 'Foo' |
|
|
|
@app.errorhandler(500) |
|
def internal_server_error(e): |
|
return 'Hello Server Error', 500 |
|
|
|
for trigger in 'before', 'after': |
|
rv = client.get('/') |
|
assert rv.status_code == 500 |
|
assert rv.data == b'Hello Server Error' |
|
|
|
|
|
def test_enctype_debug_helper(app, client): |
|
from flask.debughelpers import DebugFilesKeyError |
|
app.debug = True |
|
|
|
@app.route('/fail', methods=['POST']) |
|
def index(): |
|
return flask.request.files['foo'].filename |
|
|
|
# with statement is important because we leave an exception on the |
|
# stack otherwise and we want to ensure that this is not the case |
|
# to not negatively affect other tests. |
|
with client: |
|
with pytest.raises(DebugFilesKeyError) as e: |
|
client.post('/fail', data={'foo': 'index.txt'}) |
|
assert 'no file contents were transmitted' in str(e.value) |
|
assert 'This was submitted: "index.txt"' in str(e.value) |
|
|
|
|
|
def test_response_types(app, client): |
|
@app.route('/text') |
|
def from_text(): |
|
return u'Hällo Wörld' |
|
|
|
@app.route('/bytes') |
|
def from_bytes(): |
|
return u'Hällo Wörld'.encode('utf-8') |
|
|
|
@app.route('/full_tuple') |
|
def from_full_tuple(): |
|
return 'Meh', 400, { |
|
'X-Foo': 'Testing', |
|
'Content-Type': 'text/plain; charset=utf-8' |
|
} |
|
|
|
@app.route('/text_headers') |
|
def from_text_headers(): |
|
return 'Hello', { |
|
'X-Foo': 'Test', |
|
'Content-Type': 'text/plain; charset=utf-8' |
|
} |
|
|
|
@app.route('/text_status') |
|
def from_text_status(): |
|
return 'Hi, status!', 400 |
|
|
|
@app.route('/response_headers') |
|
def from_response_headers(): |
|
return flask.Response('Hello world', 404, {'X-Foo': 'Baz'}), { |
|
"X-Foo": "Bar", |
|
"X-Bar": "Foo" |
|
} |
|
|
|
@app.route('/response_status') |
|
def from_response_status(): |
|
return app.response_class('Hello world', 400), 500 |
|
|
|
@app.route('/wsgi') |
|
def from_wsgi(): |
|
return NotFound() |
|
|
|
assert client.get('/text').data == u'Hällo Wörld'.encode('utf-8') |
|
assert client.get('/bytes').data == u'Hällo Wörld'.encode('utf-8') |
|
|
|
rv = client.get('/full_tuple') |
|
assert rv.data == b'Meh' |
|
assert rv.headers['X-Foo'] == 'Testing' |
|
assert rv.status_code == 400 |
|
assert rv.mimetype == 'text/plain' |
|
|
|
rv = client.get('/text_headers') |
|
assert rv.data == b'Hello' |
|
assert rv.headers['X-Foo'] == 'Test' |
|
assert rv.status_code == 200 |
|
assert rv.mimetype == 'text/plain' |
|
|
|
rv = client.get('/text_status') |
|
assert rv.data == b'Hi, status!' |
|
assert rv.status_code == 400 |
|
assert rv.mimetype == 'text/html' |
|
|
|
rv = client.get('/response_headers') |
|
assert rv.data == b'Hello world' |
|
assert rv.headers.getlist('X-Foo') == ['Baz', 'Bar'] |
|
assert rv.headers['X-Bar'] == 'Foo' |
|
assert rv.status_code == 404 |
|
|
|
rv = client.get('/response_status') |
|
assert rv.data == b'Hello world' |
|
assert rv.status_code == 500 |
|
|
|
rv = client.get('/wsgi') |
|
assert b'Not Found' in rv.data |
|
assert rv.status_code == 404 |
|
|
|
|
|
def test_response_type_errors(): |
|
app = flask.Flask(__name__) |
|
app.testing = True |
|
|
|
@app.route('/none') |
|
def from_none(): |
|
pass |
|
|
|
@app.route('/small_tuple') |
|
def from_small_tuple(): |
|
return 'Hello', |
|
|
|
@app.route('/large_tuple') |
|
def from_large_tuple(): |
|
return 'Hello', 234, {'X-Foo': 'Bar'}, '???' |
|
|
|
@app.route('/bad_type') |
|
def from_bad_type(): |
|
return True |
|
|
|
@app.route('/bad_wsgi') |
|
def from_bad_wsgi(): |
|
return lambda: None |
|
|
|
c = app.test_client() |
|
|
|
with pytest.raises(TypeError) as e: |
|
c.get('/none') |
|
assert 'returned None' in str(e) |
|
|
|
with pytest.raises(TypeError) as e: |
|
c.get('/small_tuple') |
|
assert 'tuple must have the form' in str(e) |
|
|
|
pytest.raises(TypeError, c.get, '/large_tuple') |
|
|
|
with pytest.raises(TypeError) as e: |
|
c.get('/bad_type') |
|
assert 'it was a bool' in str(e) |
|
|
|
pytest.raises(TypeError, c.get, '/bad_wsgi') |
|
|
|
|
|
def test_make_response(app, req_ctx): |
|
rv = flask.make_response() |
|
assert rv.status_code == 200 |
|
assert rv.data == b'' |
|
assert rv.mimetype == 'text/html' |
|
|
|
rv = flask.make_response('Awesome') |
|
assert rv.status_code == 200 |
|
assert rv.data == b'Awesome' |
|
assert rv.mimetype == 'text/html' |
|
|
|
rv = flask.make_response('W00t', 404) |
|
assert rv.status_code == 404 |
|
assert rv.data == b'W00t' |
|
assert rv.mimetype == 'text/html' |
|
|
|
|
|
def test_make_response_with_response_instance(app, req_ctx): |
|
rv = flask.make_response( |
|
flask.jsonify({'msg': 'W00t'}), 400) |
|
assert rv.status_code == 400 |
|
assert rv.data == b'{"msg":"W00t"}\n' |
|
assert rv.mimetype == 'application/json' |
|
|
|
rv = flask.make_response( |
|
flask.Response(''), 400) |
|
assert rv.status_code == 400 |
|
assert rv.data == b'' |
|
assert rv.mimetype == 'text/html' |
|
|
|
rv = flask.make_response( |
|
flask.Response('', headers={'Content-Type': 'text/html'}), |
|
400, [('X-Foo', 'bar')]) |
|
assert rv.status_code == 400 |
|
assert rv.headers['Content-Type'] == 'text/html' |
|
assert rv.headers['X-Foo'] == 'bar' |
|
|
|
|
|
def test_jsonify_no_prettyprint(app, req_ctx): |
|
app.config.update({"JSONIFY_PRETTYPRINT_REGULAR": False}) |
|
compressed_msg = b'{"msg":{"submsg":"W00t"},"msg2":"foobar"}\n' |
|
uncompressed_msg = { |
|
"msg": { |
|
"submsg": "W00t" |
|
}, |
|
"msg2": "foobar" |
|
} |
|
|
|
rv = flask.make_response( |
|
flask.jsonify(uncompressed_msg), 200) |
|
assert rv.data == compressed_msg |
|
|
|
|
|
def test_jsonify_prettyprint(app, req_ctx): |
|
app.config.update({"JSONIFY_PRETTYPRINT_REGULAR": True}) |
|
compressed_msg = {"msg": {"submsg": "W00t"}, "msg2": "foobar"} |
|
pretty_response = \ |
|
b'{\n "msg": {\n "submsg": "W00t"\n }, \n "msg2": "foobar"\n}\n' |
|
|
|
rv = flask.make_response( |
|
flask.jsonify(compressed_msg), 200) |
|
assert rv.data == pretty_response |
|
|
|
|
|
def test_jsonify_mimetype(app, req_ctx): |
|
app.config.update({"JSONIFY_MIMETYPE": 'application/vnd.api+json'}) |
|
msg = { |
|
"msg": {"submsg": "W00t"}, |
|
} |
|
rv = flask.make_response( |
|
flask.jsonify(msg), 200) |
|
assert rv.mimetype == 'application/vnd.api+json' |
|
|
|
|
|
def test_jsonify_args_and_kwargs_check(app, req_ctx): |
|
with pytest.raises(TypeError) as e: |
|
flask.jsonify('fake args', kwargs='fake') |
|
assert 'behavior undefined' in str(e.value) |
|
|
|
|
|
def test_url_generation(app, req_ctx): |
|
@app.route('/hello/<name>', methods=['POST']) |
|
def hello(): |
|
pass |
|
|
|
assert flask.url_for('hello', name='test x') == '/hello/test%20x' |
|
assert flask.url_for('hello', name='test x', _external=True) == \ |
|
'http://localhost/hello/test%20x' |
|
|
|
|
|
def test_build_error_handler(app): |
|
# Test base case, a URL which results in a BuildError. |
|
with app.test_request_context(): |
|
pytest.raises(BuildError, flask.url_for, 'spam') |
|
|
|
# Verify the error is re-raised if not the current exception. |
|
try: |
|
with app.test_request_context(): |
|
flask.url_for('spam') |
|
except BuildError as err: |
|
error = err |
|
try: |
|
raise RuntimeError('Test case where BuildError is not current.') |
|
except RuntimeError: |
|
pytest.raises( |
|
BuildError, app.handle_url_build_error, error, 'spam', {}) |
|
|
|
# Test a custom handler. |
|
def handler(error, endpoint, values): |
|
# Just a test. |
|
return '/test_handler/' |
|
|
|
app.url_build_error_handlers.append(handler) |
|
with app.test_request_context(): |
|
assert flask.url_for('spam') == '/test_handler/' |
|
|
|
|
|
def test_build_error_handler_reraise(app): |
|
# Test a custom handler which reraises the BuildError |
|
def handler_raises_build_error(error, endpoint, values): |
|
raise error |
|
|
|
app.url_build_error_handlers.append(handler_raises_build_error) |
|
|
|
with app.test_request_context(): |
|
pytest.raises(BuildError, flask.url_for, 'not.existing') |
|
|
|
|
|
def test_url_for_passes_special_values_to_build_error_handler(app): |
|
@app.url_build_error_handlers.append |
|
def handler(error, endpoint, values): |
|
assert values == { |
|
'_external': False, |
|
'_anchor': None, |
|
'_method': None, |
|
'_scheme': None, |
|
} |
|
return 'handled' |
|
|
|
with app.test_request_context(): |
|
flask.url_for('/') |
|
|
|
|
|
def test_custom_converters(app, client): |
|
from werkzeug.routing import BaseConverter |
|
|
|
class ListConverter(BaseConverter): |
|
def to_python(self, value): |
|
return value.split(',') |
|
|
|
def to_url(self, value): |
|
base_to_url = super(ListConverter, self).to_url |
|
return ','.join(base_to_url(x) for x in value) |
|
|
|
app.url_map.converters['list'] = ListConverter |
|
|
|
@app.route('/<list:args>') |
|
def index(args): |
|
return '|'.join(args) |
|
|
|
assert client.get('/1,2,3').data == b'1|2|3' |
|
|
|
|
|
def test_static_files(app, client): |
|
rv = client.get('/static/index.html') |
|
assert rv.status_code == 200 |
|
assert rv.data.strip() == b'<h1>Hello World!</h1>' |
|
with app.test_request_context(): |
|
assert flask.url_for('static', filename='index.html') == \ |
|
'/static/index.html' |
|
rv.close() |
|
|
|
|
|
def test_static_url_path(): |
|
app = flask.Flask(__name__, static_url_path='/foo') |
|
app.testing = True |
|
rv = app.test_client().get('/foo/index.html') |
|
assert rv.status_code == 200 |
|
rv.close() |
|
|
|
with app.test_request_context(): |
|
assert flask.url_for('static', filename='index.html') == '/foo/index.html' |
|
|
|
|
|
def test_static_route_with_host_matching(): |
|
app = flask.Flask(__name__, host_matching=True, static_host='example.com') |
|
c = app.test_client() |
|
rv = c.get('http://example.com/static/index.html') |
|
assert rv.status_code == 200 |
|
rv.close() |
|
with app.test_request_context(): |
|
rv = flask.url_for('static', filename='index.html', _external=True) |
|
assert rv == 'http://example.com/static/index.html' |
|
# Providing static_host without host_matching=True should error. |
|
with pytest.raises(Exception): |
|
flask.Flask(__name__, static_host='example.com') |
|
# Providing host_matching=True with static_folder but without static_host should error. |
|
with pytest.raises(Exception): |
|
flask.Flask(__name__, host_matching=True) |
|
# Providing host_matching=True without static_host but with static_folder=None should not error. |
|
flask.Flask(__name__, host_matching=True, static_folder=None) |
|
|
|
|
|
def test_request_locals(): |
|
assert repr(flask.g) == '<LocalProxy unbound>' |
|
assert not flask.g |
|
|
|
|
|
def test_test_app_proper_environ(app, client): |
|
app.config.update( |
|
SERVER_NAME='localhost.localdomain:5000' |
|
) |
|
|
|
@app.route('/') |
|
def index(): |
|
return 'Foo' |
|
|
|
@app.route('/', subdomain='foo') |
|
def subdomain(): |
|
return 'Foo SubDomain' |
|
|
|
rv = client.get('/') |
|
assert rv.data == b'Foo' |
|
|
|
rv = client.get('/', 'http://localhost.localdomain:5000') |
|
assert rv.data == b'Foo' |
|
|
|
rv = client.get('/', 'https://localhost.localdomain:5000') |
|
assert rv.data == b'Foo' |
|
|
|
app.config.update(SERVER_NAME='localhost.localdomain') |
|
rv = client.get('/', 'https://localhost.localdomain') |
|
assert rv.data == b'Foo' |
|
|
|
try: |
|
app.config.update(SERVER_NAME='localhost.localdomain:443') |
|
rv = client.get('/', 'https://localhost.localdomain') |
|
# Werkzeug 0.8 |
|
assert rv.status_code == 404 |
|
except ValueError as e: |
|
# Werkzeug 0.7 |
|
assert str(e) == ( |
|
"the server name provided " |
|
"('localhost.localdomain:443') does not match the " |
|
"server name from the WSGI environment ('localhost.localdomain')" |
|
) |
|
|
|
try: |
|
app.config.update(SERVER_NAME='localhost.localdomain') |
|
rv = client.get('/', 'http://foo.localhost') |
|
# Werkzeug 0.8 |
|
assert rv.status_code == 404 |
|
except ValueError as e: |
|
# Werkzeug 0.7 |
|
assert str(e) == ( |
|
"the server name provided " |
|
"('localhost.localdomain') does not match the " |
|
"server name from the WSGI environment ('foo.localhost')" |
|
) |
|
|
|
rv = client.get('/', 'http://foo.localhost.localdomain') |
|
assert rv.data == b'Foo SubDomain' |
|
|
|
|
|
def test_exception_propagation(app, client): |
|
def apprunner(config_key): |
|
|
|
@app.route('/') |
|
def index(): |
|
1 // 0 |
|
|
|
if config_key is not None: |
|
app.config[config_key] = True |
|
with pytest.raises(Exception): |
|
client.get('/') |
|
else: |
|
assert client.get('/').status_code == 500 |
|
|
|
# we have to run this test in an isolated thread because if the |
|
# debug flag is set to true and an exception happens the context is |
|
# not torn down. This causes other tests that run after this fail |
|
# when they expect no exception on the stack. |
|
for config_key in 'TESTING', 'PROPAGATE_EXCEPTIONS', 'DEBUG', None: |
|
t = Thread(target=apprunner, args=(config_key,)) |
|
t.start() |
|
t.join() |
|
|
|
|
|
@pytest.mark.parametrize('debug', [True, False]) |
|
@pytest.mark.parametrize('use_debugger', [True, False]) |
|
@pytest.mark.parametrize('use_reloader', [True, False]) |
|
@pytest.mark.parametrize('propagate_exceptions', [None, True, False]) |
|
def test_werkzeug_passthrough_errors(monkeypatch, debug, use_debugger, |
|
use_reloader, propagate_exceptions, app): |
|
rv = {} |
|
|
|
# Mocks werkzeug.serving.run_simple method |
|
def run_simple_mock(*args, **kwargs): |
|
rv['passthrough_errors'] = kwargs.get('passthrough_errors') |
|
|
|
monkeypatch.setattr(werkzeug.serving, 'run_simple', run_simple_mock) |
|
app.config['PROPAGATE_EXCEPTIONS'] = propagate_exceptions |
|
app.run(debug=debug, use_debugger=use_debugger, use_reloader=use_reloader) |
|
|
|
|
|
def test_max_content_length(app, client): |
|
app.config['MAX_CONTENT_LENGTH'] = 64 |
|
|
|
@app.before_request |
|
def always_first(): |
|
flask.request.form['myfile'] |
|
assert False |
|
|
|
@app.route('/accept', methods=['POST']) |
|
def accept_file(): |
|
flask.request.form['myfile'] |
|
assert False |
|
|
|
@app.errorhandler(413) |
|
def catcher(error): |
|
return '42' |
|
|
|
rv = client.post('/accept', data={'myfile': 'foo' * 100}) |
|
assert rv.data == b'42' |
|
|
|
|
|
def test_url_processors(app, client): |
|
|
|
@app.url_defaults |
|
def add_language_code(endpoint, values): |
|
if flask.g.lang_code is not None and \ |
|
app.url_map.is_endpoint_expecting(endpoint, 'lang_code'): |
|
values.setdefault('lang_code', flask.g.lang_code) |
|
|
|
@app.url_value_preprocessor |
|
def pull_lang_code(endpoint, values): |
|
flask.g.lang_code = values.pop('lang_code', None) |
|
|
|
@app.route('/<lang_code>/') |
|
def index(): |
|
return flask.url_for('about') |
|
|
|
@app.route('/<lang_code>/about') |
|
def about(): |
|
return flask.url_for('something_else') |
|
|
|
@app.route('/foo') |
|
def something_else(): |
|
return flask.url_for('about', lang_code='en') |
|
|
|
assert client.get('/de/').data == b'/de/about' |
|
assert client.get('/de/about').data == b'/foo' |
|
assert client.get('/foo').data == b'/en/about' |
|
|
|
|
|
def test_inject_blueprint_url_defaults(app): |
|
bp = flask.Blueprint('foo.bar.baz', __name__, |
|
template_folder='template') |
|
|
|
@bp.url_defaults |
|
def bp_defaults(endpoint, values): |
|
values['page'] = 'login' |
|
|
|
@bp.route('/<page>') |
|
def view(page): |
|
pass |
|
|
|
app.register_blueprint(bp) |
|
|
|
values = dict() |
|
app.inject_url_defaults('foo.bar.baz.view', values) |
|
expected = dict(page='login') |
|
assert values == expected |
|
|
|
with app.test_request_context('/somepage'): |
|
url = flask.url_for('foo.bar.baz.view') |
|
expected = '/login' |
|
assert url == expected |
|
|
|
|
|
def test_nonascii_pathinfo(app, client): |
|
@app.route(u'/киртест') |
|
def index(): |
|
return 'Hello World!' |
|
|
|
rv = client.get(u'/киртест') |
|
assert rv.data == b'Hello World!' |
|
|
|
|
|
def test_debug_mode_complains_after_first_request(app, client): |
|
app.debug = True |
|
|
|
@app.route('/') |
|
def index(): |
|
return 'Awesome' |
|
|
|
assert not app.got_first_request |
|
assert client.get('/').data == b'Awesome' |
|
with pytest.raises(AssertionError) as e: |
|
@app.route('/foo') |
|
def broken(): |
|
return 'Meh' |
|
assert 'A setup function was called' in str(e) |
|
|
|
app.debug = False |
|
|
|
@app.route('/foo') |
|
def working(): |
|
return 'Meh' |
|
|
|
assert client.get('/foo').data == b'Meh' |
|
assert app.got_first_request |
|
|
|
|
|
def test_before_first_request_functions(app, client): |
|
got = [] |
|
|
|
@app.before_first_request |
|
def foo(): |
|
got.append(42) |
|
|
|
client.get('/') |
|
assert got == [42] |
|
client.get('/') |
|
assert got == [42] |
|
assert app.got_first_request |
|
|
|
|
|
def test_before_first_request_functions_concurrent(app, client): |
|
got = [] |
|
|
|
@app.before_first_request |
|
def foo(): |
|
time.sleep(0.2) |
|
got.append(42) |
|
|
|
def get_and_assert(): |
|
client.get("/") |
|
assert got == [42] |
|
|
|
t = Thread(target=get_and_assert) |
|
t.start() |
|
get_and_assert() |
|
t.join() |
|
assert app.got_first_request |
|
|
|
|
|
def test_routing_redirect_debugging(app, client): |
|
app.debug = True |
|
|
|
@app.route('/foo/', methods=['GET', 'POST']) |
|
def foo(): |
|
return 'success' |
|
|
|
with client: |
|
with pytest.raises(AssertionError) as e: |
|
client.post('/foo', data={}) |
|
assert 'http://localhost/foo/' in str(e) |
|
assert ('Make sure to directly send ' |
|
'your POST-request to this URL') in str(e) |
|
|
|
rv = client.get('/foo', data={}, follow_redirects=True) |
|
assert rv.data == b'success' |
|
|
|
app.debug = False |
|
with client: |
|
rv = client.post('/foo', data={}, follow_redirects=True) |
|
assert rv.data == b'success' |
|
|
|
|
|
def test_route_decorator_custom_endpoint(app, client): |
|
app.debug = True |
|
|
|
@app.route('/foo/') |
|
def foo(): |
|
return flask.request.endpoint |
|
|
|
@app.route('/bar/', endpoint='bar') |
|
def for_bar(): |
|
return flask.request.endpoint |
|
|
|
@app.route('/bar/123', endpoint='123') |
|
def for_bar_foo(): |
|
return flask.request.endpoint |
|
|
|
with app.test_request_context(): |
|
assert flask.url_for('foo') == '/foo/' |
|
assert flask.url_for('bar') == '/bar/' |
|
assert flask.url_for('123') == '/bar/123' |
|
|
|
assert client.get('/foo/').data == b'foo' |
|
assert client.get('/bar/').data == b'bar' |
|
assert client.get('/bar/123').data == b'123' |
|
|
|
|
|
def test_preserve_only_once(app, client): |
|
app.debug = True |
|
|
|
@app.route('/fail') |
|
def fail_func(): |
|
1 // 0 |
|
|
|
for x in range(3): |
|
with pytest.raises(ZeroDivisionError): |
|
client.get('/fail') |
|
|
|
assert flask._request_ctx_stack.top is not None |
|
assert flask._app_ctx_stack.top is not None |
|
# implicit appctx disappears too |
|
flask._request_ctx_stack.top.pop() |
|
assert flask._request_ctx_stack.top is None |
|
assert flask._app_ctx_stack.top is None |
|
|
|
|
|
def test_preserve_remembers_exception(app, client): |
|
app.debug = True |
|
errors = [] |
|
|
|
@app.route('/fail') |
|
def fail_func(): |
|
1 // 0 |
|
|
|
@app.route('/success') |
|
def success_func(): |
|
return 'Okay' |
|
|
|
@app.teardown_request |
|
def teardown_handler(exc): |
|
errors.append(exc) |
|
|
|
# After this failure we did not yet call the teardown handler |
|
with pytest.raises(ZeroDivisionError): |
|
client.get('/fail') |
|
assert errors == [] |
|
|
|
# But this request triggers it, and it's an error |
|
client.get('/success') |
|
assert len(errors) == 2 |
|
assert isinstance(errors[0], ZeroDivisionError) |
|
|
|
# At this point another request does nothing. |
|
client.get('/success') |
|
assert len(errors) == 3 |
|
assert errors[1] is None |
|
|
|
|
|
def test_get_method_on_g(app_ctx): |
|
assert flask.g.get('x') is None |
|
assert flask.g.get('x', 11) == 11 |
|
flask.g.x = 42 |
|
assert flask.g.get('x') == 42 |
|
assert flask.g.x == 42 |
|
|
|
|
|
def test_g_iteration_protocol(app_ctx): |
|
flask.g.foo = 23 |
|
flask.g.bar = 42 |
|
assert 'foo' in flask.g |
|
assert 'foos' not in flask.g |
|
assert sorted(flask.g) == ['bar', 'foo'] |
|
|
|
|
|
def test_subdomain_basic_support(app, client): |
|
app.config['SERVER_NAME'] = 'localhost.localdomain' |
|
|
|
@app.route('/') |
|
def normal_index(): |
|
return 'normal index' |
|
|
|
@app.route('/', subdomain='test') |
|
def test_index(): |
|
return 'test index' |
|
|
|
rv = client.get('/', 'http://localhost.localdomain/') |
|
assert rv.data == b'normal index' |
|
|
|
rv = client.get('/', 'http://test.localhost.localdomain/') |
|
assert rv.data == b'test index' |
|
|
|
|
|
def test_subdomain_matching(app, client): |
|
app.config['SERVER_NAME'] = 'localhost.localdomain' |
|
|
|
@app.route('/', subdomain='<user>') |
|
def index(user): |
|
return 'index for %s' % user |
|
|
|
rv = client.get('/', 'http://mitsuhiko.localhost.localdomain/') |
|
assert rv.data == b'index for mitsuhiko' |
|
|
|
|
|
def test_subdomain_matching_with_ports(app, client): |
|
app.config['SERVER_NAME'] = 'localhost.localdomain:3000' |
|
|
|
@app.route('/', subdomain='<user>') |
|
def index(user): |
|
return 'index for %s' % user |
|
|
|
rv = client.get('/', 'http://mitsuhiko.localhost.localdomain:3000/') |
|
assert rv.data == b'index for mitsuhiko' |
|
|
|
|
|
def test_multi_route_rules(app, client): |
|
@app.route('/') |
|
@app.route('/<test>/') |
|
def index(test='a'): |
|
return test |
|
|
|
rv = client.open('/') |
|
assert rv.data == b'a' |
|
rv = client.open('/b/') |
|
assert rv.data == b'b' |
|
|
|
|
|
def test_multi_route_class_views(app, client): |
|
class View(object): |
|
def __init__(self, app): |
|
app.add_url_rule('/', 'index', self.index) |
|
app.add_url_rule('/<test>/', 'index', self.index) |
|
|
|
def index(self, test='a'): |
|
return test |
|
|
|
_ = View(app) |
|
rv = client.open('/') |
|
assert rv.data == b'a' |
|
rv = client.open('/b/') |
|
assert rv.data == b'b' |
|
|
|
|
|
def test_run_defaults(monkeypatch, app): |
|
rv = {} |
|
|
|
# Mocks werkzeug.serving.run_simple method |
|
def run_simple_mock(*args, **kwargs): |
|
rv['result'] = 'running...' |
|
|
|
monkeypatch.setattr(werkzeug.serving, 'run_simple', run_simple_mock) |
|
app.run() |
|
assert rv['result'] == 'running...' |
|
|
|
|
|
def test_run_server_port(monkeypatch, app): |
|
rv = {} |
|
|
|
# Mocks werkzeug.serving.run_simple method |
|
def run_simple_mock(hostname, port, application, *args, **kwargs): |
|
rv['result'] = 'running on %s:%s ...' % (hostname, port) |
|
|
|
monkeypatch.setattr(werkzeug.serving, 'run_simple', run_simple_mock) |
|
hostname, port = 'localhost', 8000 |
|
app.run(hostname, port, debug=True) |
|
assert rv['result'] == 'running on %s:%s ...' % (hostname, port) |
|
|
|
|
|
@pytest.mark.parametrize('host,port,expect_host,expect_port', ( |
|
(None, None, 'pocoo.org', 8080), |
|
('localhost', None, 'localhost', 8080), |
|
(None, 80, 'pocoo.org', 80), |
|
('localhost', 80, 'localhost', 80), |
|
)) |
|
def test_run_from_config(monkeypatch, host, port, expect_host, expect_port, app): |
|
def run_simple_mock(hostname, port, *args, **kwargs): |
|
assert hostname == expect_host |
|
assert port == expect_port |
|
|
|
monkeypatch.setattr(werkzeug.serving, 'run_simple', run_simple_mock) |
|
app.config['SERVER_NAME'] = 'pocoo.org:8080' |
|
app.run(host, port)
|
|
|