mirror of https://github.com/mitsuhiko/flask.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
228 lines
7.6 KiB
228 lines
7.6 KiB
# -*- coding: utf-8 -*- |
|
from __future__ import with_statement |
|
import re |
|
import time |
|
import sqlite3 |
|
from hashlib import md5 |
|
from datetime import datetime |
|
from contextlib import closing |
|
from flask import Flask, request, session, url_for, redirect, \ |
|
render_template, abort, g, flash |
|
from werkzeug import check_password_hash, generate_password_hash |
|
|
|
|
|
# configuration
# These are development defaults; they are applied to the application
# object at the bottom of this module.

# storage and paging
DATABASE = '/tmp/minitwit.db'   # path of the SQLite database file
PER_PAGE = 30                   # maximum number of messages per timeline

# application behaviour
SECRET_KEY = 'development key'  # assigned to app.secret_key
DEBUG = True                    # assigned to app.debug
|
|
|
# Create the Flask application object.  Every route, request hook and
# template filter defined in this module is registered on it.
app = Flask(__name__)
|
|
|
|
|
def connect_db():
    """Open and return a new SQLite connection to ``DATABASE``.

    Nothing is cached here: every call opens another connection, and the
    caller is responsible for closing it.
    """
    connection = sqlite3.connect(DATABASE)
    return connection
|
|
|
|
|
def init_db():
    """Create the database tables by executing ``schema.sql``.

    The script is read through ``app.open_resource`` and run on a fresh
    connection, which is committed afterwards and closed in every case.
    """
    db = connect_db()
    try:
        with app.open_resource('schema.sql') as schema_file:
            cursor = db.cursor()
            cursor.executescript(schema_file.read())
        db.commit()
    finally:
        # Close the connection even when the script fails.
        db.close()
|
|
|
|
|
def query_db(query, args=(), one=False):
    """Run ``query`` on the request's connection and return rows as dicts.

    Every result row is turned into a dictionary keyed by column name.  If
    two columns share a name, the later column's value wins.

    With ``one`` false (the default) the full list of row dictionaries is
    returned.  With ``one`` true only the first row is returned, or
    ``None`` when the query produced no rows.
    """
    cursor = g.db.execute(query, args)
    # ``description`` may be None for statements that return no columns.
    columns = [column[0] for column in cursor.description or ()]
    rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
    if not one:
        return rows
    if rows:
        return rows[0]
    return None
|
|
|
|
|
def get_user_id(username):
    """Return the ``user_id`` stored for ``username``.

    Returns ``None`` when no user with that name exists.
    """
    row = g.db.execute('select user_id from user where username = ?',
                       [username]).fetchone()
    if not row:
        return None
    return row[0]
|
|
|
|
|
def format_datetime(timestamp):
    """Render a POSIX timestamp as ``YYYY-MM-DD @ HH:MM``.

    The timestamp is interpreted as UTC.
    """
    moment = datetime.utcfromtimestamp(timestamp)
    return moment.strftime('%Y-%m-%d @ %H:%M')
|
|
|
|
|
def gravatar_url(email, size=80):
    """Build the Gravatar image URL for ``email``.

    The address is stripped of surrounding whitespace and lower-cased
    before its MD5 hex digest is taken.  ``size`` is passed as the ``s``
    query parameter; the ``d`` parameter is always ``identicon``.
    """
    normalized = email.strip().lower()
    digest = md5(normalized.encode('utf-8')).hexdigest()
    return 'http://www.gravatar.com/avatar/%s?d=identicon&s=%d' % (digest, size)
|
|
|
|
|
@app.request_init
def before_request():
    """Open the per-request database connection and look up the current user.

    ``g.db`` is set to a new connection.  ``g.user`` is always defined
    afterwards: it holds the row dictionary of the user whose id is stored
    in the session, or ``None`` when no ``user_id`` is in the session or
    the lookup finds no matching row.
    """
    g.db = connect_db()
    # Default to "anonymous" so that reading g.user never fails with a
    # missing attribute when nobody is logged in.
    g.user = None
    if 'user_id' in session:
        g.user = query_db('select * from user where user_id = ?',
                          [session['user_id']], one=True)
|
|
|
|
|
@app.request_shutdown
def after_request(request):
    """Close the request's database connection and pass the argument on.

    The object received from Flask is returned unchanged.
    NOTE(review): despite its name, ``request`` is presumably the response
    object that Flask hands to shutdown hooks -- confirm against the Flask
    version in use.
    """
    connection = g.db
    connection.close()
    return request
|
|
|
|
|
@app.route('/')
def timeline():
    """Show the logged-in user's timeline.

    The timeline holds the newest ``PER_PAGE`` messages written by the
    user or by anyone the user follows, newest first.  Visitors who are
    not logged in are redirected to the public timeline.
    """
    if 'user_id' not in session:
        return redirect(url_for('public_timeline'))
    # The ``offset`` query argument used to be parsed here but was never
    # passed to the query; that dead local has been removed.
    user_id = session['user_id']
    messages = query_db('''
        select message.*, user.* from message, user
        where message.author_id = user.user_id and (
            user.user_id = ? or
            user.user_id in (select whom_id from follower
                                    where who_id = ?))
        order by message.pub_date desc limit ?''',
        [user_id, user_id, PER_PAGE])
    return render_template('timeline.html', messages=messages)
|
|
|
|
|
@app.route('/public')
def public_timeline():
    """Show the newest ``PER_PAGE`` messages of all users, newest first."""
    latest_messages = query_db('''
        select message.*, user.* from message, user
        where message.author_id = user.user_id
        order by message.pub_date desc limit ?''', [PER_PAGE])
    return render_template('timeline.html', messages=latest_messages)
|
|
|
|
|
@app.route('/<username>')
def user_timeline(username):
    """Show the newest ``PER_PAGE`` messages written by ``username``.

    Aborts with 404 when no such user exists.  The template also receives
    ``followed``, which is true only when a logged-in visitor follows the
    profile user, and ``profile_user``, the profile user's row dictionary.
    """
    profile_user = query_db('select * from user where username = ?',
                            [username], one=True)
    if profile_user is None:
        abort(404)
    # Must be bound for anonymous visitors too; it is passed to the
    # template unconditionally below.
    followed = False
    if 'user_id' in session:
        followed = query_db('''select 1 from follower where
            follower.who_id = ? and follower.whom_id = ?''',
            [session['user_id'], profile_user['user_id']], one=True) is not None
    return render_template('timeline.html', messages=query_db('''
            select message.*, user.* from message, user where
            user.user_id = message.author_id and user.user_id = ?
            order by message.pub_date desc limit ?''',
            [profile_user['user_id'], PER_PAGE]), followed=followed,
            profile_user=profile_user)
|
|
|
|
|
@app.route('/<username>/follow')
def follow_user(username):
    """Make the logged-in user a follower of ``username``.

    Aborts with 401 when nobody is logged in and with 404 when the target
    user does not exist.  On success a message is flashed and the visitor
    is redirected to the target user's timeline.
    """
    if 'user_id' not in session:
        abort(401)
    target_id = get_user_id(username)
    if target_id is None:
        abort(404)
    follower_id = session['user_id']
    g.db.execute('insert into follower (who_id, whom_id) values (?, ?)',
                 [follower_id, target_id])
    g.db.commit()
    flash('You are now following "%s"' % username)
    profile_url = url_for('user_timeline', username=username)
    return redirect(profile_url)
|
|
|
|
|
@app.route('/<username>/unfollow')
def unfollow_user(username):
    """Remove the logged-in user from the followers of ``username``.

    Aborts with 401 when nobody is logged in and with 404 when the target
    user does not exist.  On success a message is flashed and the visitor
    is redirected to the target user's timeline.
    """
    if 'user_id' not in session:
        abort(401)
    target_id = get_user_id(username)
    if target_id is None:
        abort(404)
    follower_id = session['user_id']
    g.db.execute('delete from follower where who_id=? and whom_id=?',
                 [follower_id, target_id])
    g.db.commit()
    flash('You are no longer following "%s"' % username)
    profile_url = url_for('user_timeline', username=username)
    return redirect(profile_url)
|
|
|
|
|
@app.route('/add_message', methods=['POST'])
def add_message():
    """Store a new message for the logged-in user.

    Aborts with 401 when nobody is logged in.  An empty ``text`` field is
    silently ignored.  In both cases that do not abort, the visitor is
    redirected to the personal timeline.
    """
    if 'user_id' not in session:
        abort(401)
    text = request.form['text']
    if text:
        published_at = int(time.time())
        g.db.execute('''insert into message (author_id, text, pub_date)
            values (?, ?, ?)''', (session['user_id'], text, published_at))
        g.db.commit()
        flash('Your message was recorded')
    return redirect(url_for('timeline'))
|
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Log a user in.

    Already logged-in visitors are redirected to their timeline.  A GET
    request renders the login form.  A POST request checks the submitted
    username and password: on success the user's id is stored in the
    session and the visitor is redirected to the timeline; otherwise the
    form is rendered again with an error message.
    """
    if 'user_id' in session:
        return redirect(url_for('timeline'))
    if request.method != 'POST':
        return render_template('login.html', error=None)

    user = query_db('''select * from user where
        username = ?''', [request.form['username']], one=True)
    if user is None:
        error = 'Invalid username'
    elif not check_password_hash(user['pw_hash'],
                                 request.form['password']):
        error = 'Invalid password'
    else:
        flash('You were logged in')
        session['user_id'] = user['user_id']
        return redirect(url_for('timeline'))
    return render_template('login.html', error=error)
|
|
|
|
|
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Register a new user.

    Already logged-in visitors are redirected to their timeline.  A GET
    request renders the registration form.  A POST request validates the
    submitted fields in order: username present, email present and
    containing ``@``, password present, both passwords equal, username
    not yet taken.  The first failing check re-renders the form with its
    error message.  When all checks pass, the user is stored with a hashed
    password and the visitor is redirected to the login page.
    """
    if 'user_id' in session:
        return redirect(url_for('timeline'))
    error = None
    if request.method == 'POST':
        if not request.form['username']:
            error = 'You have to enter a username'
        elif not request.form['email'] or \
                '@' not in request.form['email']:
            error = 'You have to enter a valid email address'
        elif not request.form['password']:
            error = 'You have to enter a password'
        elif request.form['password'] != request.form['password2']:
            # Fixed the user-facing typo ("to not match").
            error = 'The two passwords do not match'
        elif get_user_id(request.form['username']) is not None:
            error = 'The username is already taken'
        else:
            # Only the password hash is stored, never the plain password.
            g.db.execute('''insert into user (
                username, email, pw_hash) values (?, ?, ?)''',
                [request.form['username'], request.form['email'],
                 generate_password_hash(request.form['password'])])
            g.db.commit()
            flash('You were successfully registered and can login now')
            return redirect(url_for('login'))
    return render_template('register.html', error=error)
|
|
|
|
|
@app.route('/logout')
def logout():
    """Log the current user out and redirect to the public timeline.

    This also works when nobody is logged in: a missing ``user_id`` in the
    session is ignored.
    """
    flash('You were logged out')
    session.pop('user_id', None)
    public_url = url_for('public_timeline')
    return redirect(public_url)
|
|
|
|
|
# Apply the configuration constants to the application object.
app.secret_key = SECRET_KEY
app.debug = DEBUG

# Register the template filters under the names 'datetimeformat' and
# 'gravatar'.
template_filters = app.jinja_env.filters
template_filters['datetimeformat'] = format_datetime
template_filters['gravatar'] = gravatar_url
|
|
|
|
|
# Start the application's built-in server only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    app.run()
|
|
|