from werkzeug.security import generate_password_hash, check_password_hash from .common import cf import sqlite3 from flask_login import LoginManager, UserMixin, login_user, logout_user from flask import Blueprint, flash, redirect, render_template, url_for, request class User(UserMixin): # TODO: to common def __init__(self, id, name, passwd, token): self.id = id self.name = name self.passwd = passwd self.token = token def get_id(self): return self.token def set_password(self, passwd): self.passwd = generate_password_hash(passwd) # ^TODO: store changes to database def check_password(self, passwd): return check_password_hash(self.passwd, passwd) @classmethod def from_id(self, id): with sqlite3.connect(cf['global']['database']) as conn: c = conn.cursor() c.execute("SELECT name,password,token FROM users WHERE id = ?", (id,)) try: name, passwd, token = c.fetchone() except: return None # todo: ugly return User(id, name, passwd, token) @classmethod def from_name(self, name): with sqlite3.connect(cf['global']['database']) as conn: c = conn.cursor() c.execute("SELECT id,password,token FROM users WHERE name=?", (name,)) try: id, passwd, token = c.fetchone() except: return None # todo: ugly return User(id, name, passwd, token) @classmethod def from_token(self, token): with sqlite3.connect(cf['global']['database']) as conn: c = conn.cursor() c.execute("SELECT id,name,password FROM users WHERE token=?", (token,)) try: id, name, passwd, = c.fetchone() except: return None # todo: ugly return User(id, name, passwd, token) def init_login(app): login = LoginManager() login.login_view = 'usermgmt.login_form' login.init_app(app) @login.user_loader def load_user(token): # in the future tokens will be invalidable by users. -> https://flask-login.readthedocs.io/en/latest/#alternative-tokens return User.from_token(token) @login.request_loader def querytoken_auth(request): if request.args.get('token'): user = User.from_token(request.args.get('token')) if user: login_user(user) return user return None usermgmt = Blueprint('usermgmt', __name__, template_folder='templates', static_folder='static', static_url_path='/static/usermgmt') @usermgmt.route('/login') def login_form(): return render_template('login.html.j2') @usermgmt.route('/login', methods=['POST']) def do_login(): action = request.form.get('action') if action == 'login': user = User.from_name(request.form.get('user')) if user and user.check_password(request.form.get('password')): login_user(user, remember=request.form.get('remember')) return redirect(request.args.get('next','/')) # xxx: non-exploitable open redirect! flash('wrong username and/or password', 'error') elif action == 'register': flash("open registration currently closed. ask girst on irc://chat.freenode.net/#invidious if you want an account.", 'info') elif action == 'logout': logout_user() return redirect(request.args.get('next','/')) # xxx: non-exploitable open redirect! else: flash('unsupported action', 'error') return redirect(url_for('usermgmt.login_form')) # NOTE: only register blueprint _after_ adding routes! app.register_blueprint(usermgmt)