Browse Source

Updated examples to new sqlite patterns and added new section to appcontext docs

pull/609/head
Armin Ronacher 12 years ago
parent
commit
c2e5799879
  1. 44
      docs/appcontext.rst
  2. 107
      docs/patterns/sqlite3.rst
  3. 41
      examples/flaskr/flaskr.py
  4. 81
      examples/minitwit/minitwit.py

44
docs/appcontext.rst

@ -85,3 +85,47 @@ Extensions are free to store additional information on the topmost level,
assuming they pick a sufficiently unique name. assuming they pick a sufficiently unique name.
For more information about that, see :ref:`extension-dev`. For more information about that, see :ref:`extension-dev`.
Context Usage
-------------
The context is typically used to cache resources on there that need to be
created on a per-request or usage case. For instance database connects
are destined to go there. When storing things on the application context
unique names should be chosen as this is a place that is shared between
Flask applications and extensions.
The most common usage is to split resource management into two parts:
1. an implicit resource caching on the context.
2. a context teardown based resource deallocation.
Generally there would be a ``get_X()`` function that creates resource
``X`` if it does not exist yet and otherwise returns the same resource,
and a ``teardown_X()`` function that is registered as teardown handler.
This is an example that connects to a database::
import sqlite3
from flask import _app_ctx_stack
def get_db():
top = _app_ctx_stack.top
if not hasattr(top, 'database'):
top.database = connect_to_database()
return top.database
@app.teardown_appcontext
def teardown_db(exception):
top = _app_ctx_stack.top
if hasattr(top, 'database'):
top.database.close()
The first time ``get_db()`` is called the connection will be established.
To make this implicit a :class:`~werkzeug.local.LocalProxy` can be used::
from werkzeug.local import LocalProxy
db = LocalProxy(get_db)
That way a user can directly access ``db`` which internally calls
``get_db()``.

107
docs/patterns/sqlite3.rst

@ -3,61 +3,61 @@
Using SQLite 3 with Flask Using SQLite 3 with Flask
========================= =========================
In Flask you can implement the opening of database connections at the In Flask you can implement the opening of database connections on demand
beginning of the request and closing at the end with the and closing it when the context dies (usually at the end of the request)
:meth:`~flask.Flask.before_request` and :meth:`~flask.Flask.teardown_request` easily.
decorators in combination with the special :class:`~flask.g` object.
So here is a simple example of how you can use SQLite 3 with Flask:: Here is a simple example of how you can use SQLite 3 with Flask::
import sqlite3 import sqlite3
from flask import g from flask import _app_ctx_stack
DATABASE = '/path/to/database.db' DATABASE = '/path/to/database.db'
def connect_db(): def get_db():
return sqlite3.connect(DATABASE) top = _app_ctx_stack.top
if not hasattr(top, 'sqlite_db'):
top.sqlite_db = sqlite3.connect(DATABASE)
return top.sqlite_db
@app.before_request @app.teardown_appcontext
def before_request(): def close_connection(exception):
g.db = connect_db() top = _app_ctx_stack.top
if hasattr(top, 'sqlite_db'):
top.sqlite_db.close()
@app.teardown_request All the application needs to do in order to now use the database is having
def teardown_request(exception): an active application context (which is always true if there is an request
if hasattr(g, 'db'): in flight) or to create an application context itself. At that point the
g.db.close() ``get_db`` function can be used to get the current database connection.
Whenever the context is destroyed the database connection will be
terminated.
.. note:: Example::
Please keep in mind that the teardown request functions are always @app.route('/')
executed, even if a before-request handler failed or was never def index():
executed. Because of this we have to make sure here that the database cur = get_db().cursor()
is there before we close it. ...
Connect on Demand
-----------------
The downside of this approach is that this will only work if Flask .. note::
executed the before-request handlers for you. If you are attempting to
use the database from a script or the interactive Python shell you would
have to do something like this::
with app.test_request_context(): Please keep in mind that the teardown request and appcontext functions
app.preprocess_request() are always executed, even if a before-request handler failed or was
# now you can use the g.db object never executed. Because of this we have to make sure here that the
database is there before we close it.
In order to trigger the execution of the connection code. You won't be Connect on Demand
able to drop the dependency on the request context this way, but you could -----------------
make it so that the application connects when necessary::
def get_connection(): The upside of this approach (connecting on first use) is that this will
db = getattr(g, '_db', None) only opening the connection if truly necessary. If you want to use this
if db is None: code outside a request context you can use it in a Python shell by opening
db = g._db = connect_db() the application context by hand::
return db
Downside here is that you have to use ``db = get_connection()`` instead of with app.app_context():
just being able to use ``g.db`` directly. # now you can use get_db()
.. _easy-querying: .. _easy-querying:
@ -66,16 +66,28 @@ Easy Querying
Now in each request handling function you can access `g.db` to get the Now in each request handling function you can access `g.db` to get the
current open database connection. To simplify working with SQLite, a current open database connection. To simplify working with SQLite, a
helper function can be useful:: row factory function is useful. It is executed for every result returned
from the database to convert the result. For instance in order to get
dictionaries instead of tuples this can be used::
def make_dicts(cursor, row):
return dict((cur.description[idx][0], value)
for idx, value in enumerate(row))
db.row_factory = make_dicts
Additionally it is a good idea to provide a query function that combines
getting the cursor, executing and fetching the results::
def query_db(query, args=(), one=False): def query_db(query, args=(), one=False):
cur = g.db.execute(query, args) cur = get_db().execute(query, args)
rv = [dict((cur.description[idx][0], value) rv = cur.fetchall()
for idx, value in enumerate(row)) for row in cur.fetchall()] cur.close()
return (rv[0] if rv else None) if one else rv return (rv[0] if rv else None) if one else rv
This handy little function makes working with the database much more This handy little function in combination with a row factory makes working
pleasant than it is by just using the raw cursor and connection objects. with the database much more pleasant than it is by just using the raw
cursor and connection objects.
Here is how you can use it:: Here is how you can use it::
@ -105,10 +117,9 @@ Relational databases need schemas, so applications often ship a
a function that creates the database based on that schema. This function a function that creates the database based on that schema. This function
can do that for you:: can do that for you::
from contextlib import closing
def init_db(): def init_db():
with closing(connect_db()) as db: with app.app_context():
db = get_db()
with app.open_resource('schema.sql') as f: with app.open_resource('schema.sql') as f:
db.cursor().executescript(f.read()) db.cursor().executescript(f.read())
db.commit() db.commit()

41
examples/flaskr/flaskr.py

@ -11,9 +11,8 @@
""" """
from __future__ import with_statement from __future__ import with_statement
from sqlite3 import dbapi2 as sqlite3 from sqlite3 import dbapi2 as sqlite3
from contextlib import closing
from flask import Flask, request, session, g, redirect, url_for, abort, \ from flask import Flask, request, session, g, redirect, url_for, abort, \
render_template, flash render_template, flash, _app_ctx_stack
# configuration # configuration
DATABASE = '/tmp/flaskr.db' DATABASE = '/tmp/flaskr.db'
@ -28,35 +27,37 @@ app.config.from_object(__name__)
app.config.from_envvar('FLASKR_SETTINGS', silent=True) app.config.from_envvar('FLASKR_SETTINGS', silent=True)
def connect_db():
"""Returns a new connection to the database."""
return sqlite3.connect(app.config['DATABASE'])
def init_db(): def init_db():
"""Creates the database tables.""" """Creates the database tables."""
with closing(connect_db()) as db: with app.app_context():
db = get_db()
with app.open_resource('schema.sql') as f: with app.open_resource('schema.sql') as f:
db.cursor().executescript(f.read()) db.cursor().executescript(f.read())
db.commit() db.commit()
@app.before_request def get_db():
def before_request(): """Opens a new database connection if there is none yet for the
"""Make sure we are connected to the database each request.""" current application context.
g.db = connect_db() """
top = _app_ctx_stack.top
if not hasattr(top, 'sqlite_db'):
top.sqlite_db = sqlite3.connect(app.config['DATABASE'])
return top.sqlite_db
@app.teardown_request @app.teardown_appcontext
def teardown_request(exception): def close_db_connection(exception):
"""Closes the database again at the end of the request.""" """Closes the database again at the end of the request."""
if hasattr(g, 'db'): top = _app_ctx_stack.top
g.db.close() if hasattr(top, 'sqlite_db'):
top.sqlite_db.close()
@app.route('/') @app.route('/')
def show_entries(): def show_entries():
cur = g.db.execute('select title, text from entries order by id desc') db = get_db()
cur = db.execute('select title, text from entries order by id desc')
entries = [dict(title=row[0], text=row[1]) for row in cur.fetchall()] entries = [dict(title=row[0], text=row[1]) for row in cur.fetchall()]
return render_template('show_entries.html', entries=entries) return render_template('show_entries.html', entries=entries)
@ -65,9 +66,10 @@ def show_entries():
def add_entry(): def add_entry():
if not session.get('logged_in'): if not session.get('logged_in'):
abort(401) abort(401)
g.db.execute('insert into entries (title, text) values (?, ?)', db = get_db()
db.execute('insert into entries (title, text) values (?, ?)',
[request.form['title'], request.form['text']]) [request.form['title'], request.form['text']])
g.db.commit() db.commit()
flash('New entry was successfully posted') flash('New entry was successfully posted')
return redirect(url_for('show_entries')) return redirect(url_for('show_entries'))
@ -95,4 +97,5 @@ def logout():
if __name__ == '__main__': if __name__ == '__main__':
init_db()
app.run() app.run()

81
examples/minitwit/minitwit.py

@ -13,9 +13,8 @@ import time
from sqlite3 import dbapi2 as sqlite3 from sqlite3 import dbapi2 as sqlite3
from hashlib import md5 from hashlib import md5
from datetime import datetime from datetime import datetime
from contextlib import closing
from flask import Flask, request, session, url_for, redirect, \ from flask import Flask, request, session, url_for, redirect, \
render_template, abort, g, flash render_template, abort, g, flash, _app_ctx_stack
from werkzeug import check_password_hash, generate_password_hash from werkzeug import check_password_hash, generate_password_hash
@ -31,14 +30,29 @@ app.config.from_object(__name__)
app.config.from_envvar('MINITWIT_SETTINGS', silent=True) app.config.from_envvar('MINITWIT_SETTINGS', silent=True)
def connect_db(): def get_db():
"""Returns a new connection to the database.""" """Opens a new database connection if there is none yet for the
return sqlite3.connect(app.config['DATABASE']) current application context.
"""
top = _app_ctx_stack.top
if not hasattr(top, 'sqlite_db'):
top.sqlite_db = sqlite3.connect(app.config['DATABASE'])
top.sqlite_db.row_factory = sqlite3.Row
return top.sqlite_db
@app.teardown_appcontext
def close_database(exception):
"""Closes the database again at the end of the request."""
top = _app_ctx_stack.top
if hasattr(top, 'sqlite_db'):
top.sqlite_db.close()
def init_db(): def init_db():
"""Creates the database tables.""" """Creates the database tables."""
with closing(connect_db()) as db: with app.app_context():
db = get_db()
with app.open_resource('schema.sql') as f: with app.open_resource('schema.sql') as f:
db.cursor().executescript(f.read()) db.cursor().executescript(f.read())
db.commit() db.commit()
@ -46,16 +60,15 @@ def init_db():
def query_db(query, args=(), one=False): def query_db(query, args=(), one=False):
"""Queries the database and returns a list of dictionaries.""" """Queries the database and returns a list of dictionaries."""
cur = g.db.execute(query, args) cur = get_db().execute(query, args)
rv = [dict((cur.description[idx][0], value) rv = cur.fetchall()
for idx, value in enumerate(row)) for row in cur.fetchall()]
return (rv[0] if rv else None) if one else rv return (rv[0] if rv else None) if one else rv
def get_user_id(username): def get_user_id(username):
"""Convenience method to look up the id for a username.""" """Convenience method to look up the id for a username."""
rv = g.db.execute('select user_id from user where username = ?', rv = query_db('select user_id from user where username = ?',
[username]).fetchone() [username], one=True)
return rv[0] if rv else None return rv[0] if rv else None
@ -72,23 +85,12 @@ def gravatar_url(email, size=80):
@app.before_request @app.before_request
def before_request(): def before_request():
"""Make sure we are connected to the database each request and look
up the current user so that we know he's there.
"""
g.db = connect_db()
g.user = None g.user = None
if 'user_id' in session: if 'user_id' in session:
g.user = query_db('select * from user where user_id = ?', g.user = query_db('select * from user where user_id = ?',
[session['user_id']], one=True) [session['user_id']], one=True)
@app.teardown_request
def teardown_request(exception):
"""Closes the database again at the end of the request."""
if hasattr(g, 'db'):
g.db.close()
@app.route('/') @app.route('/')
def timeline(): def timeline():
"""Shows a users timeline or if no user is logged in it will """Shows a users timeline or if no user is logged in it will
@ -145,9 +147,10 @@ def follow_user(username):
whom_id = get_user_id(username) whom_id = get_user_id(username)
if whom_id is None: if whom_id is None:
abort(404) abort(404)
g.db.execute('insert into follower (who_id, whom_id) values (?, ?)', db = get_db()
[session['user_id'], whom_id]) db.execute('insert into follower (who_id, whom_id) values (?, ?)',
g.db.commit() [session['user_id'], whom_id])
db.commit()
flash('You are now following "%s"' % username) flash('You are now following "%s"' % username)
return redirect(url_for('user_timeline', username=username)) return redirect(url_for('user_timeline', username=username))
@ -160,9 +163,10 @@ def unfollow_user(username):
whom_id = get_user_id(username) whom_id = get_user_id(username)
if whom_id is None: if whom_id is None:
abort(404) abort(404)
g.db.execute('delete from follower where who_id=? and whom_id=?', db = get_db()
[session['user_id'], whom_id]) db.execute('delete from follower where who_id=? and whom_id=?',
g.db.commit() [session['user_id'], whom_id])
db.commit()
flash('You are no longer following "%s"' % username) flash('You are no longer following "%s"' % username)
return redirect(url_for('user_timeline', username=username)) return redirect(url_for('user_timeline', username=username))
@ -173,10 +177,11 @@ def add_message():
if 'user_id' not in session: if 'user_id' not in session:
abort(401) abort(401)
if request.form['text']: if request.form['text']:
g.db.execute('''insert into message (author_id, text, pub_date) db = get_db()
values (?, ?, ?)''', (session['user_id'], request.form['text'], db.execute('''insert into message (author_id, text, pub_date)
int(time.time()))) values (?, ?, ?)''', (session['user_id'], request.form['text'],
g.db.commit() int(time.time())))
db.commit()
flash('Your message was recorded') flash('Your message was recorded')
return redirect(url_for('timeline')) return redirect(url_for('timeline'))
@ -221,11 +226,12 @@ def register():
elif get_user_id(request.form['username']) is not None: elif get_user_id(request.form['username']) is not None:
error = 'The username is already taken' error = 'The username is already taken'
else: else:
g.db.execute('''insert into user ( db = get_db()
username, email, pw_hash) values (?, ?, ?)''', db.execute('''insert into user (
[request.form['username'], request.form['email'], username, email, pw_hash) values (?, ?, ?)''',
generate_password_hash(request.form['password'])]) [request.form['username'], request.form['email'],
g.db.commit() generate_password_hash(request.form['password'])])
db.commit()
flash('You were successfully registered and can login now') flash('You were successfully registered and can login now')
return redirect(url_for('login')) return redirect(url_for('login'))
return render_template('register.html', error=error) return render_template('register.html', error=error)
@ -245,4 +251,5 @@ app.jinja_env.filters['gravatar'] = gravatar_url
if __name__ == '__main__': if __name__ == '__main__':
init_db()
app.run() app.run()

Loading…
Cancel
Save