# -*- coding: utf-8 -*- """ tests.basic ~~~~~~~~~~~~~~~~~~~~~ The basic functionality. :copyright: (c) 2015 by Armin Ronacher. :license: BSD, see LICENSE for more details. """ import pytest import re import uuid import time import flask import pickle from datetime import datetime from threading import Thread from flask._compat import text_type from werkzeug.exceptions import BadRequest, NotFound, Forbidden from werkzeug.http import parse_date from werkzeug.routing import BuildError import werkzeug.serving def test_options_work(app, client): @app.route('/', methods=['GET', 'POST']) def index(): return 'Hello World' rv = client.open('/', method='OPTIONS') assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS', 'POST'] assert rv.data == b'' def test_options_on_multiple_rules(app, client): @app.route('/', methods=['GET', 'POST']) def index(): return 'Hello World' @app.route('/', methods=['PUT']) def index_put(): return 'Aha!' rv = client.open('/', method='OPTIONS') assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS', 'POST', 'PUT'] def test_provide_automatic_options_attr(): app = flask.Flask(__name__) def index(): return 'Hello World!' index.provide_automatic_options = False app.route('/')(index) rv = app.test_client().open('/', method='OPTIONS') assert rv.status_code == 405 app = flask.Flask(__name__) def index2(): return 'Hello World!' index2.provide_automatic_options = True app.route('/', methods=['OPTIONS'])(index2) rv = app.test_client().open('/', method='OPTIONS') assert sorted(rv.allow) == ['OPTIONS'] def test_provide_automatic_options_kwarg(app, client): def index(): return flask.request.method def more(): return flask.request.method app.add_url_rule('/', view_func=index, provide_automatic_options=False) app.add_url_rule( '/more', view_func=more, methods=['GET', 'POST'], provide_automatic_options=False ) assert client.get('/').data == b'GET' rv = client.post('/') assert rv.status_code == 405 assert sorted(rv.allow) == ['GET', 'HEAD'] # Older versions of Werkzeug.test.Client don't have an options method if hasattr(client, 'options'): rv = client.options('/') else: rv = client.open('/', method='OPTIONS') assert rv.status_code == 405 rv = client.head('/') assert rv.status_code == 200 assert not rv.data # head truncates assert client.post('/more').data == b'POST' assert client.get('/more').data == b'GET' rv = client.delete('/more') assert rv.status_code == 405 assert sorted(rv.allow) == ['GET', 'HEAD', 'POST'] if hasattr(client, 'options'): rv = client.options('/more') else: rv = client.open('/more', method='OPTIONS') assert rv.status_code == 405 def test_request_dispatching(app, client): @app.route('/') def index(): return flask.request.method @app.route('/more', methods=['GET', 'POST']) def more(): return flask.request.method assert client.get('/').data == b'GET' rv = client.post('/') assert rv.status_code == 405 assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS'] rv = client.head('/') assert rv.status_code == 200 assert not rv.data # head truncates assert client.post('/more').data == b'POST' assert client.get('/more').data == b'GET' rv = client.delete('/more') assert rv.status_code == 405 assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS', 'POST'] def test_disallow_string_for_allowed_methods(app): with pytest.raises(TypeError): @app.route('/', methods='GET POST') def index(): return "Hey" def test_url_mapping(app, client): random_uuid4 = "7eb41166-9ebf-4d26-b771-ea3f54f8b383" def index(): return flask.request.method def more(): return flask.request.method def options(): return random_uuid4 app.add_url_rule('/', 'index', index) app.add_url_rule('/more', 'more', more, methods=['GET', 'POST']) # Issue 1288: Test that automatic options are not added when non-uppercase 'options' in methods app.add_url_rule('/options', 'options', options, methods=['options']) assert client.get('/').data == b'GET' rv = client.post('/') assert rv.status_code == 405 assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS'] rv = client.head('/') assert rv.status_code == 200 assert not rv.data # head truncates assert client.post('/more').data == b'POST' assert client.get('/more').data == b'GET' rv = client.delete('/more') assert rv.status_code == 405 assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS', 'POST'] rv = client.open('/options', method='OPTIONS') assert rv.status_code == 200 assert random_uuid4 in rv.data.decode("utf-8") def test_werkzeug_routing(app, client): from werkzeug.routing import Submount, Rule app.url_map.add(Submount('/foo', [ Rule('/bar', endpoint='bar'), Rule('/', endpoint='index') ])) def bar(): return 'bar' def index(): return 'index' app.view_functions['bar'] = bar app.view_functions['index'] = index assert client.get('/foo/').data == b'index' assert client.get('/foo/bar').data == b'bar' def test_endpoint_decorator(app, client): from werkzeug.routing import Submount, Rule app.url_map.add(Submount('/foo', [ Rule('/bar', endpoint='bar'), Rule('/', endpoint='index') ])) @app.endpoint('bar') def bar(): return 'bar' @app.endpoint('index') def index(): return 'index' assert client.get('/foo/').data == b'index' assert client.get('/foo/bar').data == b'bar' def test_session(app, client): app.secret_key = 'testkey' @app.route('/set', methods=['POST']) def set(): flask.session['value'] = flask.request.form['value'] return 'value set' @app.route('/get') def get(): return flask.session['value'] assert client.post('/set', data={'value': '42'}).data == b'value set' assert client.get('/get').data == b'42' def test_session_using_server_name(app, client): app.config.update( SECRET_KEY='foo', SERVER_NAME='example.com' ) @app.route('/') def index(): flask.session['testing'] = 42 return 'Hello World' rv = client.get('/', 'http://example.com/') assert 'domain=.example.com' in rv.headers['set-cookie'].lower() assert 'httponly' in rv.headers['set-cookie'].lower() def test_session_using_server_name_and_port(app, client): app.config.update( SECRET_KEY='foo', SERVER_NAME='example.com:8080' ) @app.route('/') def index(): flask.session['testing'] = 42 return 'Hello World' rv = client.get('/', 'http://example.com:8080/') assert 'domain=.example.com' in rv.headers['set-cookie'].lower() assert 'httponly' in rv.headers['set-cookie'].lower() def test_session_using_server_name_port_and_path(app, client): app.config.update( SECRET_KEY='foo', SERVER_NAME='example.com:8080', APPLICATION_ROOT='/foo' ) @app.route('/') def index(): flask.session['testing'] = 42 return 'Hello World' rv = client.get('/', 'http://example.com:8080/foo') assert 'domain=example.com' in rv.headers['set-cookie'].lower() assert 'path=/foo' in rv.headers['set-cookie'].lower() assert 'httponly' in rv.headers['set-cookie'].lower() def test_session_using_application_root(app, client): class PrefixPathMiddleware(object): def __init__(self, app, prefix): self.app = app self.prefix = prefix def __call__(self, environ, start_response): environ['SCRIPT_NAME'] = self.prefix return self.app(environ, start_response) app.wsgi_app = PrefixPathMiddleware(app.wsgi_app, '/bar') app.config.update( SECRET_KEY='foo', APPLICATION_ROOT='/bar' ) @app.route('/') def index(): flask.session['testing'] = 42 return 'Hello World' rv = client.get('/', 'http://example.com:8080/') assert 'path=/bar' in rv.headers['set-cookie'].lower() def test_session_using_session_settings(app, client): app.config.update( SECRET_KEY='foo', SERVER_NAME='www.example.com:8080', APPLICATION_ROOT='/test', SESSION_COOKIE_DOMAIN='.example.com', SESSION_COOKIE_HTTPONLY=False, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_PATH='/' ) @app.route('/') def index(): flask.session['testing'] = 42 return 'Hello World' rv = client.get('/', 'http://www.example.com:8080/test/') cookie = rv.headers['set-cookie'].lower() assert 'domain=.example.com' in cookie assert 'path=/' in cookie assert 'secure' in cookie assert 'httponly' not in cookie def test_session_localhost_warning(recwarn, app, client): app.config.update( SECRET_KEY='testing', SERVER_NAME='localhost:5000', ) @app.route('/') def index(): flask.session['testing'] = 42 return 'testing' rv = client.get('/', 'http://localhost:5000/') assert 'domain' not in rv.headers['set-cookie'].lower() w = recwarn.pop(UserWarning) assert '"localhost" is not a valid cookie domain' in str(w.message) def test_session_ip_warning(recwarn, app, client): app.config.update( SECRET_KEY='testing', SERVER_NAME='127.0.0.1:5000', ) @app.route('/') def index(): flask.session['testing'] = 42 return 'testing' rv = client.get('/', 'http://127.0.0.1:5000/') assert 'domain=127.0.0.1' in rv.headers['set-cookie'].lower() w = recwarn.pop(UserWarning) assert 'cookie domain is an IP' in str(w.message) def test_missing_session(): app = flask.Flask(__name__) def expect_exception(f, *args, **kwargs): e = pytest.raises(RuntimeError, f, *args, **kwargs) assert e.value.args and 'session is unavailable' in e.value.args[0] with app.test_request_context(): assert flask.session.get('missing_key') is None expect_exception(flask.session.__setitem__, 'foo', 42) expect_exception(flask.session.pop, 'foo') def test_session_expiration(app, client): permanent = True app.secret_key = 'testkey' @app.route('/') def index(): flask.session['test'] = 42 flask.session.permanent = permanent return '' @app.route('/test') def test(): return text_type(flask.session.permanent) rv = client.get('/') assert 'set-cookie' in rv.headers match = re.search(r'(?i)\bexpires=([^;]+)', rv.headers['set-cookie']) expires = parse_date(match.group()) expected = datetime.utcnow() + app.permanent_session_lifetime assert expires.year == expected.year assert expires.month == expected.month assert expires.day == expected.day rv = client.get('/test') assert rv.data == b'True' permanent = False rv = client.get('/') assert 'set-cookie' in rv.headers match = re.search(r'\bexpires=([^;]+)', rv.headers['set-cookie']) assert match is None def test_session_stored_last(app, client): app.secret_key = 'development-key' @app.after_request def modify_session(response): flask.session['foo'] = 42 return response @app.route('/') def dump_session_contents(): return repr(flask.session.get('foo')) assert client.get('/').data == b'None' assert client.get('/').data == b'42' def test_session_special_types(app, client): app.secret_key = 'development-key' now = datetime.utcnow().replace(microsecond=0) the_uuid = uuid.uuid4() @app.after_request def modify_session(response): flask.session['m'] = flask.Markup('Hello!') flask.session['u'] = the_uuid flask.session['dt'] = now flask.session['b'] = b'\xff' flask.session['t'] = (1, 2, 3) return response @app.route('/') def dump_session_contents(): return pickle.dumps(dict(flask.session)) client.get('/') rv = pickle.loads(client.get('/').data) assert rv['m'] == flask.Markup('Hello!') assert type(rv['m']) == flask.Markup assert rv['dt'] == now assert rv['u'] == the_uuid assert rv['b'] == b'\xff' assert type(rv['b']) == bytes assert rv['t'] == (1, 2, 3) def test_session_cookie_setting(app): app.secret_key = 'dev key' is_permanent = True @app.route('/bump') def bump(): rv = flask.session['foo'] = flask.session.get('foo', 0) + 1 flask.session.permanent = is_permanent return str(rv) @app.route('/read') def read(): return str(flask.session.get('foo', 0)) def run_test(expect_header): with app.test_client() as c: assert c.get('/bump').data == b'1' assert c.get('/bump').data == b'2' assert c.get('/bump').data == b'3' rv = c.get('/read') set_cookie = rv.headers.get('set-cookie') assert (set_cookie is not None) == expect_header assert rv.data == b'3' is_permanent = True app.config['SESSION_REFRESH_EACH_REQUEST'] = True run_test(expect_header=True) is_permanent = True app.config['SESSION_REFRESH_EACH_REQUEST'] = False run_test(expect_header=False) is_permanent = False app.config['SESSION_REFRESH_EACH_REQUEST'] = True run_test(expect_header=False) is_permanent = False app.config['SESSION_REFRESH_EACH_REQUEST'] = False run_test(expect_header=False) def test_session_vary_cookie(app, client): app.secret_key = 'testkey' @app.route('/set') def set_session(): flask.session['test'] = 'test' return '' @app.route('/get') def get(): return flask.session.get('test') @app.route('/getitem') def getitem(): return flask.session['test'] @app.route('/setdefault') def setdefault(): return flask.session.setdefault('test', 'default') @app.route('/no-vary-header') def no_vary_header(): return '' def expect(path, header=True): rv = client.get(path) if header: assert rv.headers['Vary'] == 'Cookie' else: assert 'Vary' not in rv.headers expect('/set') expect('/get') expect('/getitem') expect('/setdefault') expect('/no-vary-header', False) def test_flashes(app, req_ctx): app.secret_key = 'testkey' assert not flask.session.modified flask.flash('Zap') flask.session.modified = False flask.flash('Zip') assert flask.session.modified assert list(flask.get_flashed_messages()) == ['Zap', 'Zip'] def test_extended_flashing(app): # Be sure app.testing=True below, else tests can fail silently. # # Specifically, if app.testing is not set to True, the AssertionErrors # in the view functions will cause a 500 response to the test client # instead of propagating exceptions. app.secret_key = 'testkey' @app.route('/') def index(): flask.flash(u'Hello World') flask.flash(u'Hello World', 'error') flask.flash(flask.Markup(u'Testing'), 'warning') return '' @app.route('/test/') def test(): messages = flask.get_flashed_messages() assert list(messages) == [ u'Hello World', u'Hello World', flask.Markup(u'Testing') ] return '' @app.route('/test_with_categories/') def test_with_categories(): messages = flask.get_flashed_messages(with_categories=True) assert len(messages) == 3 assert list(messages) == [ ('message', u'Hello World'), ('error', u'Hello World'), ('warning', flask.Markup(u'Testing')) ] return '' @app.route('/test_filter/') def test_filter(): messages = flask.get_flashed_messages( category_filter=['message'], with_categories=True) assert list(messages) == [('message', u'Hello World')] return '' @app.route('/test_filters/') def test_filters(): messages = flask.get_flashed_messages( category_filter=['message', 'warning'], with_categories=True) assert list(messages) == [ ('message', u'Hello World'), ('warning', flask.Markup(u'Testing')) ] return '' @app.route('/test_filters_without_returning_categories/') def test_filters2(): messages = flask.get_flashed_messages( category_filter=['message', 'warning']) assert len(messages) == 2 assert messages[0] == u'Hello World' assert messages[1] == flask.Markup(u'Testing') return '' # Create new test client on each test to clean flashed messages. client = app.test_client() client.get('/') client.get('/test_with_categories/') client = app.test_client() client.get('/') client.get('/test_filter/') client = app.test_client() client.get('/') client.get('/test_filters/') client = app.test_client() client.get('/') client.get('/test_filters_without_returning_categories/') def test_request_processing(app, client): evts = [] @app.before_request def before_request(): evts.append('before') @app.after_request def after_request(response): response.data += b'|after' evts.append('after') return response @app.route('/') def index(): assert 'before' in evts assert 'after' not in evts return 'request' assert 'after' not in evts rv = client.get('/').data assert 'after' in evts assert rv == b'request|after' def test_request_preprocessing_early_return(app, client): evts = [] @app.before_request def before_request1(): evts.append(1) @app.before_request def before_request2(): evts.append(2) return "hello" @app.before_request def before_request3(): evts.append(3) return "bye" @app.route('/') def index(): evts.append('index') return "damnit" rv = client.get('/').data.strip() assert rv == b'hello' assert evts == [1, 2] def test_after_request_processing(app, client): @app.route('/') def index(): @flask.after_this_request def foo(response): response.headers['X-Foo'] = 'a header' return response return 'Test' resp = client.get('/') assert resp.status_code == 200 assert resp.headers['X-Foo'] == 'a header' def test_teardown_request_handler(app, client): called = [] @app.teardown_request def teardown_request(exc): called.append(True) return "Ignored" @app.route('/') def root(): return "Response" rv = client.get('/') assert rv.status_code == 200 assert b'Response' in rv.data assert len(called) == 1 def test_teardown_request_handler_debug_mode(app, client): called = [] @app.teardown_request def teardown_request(exc): called.append(True) return "Ignored" @app.route('/') def root(): return "Response" rv = client.get('/') assert rv.status_code == 200 assert b'Response' in rv.data assert len(called) == 1 def test_teardown_request_handler_error(app, client): called = [] app.config['LOGGER_HANDLER_POLICY'] = 'never' app.testing = False @app.teardown_request def teardown_request1(exc): assert type(exc) == ZeroDivisionError called.append(True) # This raises a new error and blows away sys.exc_info(), so we can # test that all teardown_requests get passed the same original # exception. try: raise TypeError() except: pass @app.teardown_request def teardown_request2(exc): assert type(exc) == ZeroDivisionError called.append(True) # This raises a new error and blows away sys.exc_info(), so we can # test that all teardown_requests get passed the same original # exception. try: raise TypeError() except: pass @app.route('/') def fails(): 1 // 0 rv = client.get('/') assert rv.status_code == 500 assert b'Internal Server Error' in rv.data assert len(called) == 2 def test_before_after_request_order(app, client): called = [] @app.before_request def before1(): called.append(1) @app.before_request def before2(): called.append(2) @app.after_request def after1(response): called.append(4) return response @app.after_request def after2(response): called.append(3) return response @app.teardown_request def finish1(exc): called.append(6) @app.teardown_request def finish2(exc): called.append(5) @app.route('/') def index(): return '42' rv = client.get('/') assert rv.data == b'42' assert called == [1, 2, 3, 4, 5, 6] def test_error_handling(app, client): app.config['LOGGER_HANDLER_POLICY'] = 'never' app.testing = False @app.errorhandler(404) def not_found(e): return 'not found', 404 @app.errorhandler(500) def internal_server_error(e): return 'internal server error', 500 @app.errorhandler(Forbidden) def forbidden(e): return 'forbidden', 403 @app.route('/') def index(): flask.abort(404) @app.route('/error') def error(): 1 // 0 @app.route('/forbidden') def error2(): flask.abort(403) rv = client.get('/') assert rv.status_code == 404 assert rv.data == b'not found' rv = client.get('/error') assert rv.status_code == 500 assert b'internal server error' == rv.data rv = client.get('/forbidden') assert rv.status_code == 403 assert b'forbidden' == rv.data def test_error_handling_processing(app, client): app.config['LOGGER_HANDLER_POLICY'] = 'never' app.testing = False @app.errorhandler(500) def internal_server_error(e): return 'internal server error', 500 @app.route('/') def broken_func(): 1 // 0 @app.after_request def after_request(resp): resp.mimetype = 'text/x-special' return resp resp = client.get('/') assert resp.mimetype == 'text/x-special' assert resp.data == b'internal server error' def test_baseexception_error_handling(app, client): app.config['LOGGER_HANDLER_POLICY'] = 'never' app.testing = False @app.route('/') def broken_func(): raise KeyboardInterrupt() with pytest.raises(KeyboardInterrupt): client.get('/') ctx = flask._request_ctx_stack.top assert ctx.preserved assert type(ctx._preserved_exc) is KeyboardInterrupt def test_before_request_and_routing_errors(app, client): @app.before_request def attach_something(): flask.g.something = 'value' @app.errorhandler(404) def return_something(error): return flask.g.something, 404 rv = client.get('/') assert rv.status_code == 404 assert rv.data == b'value' def test_user_error_handling(app, client): class MyException(Exception): pass @app.errorhandler(MyException) def handle_my_exception(e): assert isinstance(e, MyException) return '42' @app.route('/') def index(): raise MyException() assert client.get('/').data == b'42' def test_http_error_subclass_handling(app, client): class ForbiddenSubclass(Forbidden): pass @app.errorhandler(ForbiddenSubclass) def handle_forbidden_subclass(e): assert isinstance(e, ForbiddenSubclass) return 'banana' @app.errorhandler(403) def handle_forbidden_subclass(e): assert not isinstance(e, ForbiddenSubclass) assert isinstance(e, Forbidden) return 'apple' @app.route('/1') def index1(): raise ForbiddenSubclass() @app.route('/2') def index2(): flask.abort(403) @app.route('/3') def index3(): raise Forbidden() assert client.get('/1').data == b'banana' assert client.get('/2').data == b'apple' assert client.get('/3').data == b'apple' def test_trapping_of_bad_request_key_errors(app, client): @app.route('/fail') def fail(): flask.request.form['missing_key'] assert client.get('/fail').status_code == 400 app.config['TRAP_BAD_REQUEST_ERRORS'] = True with pytest.raises(KeyError) as e: client.get("/fail") assert e.errisinstance(BadRequest) def test_trapping_of_all_http_exceptions(app, client): app.config['TRAP_HTTP_EXCEPTIONS'] = True @app.route('/fail') def fail(): flask.abort(404) with pytest.raises(NotFound): client.get('/fail') def test_enctype_debug_helper(app, client): from flask.debughelpers import DebugFilesKeyError app.debug = True @app.route('/fail', methods=['POST']) def index(): return flask.request.files['foo'].filename # with statement is important because we leave an exception on the # stack otherwise and we want to ensure that this is not the case # to not negatively affect other tests. with client: with pytest.raises(DebugFilesKeyError) as e: client.post('/fail', data={'foo': 'index.txt'}) assert 'no file contents were transmitted' in str(e.value) assert 'This was submitted: "index.txt"' in str(e.value) def test_response_types(app, client): @app.route('/text') def from_text(): return u'Hällo Wörld' @app.route('/bytes') def from_bytes(): return u'Hällo Wörld'.encode('utf-8') @app.route('/full_tuple') def from_full_tuple(): return 'Meh', 400, { 'X-Foo': 'Testing', 'Content-Type': 'text/plain; charset=utf-8' } @app.route('/text_headers') def from_text_headers(): return 'Hello', { 'X-Foo': 'Test', 'Content-Type': 'text/plain; charset=utf-8' } @app.route('/text_status') def from_text_status(): return 'Hi, status!', 400 @app.route('/response_headers') def from_response_headers(): return flask.Response('Hello world', 404, {'X-Foo': 'Baz'}), { "X-Foo": "Bar", "X-Bar": "Foo" } @app.route('/response_status') def from_response_status(): return app.response_class('Hello world', 400), 500 @app.route('/wsgi') def from_wsgi(): return NotFound() assert client.get('/text').data == u'Hällo Wörld'.encode('utf-8') assert client.get('/bytes').data == u'Hällo Wörld'.encode('utf-8') rv = client.get('/full_tuple') assert rv.data == b'Meh' assert rv.headers['X-Foo'] == 'Testing' assert rv.status_code == 400 assert rv.mimetype == 'text/plain' rv = client.get('/text_headers') assert rv.data == b'Hello' assert rv.headers['X-Foo'] == 'Test' assert rv.status_code == 200 assert rv.mimetype == 'text/plain' rv = client.get('/text_status') assert rv.data == b'Hi, status!' assert rv.status_code == 400 assert rv.mimetype == 'text/html' rv = client.get('/response_headers') assert rv.data == b'Hello world' assert rv.headers.getlist('X-Foo') == ['Baz', 'Bar'] assert rv.headers['X-Bar'] == 'Foo' assert rv.status_code == 404 rv = client.get('/response_status') assert rv.data == b'Hello world' assert rv.status_code == 500 rv = client.get('/wsgi') assert b'Not Found' in rv.data assert rv.status_code == 404 def test_response_type_errors(): app = flask.Flask(__name__) app.testing = True @app.route('/none') def from_none(): pass @app.route('/small_tuple') def from_small_tuple(): return 'Hello', @app.route('/large_tuple') def from_large_tuple(): return 'Hello', 234, {'X-Foo': 'Bar'}, '???' @app.route('/bad_type') def from_bad_type(): return True @app.route('/bad_wsgi') def from_bad_wsgi(): return lambda: None c = app.test_client() with pytest.raises(TypeError) as e: c.get('/none') assert 'returned None' in str(e) with pytest.raises(TypeError) as e: c.get('/small_tuple') assert 'tuple must have the form' in str(e) pytest.raises(TypeError, c.get, '/large_tuple') with pytest.raises(TypeError) as e: c.get('/bad_type') assert 'it was a bool' in str(e) pytest.raises(TypeError, c.get, '/bad_wsgi') def test_make_response(app, req_ctx): rv = flask.make_response() assert rv.status_code == 200 assert rv.data == b'' assert rv.mimetype == 'text/html' rv = flask.make_response('Awesome') assert rv.status_code == 200 assert rv.data == b'Awesome' assert rv.mimetype == 'text/html' rv = flask.make_response('W00t', 404) assert rv.status_code == 404 assert rv.data == b'W00t' assert rv.mimetype == 'text/html' def test_make_response_with_response_instance(app, req_ctx): rv = flask.make_response( flask.jsonify({'msg': 'W00t'}), 400) assert rv.status_code == 400 assert rv.data == b'{"msg":"W00t"}\n' assert rv.mimetype == 'application/json' rv = flask.make_response( flask.Response(''), 400) assert rv.status_code == 400 assert rv.data == b'' assert rv.mimetype == 'text/html' rv = flask.make_response( flask.Response('', headers={'Content-Type': 'text/html'}), 400, [('X-Foo', 'bar')]) assert rv.status_code == 400 assert rv.headers['Content-Type'] == 'text/html' assert rv.headers['X-Foo'] == 'bar' def test_jsonify_no_prettyprint(app, req_ctx): app.config.update({"JSONIFY_PRETTYPRINT_REGULAR": False}) compressed_msg = b'{"msg":{"submsg":"W00t"},"msg2":"foobar"}\n' uncompressed_msg = { "msg": { "submsg": "W00t" }, "msg2": "foobar" } rv = flask.make_response( flask.jsonify(uncompressed_msg), 200) assert rv.data == compressed_msg def test_jsonify_prettyprint(app, req_ctx): app.config.update({"JSONIFY_PRETTYPRINT_REGULAR": True}) compressed_msg = {"msg": {"submsg": "W00t"}, "msg2": "foobar"} pretty_response = \ b'{\n "msg": {\n "submsg": "W00t"\n }, \n "msg2": "foobar"\n}\n' rv = flask.make_response( flask.jsonify(compressed_msg), 200) assert rv.data == pretty_response def test_jsonify_mimetype(app, req_ctx): app.config.update({"JSONIFY_MIMETYPE": 'application/vnd.api+json'}) msg = { "msg": {"submsg": "W00t"}, } rv = flask.make_response( flask.jsonify(msg), 200) assert rv.mimetype == 'application/vnd.api+json' def test_jsonify_args_and_kwargs_check(app, req_ctx): with pytest.raises(TypeError) as e: flask.jsonify('fake args', kwargs='fake') assert 'behavior undefined' in str(e.value) def test_url_generation(app, req_ctx): @app.route('/hello/', methods=['POST']) def hello(): pass assert flask.url_for('hello', name='test x') == '/hello/test%20x' assert flask.url_for('hello', name='test x', _external=True) == \ 'http://localhost/hello/test%20x' def test_build_error_handler(app): # Test base case, a URL which results in a BuildError. with app.test_request_context(): pytest.raises(BuildError, flask.url_for, 'spam') # Verify the error is re-raised if not the current exception. try: with app.test_request_context(): flask.url_for('spam') except BuildError as err: error = err try: raise RuntimeError('Test case where BuildError is not current.') except RuntimeError: pytest.raises( BuildError, app.handle_url_build_error, error, 'spam', {}) # Test a custom handler. def handler(error, endpoint, values): # Just a test. return '/test_handler/' app.url_build_error_handlers.append(handler) with app.test_request_context(): assert flask.url_for('spam') == '/test_handler/' def test_build_error_handler_reraise(app): # Test a custom handler which reraises the BuildError def handler_raises_build_error(error, endpoint, values): raise error app.url_build_error_handlers.append(handler_raises_build_error) with app.test_request_context(): pytest.raises(BuildError, flask.url_for, 'not.existing') def test_url_for_passes_special_values_to_build_error_handler(app): @app.url_build_error_handlers.append def handler(error, endpoint, values): assert values == { '_external': False, '_anchor': None, '_method': None, '_scheme': None, } return 'handled' with app.test_request_context(): flask.url_for('/') def test_custom_converters(app, client): from werkzeug.routing import BaseConverter class ListConverter(BaseConverter): def to_python(self, value): return value.split(',') def to_url(self, value): base_to_url = super(ListConverter, self).to_url return ','.join(base_to_url(x) for x in value) app.url_map.converters['list'] = ListConverter @app.route('/') def index(args): return '|'.join(args) assert client.get('/1,2,3').data == b'1|2|3' def test_static_files(app, client): rv = client.get('/static/index.html') assert rv.status_code == 200 assert rv.data.strip() == b'

Hello World!

' with app.test_request_context(): assert flask.url_for('static', filename='index.html') == \ '/static/index.html' rv.close() def test_static_path_deprecated(recwarn): app = flask.Flask(__name__, static_path='/foo') recwarn.pop(DeprecationWarning) app.testing = True rv = app.test_client().get('/foo/index.html') assert rv.status_code == 200 rv.close() with app.test_request_context(): assert flask.url_for('static', filename='index.html') == '/foo/index.html' def test_static_url_path(): app = flask.Flask(__name__, static_url_path='/foo') app.testing = True rv = app.test_client().get('/foo/index.html') assert rv.status_code == 200 rv.close() with app.test_request_context(): assert flask.url_for('static', filename='index.html') == '/foo/index.html' def test_static_route_with_host_matching(): app = flask.Flask(__name__, host_matching=True, static_host='example.com') c = app.test_client() rv = c.get('http://example.com/static/index.html') assert rv.status_code == 200 rv.close() with app.test_request_context(): rv = flask.url_for('static', filename='index.html', _external=True) assert rv == 'http://example.com/static/index.html' # Providing static_host without host_matching=True should error. with pytest.raises(Exception): flask.Flask(__name__, static_host='example.com') # Providing host_matching=True with static_folder but without static_host should error. with pytest.raises(Exception): flask.Flask(__name__, host_matching=True) # Providing host_matching=True without static_host but with static_folder=None should not error. flask.Flask(__name__, host_matching=True, static_folder=None) def test_request_locals(): assert repr(flask.g) == '' assert not flask.g def test_test_app_proper_environ(app, client): app.config.update( SERVER_NAME='localhost.localdomain:5000' ) @app.route('/') def index(): return 'Foo' @app.route('/', subdomain='foo') def subdomain(): return 'Foo SubDomain' rv = client.get('/') assert rv.data == b'Foo' rv = client.get('/', 'http://localhost.localdomain:5000') assert rv.data == b'Foo' rv = client.get('/', 'https://localhost.localdomain:5000') assert rv.data == b'Foo' app.config.update(SERVER_NAME='localhost.localdomain') rv = client.get('/', 'https://localhost.localdomain') assert rv.data == b'Foo' try: app.config.update(SERVER_NAME='localhost.localdomain:443') rv = client.get('/', 'https://localhost.localdomain') # Werkzeug 0.8 assert rv.status_code == 404 except ValueError as e: # Werkzeug 0.7 assert str(e) == ( "the server name provided " "('localhost.localdomain:443') does not match the " "server name from the WSGI environment ('localhost.localdomain')" ) try: app.config.update(SERVER_NAME='localhost.localdomain') rv = client.get('/', 'http://foo.localhost') # Werkzeug 0.8 assert rv.status_code == 404 except ValueError as e: # Werkzeug 0.7 assert str(e) == ( "the server name provided " "('localhost.localdomain') does not match the " "server name from the WSGI environment ('foo.localhost')" ) rv = client.get('/', 'http://foo.localhost.localdomain') assert rv.data == b'Foo SubDomain' def test_exception_propagation(app, client): def apprunner(config_key): app.config['LOGGER_HANDLER_POLICY'] = 'never' @app.route('/') def index(): 1 // 0 if config_key is not None: app.config[config_key] = True with pytest.raises(Exception): client.get('/') else: assert client.get('/').status_code == 500 # we have to run this test in an isolated thread because if the # debug flag is set to true and an exception happens the context is # not torn down. This causes other tests that run after this fail # when they expect no exception on the stack. for config_key in 'TESTING', 'PROPAGATE_EXCEPTIONS', 'DEBUG', None: t = Thread(target=apprunner, args=(config_key,)) t.start() t.join() @pytest.mark.parametrize('debug', [True, False]) @pytest.mark.parametrize('use_debugger', [True, False]) @pytest.mark.parametrize('use_reloader', [True, False]) @pytest.mark.parametrize('propagate_exceptions', [None, True, False]) def test_werkzeug_passthrough_errors(monkeypatch, debug, use_debugger, use_reloader, propagate_exceptions, app): rv = {} # Mocks werkzeug.serving.run_simple method def run_simple_mock(*args, **kwargs): rv['passthrough_errors'] = kwargs.get('passthrough_errors') monkeypatch.setattr(werkzeug.serving, 'run_simple', run_simple_mock) app.config['PROPAGATE_EXCEPTIONS'] = propagate_exceptions app.run(debug=debug, use_debugger=use_debugger, use_reloader=use_reloader) def test_max_content_length(app, client): app.config['MAX_CONTENT_LENGTH'] = 64 @app.before_request def always_first(): flask.request.form['myfile'] assert False @app.route('/accept', methods=['POST']) def accept_file(): flask.request.form['myfile'] assert False @app.errorhandler(413) def catcher(error): return '42' rv = client.post('/accept', data={'myfile': 'foo' * 100}) assert rv.data == b'42' def test_url_processors(app, client): @app.url_defaults def add_language_code(endpoint, values): if flask.g.lang_code is not None and \ app.url_map.is_endpoint_expecting(endpoint, 'lang_code'): values.setdefault('lang_code', flask.g.lang_code) @app.url_value_preprocessor def pull_lang_code(endpoint, values): flask.g.lang_code = values.pop('lang_code', None) @app.route('//') def index(): return flask.url_for('about') @app.route('//about') def about(): return flask.url_for('something_else') @app.route('/foo') def something_else(): return flask.url_for('about', lang_code='en') assert client.get('/de/').data == b'/de/about' assert client.get('/de/about').data == b'/foo' assert client.get('/foo').data == b'/en/about' def test_inject_blueprint_url_defaults(app): bp = flask.Blueprint('foo.bar.baz', __name__, template_folder='template') @bp.url_defaults def bp_defaults(endpoint, values): values['page'] = 'login' @bp.route('/') def view(page): pass app.register_blueprint(bp) values = dict() app.inject_url_defaults('foo.bar.baz.view', values) expected = dict(page='login') assert values == expected with app.test_request_context('/somepage'): url = flask.url_for('foo.bar.baz.view') expected = '/login' assert url == expected def test_nonascii_pathinfo(app, client): @app.route(u'/киртест') def index(): return 'Hello World!' rv = client.get(u'/киртест') assert rv.data == b'Hello World!' def test_debug_mode_complains_after_first_request(app, client): app.debug = True @app.route('/') def index(): return 'Awesome' assert not app.got_first_request assert client.get('/').data == b'Awesome' with pytest.raises(AssertionError) as e: @app.route('/foo') def broken(): return 'Meh' assert 'A setup function was called' in str(e) app.debug = False @app.route('/foo') def working(): return 'Meh' assert client.get('/foo').data == b'Meh' assert app.got_first_request def test_before_first_request_functions(app, client): got = [] @app.before_first_request def foo(): got.append(42) client.get('/') assert got == [42] client.get('/') assert got == [42] assert app.got_first_request def test_before_first_request_functions_concurrent(app, client): got = [] @app.before_first_request def foo(): time.sleep(0.2) got.append(42) def get_and_assert(): client.get("/") assert got == [42] t = Thread(target=get_and_assert) t.start() get_and_assert() t.join() assert app.got_first_request def test_routing_redirect_debugging(app, client): app.debug = True @app.route('/foo/', methods=['GET', 'POST']) def foo(): return 'success' with client: with pytest.raises(AssertionError) as e: client.post('/foo', data={}) assert 'http://localhost/foo/' in str(e) assert ('Make sure to directly send ' 'your POST-request to this URL') in str(e) rv = client.get('/foo', data={}, follow_redirects=True) assert rv.data == b'success' app.debug = False with client: rv = client.post('/foo', data={}, follow_redirects=True) assert rv.data == b'success' def test_route_decorator_custom_endpoint(app, client): app.debug = True @app.route('/foo/') def foo(): return flask.request.endpoint @app.route('/bar/', endpoint='bar') def for_bar(): return flask.request.endpoint @app.route('/bar/123', endpoint='123') def for_bar_foo(): return flask.request.endpoint with app.test_request_context(): assert flask.url_for('foo') == '/foo/' assert flask.url_for('bar') == '/bar/' assert flask.url_for('123') == '/bar/123' assert client.get('/foo/').data == b'foo' assert client.get('/bar/').data == b'bar' assert client.get('/bar/123').data == b'123' def test_preserve_only_once(app, client): app.debug = True @app.route('/fail') def fail_func(): 1 // 0 for x in range(3): with pytest.raises(ZeroDivisionError): client.get('/fail') assert flask._request_ctx_stack.top is not None assert flask._app_ctx_stack.top is not None # implicit appctx disappears too flask._request_ctx_stack.top.pop() assert flask._request_ctx_stack.top is None assert flask._app_ctx_stack.top is None def test_preserve_remembers_exception(app, client): app.debug = True errors = [] @app.route('/fail') def fail_func(): 1 // 0 @app.route('/success') def success_func(): return 'Okay' @app.teardown_request def teardown_handler(exc): errors.append(exc) # After this failure we did not yet call the teardown handler with pytest.raises(ZeroDivisionError): client.get('/fail') assert errors == [] # But this request triggers it, and it's an error client.get('/success') assert len(errors) == 2 assert isinstance(errors[0], ZeroDivisionError) # At this point another request does nothing. client.get('/success') assert len(errors) == 3 assert errors[1] is None def test_get_method_on_g(app_ctx): assert flask.g.get('x') is None assert flask.g.get('x', 11) == 11 flask.g.x = 42 assert flask.g.get('x') == 42 assert flask.g.x == 42 def test_g_iteration_protocol(app_ctx): flask.g.foo = 23 flask.g.bar = 42 assert 'foo' in flask.g assert 'foos' not in flask.g assert sorted(flask.g) == ['bar', 'foo'] def test_subdomain_basic_support(app, client): app.config['SERVER_NAME'] = 'localhost.localdomain' @app.route('/') def normal_index(): return 'normal index' @app.route('/', subdomain='test') def test_index(): return 'test index' rv = client.get('/', 'http://localhost.localdomain/') assert rv.data == b'normal index' rv = client.get('/', 'http://test.localhost.localdomain/') assert rv.data == b'test index' def test_subdomain_matching(app, client): app.config['SERVER_NAME'] = 'localhost.localdomain' @app.route('/', subdomain='') def index(user): return 'index for %s' % user rv = client.get('/', 'http://mitsuhiko.localhost.localdomain/') assert rv.data == b'index for mitsuhiko' def test_subdomain_matching_with_ports(app, client): app.config['SERVER_NAME'] = 'localhost.localdomain:3000' @app.route('/', subdomain='') def index(user): return 'index for %s' % user rv = client.get('/', 'http://mitsuhiko.localhost.localdomain:3000/') assert rv.data == b'index for mitsuhiko' def test_multi_route_rules(app, client): @app.route('/') @app.route('//') def index(test='a'): return test rv = client.open('/') assert rv.data == b'a' rv = client.open('/b/') assert rv.data == b'b' def test_multi_route_class_views(app, client): class View(object): def __init__(self, app): app.add_url_rule('/', 'index', self.index) app.add_url_rule('//', 'index', self.index) def index(self, test='a'): return test _ = View(app) rv = client.open('/') assert rv.data == b'a' rv = client.open('/b/') assert rv.data == b'b' def test_run_defaults(monkeypatch, app): rv = {} # Mocks werkzeug.serving.run_simple method def run_simple_mock(*args, **kwargs): rv['result'] = 'running...' monkeypatch.setattr(werkzeug.serving, 'run_simple', run_simple_mock) app.run() assert rv['result'] == 'running...' def test_run_server_port(monkeypatch, app): rv = {} # Mocks werkzeug.serving.run_simple method def run_simple_mock(hostname, port, application, *args, **kwargs): rv['result'] = 'running on %s:%s ...' % (hostname, port) monkeypatch.setattr(werkzeug.serving, 'run_simple', run_simple_mock) hostname, port = 'localhost', 8000 app.run(hostname, port, debug=True) assert rv['result'] == 'running on %s:%s ...' % (hostname, port) @pytest.mark.parametrize('host,port,expect_host,expect_port', ( (None, None, 'pocoo.org', 8080), ('localhost', None, 'localhost', 8080), (None, 80, 'pocoo.org', 80), ('localhost', 80, 'localhost', 80), )) def test_run_from_config(monkeypatch, host, port, expect_host, expect_port, app): def run_simple_mock(hostname, port, *args, **kwargs): assert hostname == expect_host assert port == expect_port monkeypatch.setattr(werkzeug.serving, 'run_simple', run_simple_mock) app.config['SERVER_NAME'] = 'pocoo.org:8080' app.run(host, port)