"""User accounts, login handling and the ``usermgmt`` blueprint."""
import json
import secrets
import sqlite3
from contextlib import closing, contextmanager

from flask import Blueprint, flash, redirect, render_template, url_for, request
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from werkzeug.security import generate_password_hash, check_password_hash

from .common import cf


@contextmanager
def _db_cursor():
    """Yield a cursor on the database configured in ``cf['global']['database']``.

    The transaction is committed when the ``with`` body finishes normally and
    rolled back when it raises; the connection is closed in either case.
    (A ``sqlite3.Connection`` used as a context manager on its own only
    handles the transaction and leaves the connection open.)
    """
    with closing(sqlite3.connect(cf['global']['database'])) as conn:
        with conn:
            yield conn.cursor()


class User(UserMixin): # TODO: to common
    """A row of the ``users`` table, usable as a Flask-Login user object.

    Attributes:
        id: value of the ``id`` column.
        name: value of the ``name`` column.
        passwd: password hash as stored in the ``password`` column.
        token: value of ``users.token`` (the internal token, not the
            revocable login token kept in ``user_tokens``).
        admin: value of the ``is_admin`` column.
    """

    def __init__(self, id, name, passwd, token, is_admin):
        self.id = id
        self.name = name
        self.passwd = passwd
        self.token = token
        self.admin = is_admin

    def get_id(self):
        """Return the user's id unchanged (overrides ``UserMixin.get_id``)."""
        return self.id

    def set_password(self, passwd):
        """Hash ``passwd``, keep the hash on this object and store it in the database."""
        self.passwd = generate_password_hash(passwd)
        with _db_cursor() as c:
            c.execute("UPDATE users SET password = ? where id = ?", (self.passwd, self.id,))

    def check_password(self, passwd):
        """Return True if ``passwd`` matches the stored hash.

        ``None`` (e.g. a form field that was not submitted) never matches,
        instead of being handed to the hash comparison.
        """
        if passwd is None:
            return False
        return check_password_hash(self.passwd, passwd)

    @classmethod
    def from_id(cls, id):
        """Load the user with the given ``id``; return None if there is no such row."""
        with _db_cursor() as c:
            c.execute("SELECT name,password,token,is_admin FROM users WHERE id = ?", (id,))
            row = c.fetchone()
        if row is None:
            return None
        name, passwd, token, admin = row
        return cls(id, name, passwd, token, admin)

    @classmethod
    def from_name(cls, name):
        """Load the user with the given ``name``; return None if there is no such row."""
        with _db_cursor() as c:
            c.execute("SELECT id,password,token,is_admin FROM users WHERE name=?", (name,))
            row = c.fetchone()
        if row is None:
            return None
        id, passwd, token, admin = row
        return cls(id, name, passwd, token, admin)

    @classmethod
    def from_token(cls, login_token):
        """Load the user owning ``login_token``; return None if no row matches.

        Note: this function reads the revocable token (``user_tokens.token``),
        not the internal one (``users.token``)!
        """
        with _db_cursor() as c:
            c.execute("""
                SELECT id, name, password, users.token, is_admin
                FROM users
                JOIN user_tokens ON users.id = user_tokens.user_id
                WHERE user_tokens.token = ?
            """, (login_token,))
            row = c.fetchone()
        if row is None:
            return None
        id, name, passwd, token, admin = row
        return cls(id, name, passwd, token, admin)


def init_login(app):
    """Set up Flask-Login on ``app`` and register the ``usermgmt`` blueprint.

    The blueprint and its routes are defined in here because they are
    registered on the ``app`` passed to this function.
    """
    login = LoginManager()
    login.login_view = 'usermgmt.login_form'
    login.init_app(app)

    @login.user_loader
    def load_user(id):
        # in the future tokens will be invalidable by users. -> https://flask-login.readthedocs.io/en/latest/#alternative-tokens
        return User.from_id(id)

    @login.request_loader
    def querytoken_auth(request):
        """Authenticate a request through its ``?token=`` query parameter."""
        login_token = request.args.get('token')
        if not login_token:
            return None
        user = User.from_token(login_token)
        if user:
            login_user(user)
        return user

    usermgmt = Blueprint('usermgmt', __name__,
        template_folder='templates',
        static_folder='static',
        static_url_path='/static/usermgmt')

    @usermgmt.route('/login')
    def login_form():
        """Render the login page."""
        return render_template('login.html.j2')

    @usermgmt.route('/login', methods=['POST'])
    def do_login():
        """Handle the login form; the ``action`` field selects login, register or logout."""
        action = request.form.get('action')
        if action == 'login':
            user = User.from_name(request.form.get('user'))
            if user and user.check_password(request.form.get('password')):
                login_user(user, remember=request.form.get('remember'))
                return redirect(request.args.get('next','/')) # xxx: non-exploitable open redirect!
            flash('wrong username and/or password', 'error')
        elif action == 'register':
            flash("open registration currently closed. ask girst on irc://irc.libera.chat/#invidious if you want an account.", 'info')
        elif action == 'logout':
            logout_user()
            return redirect(request.args.get('next','/')) # xxx: non-exploitable open redirect!
        else:
            flash('unsupported action', 'error')
        return redirect(url_for('usermgmt.login_form'))

    @usermgmt.route('/manage/account')
    @login_required
    def account_manager():
        """Render the account page with the current user's settings and login token."""
        with _db_cursor() as c:
            c.execute("""
                SELECT setting, value
                FROM user_settings
                WHERE user_id = ?
            """, (current_user.id,))
            # setting values are stored JSON-encoded (see the 'chset' action)
            settings = {
                setting: json.loads(value)
                for setting, value in c.fetchall()
            }
            c.execute("""
                SELECT token
                FROM user_tokens
                WHERE user_id = ?
            """, (current_user.id,))
            row = c.fetchone()
        # users without a row in user_tokens get an empty token
        login_token = row[0] if row else ""
        return render_template('account_mgmt.html.j2',
            settings=settings,
            login_token=login_token,
            random_pwd=secrets.token_hex(16))

    @usermgmt.route('/manage/account', methods=['POST'])
    @login_required
    def manage_account():
        """Handle the account form.

        The ``action`` field selects the operation:
        ``chpwd`` changes the password, ``chtok`` generates a new login token,
        ``chset`` stores the ``noshorts`` setting and ``addusr`` (admins only)
        creates a new user. Responds with a redirect back to the account
        page, or with status 403 when a non-admin requests ``addusr``.
        """
        action = request.form.get('action')
        if action == 'chpwd':
            new_password = request.form.get('newpasswd')
            if not current_user.check_password(request.form.get('oldpasswd')):
                flash('current password incorrect.', 'error')
            elif new_password is None:
                # field missing from the form: hashing None would crash
                flash('new password missing.', 'error')
            else:
                current_user.set_password(new_password)
                flash('password updated.', 'info')
        elif action == 'chtok':
            new_token = secrets.token_urlsafe(16)
            with _db_cursor() as c:
                c.execute("""
                    INSERT OR REPLACE INTO user_tokens (user_id, token)
                    VALUES (?, ?)
                """, (current_user.id, new_token))
            flash('new token generated.', 'info')
        elif action == 'chset':
            noshorts = request.form.get('noshorts') == 'yes'
            with _db_cursor() as c:
                c.execute("""
                    INSERT OR REPLACE INTO user_settings (user_id, setting, value)
                    VALUES (?, ?, ?)
                """, (current_user.id, "noshorts", json.dumps(noshorts)))
            flash('settings saved.', 'info')
        elif action == 'addusr':
            if not current_user.admin:
                return "only admins may do that!", 403
            username = request.form.get('user')
            password = request.form.get('pass')
            is_admin = request.form.get('admin') == 'yes'
            if password is None:
                # field missing from the form: hashing None would crash
                flash('error creating user: password missing', 'error')
            else:
                new_token = secrets.token_urlsafe(16)
                password_hash = generate_password_hash(password)
                with _db_cursor() as c:
                    try:
                        c.execute("""
                            INSERT INTO users (name, password, is_admin, token)
                            VALUES (?, ?, ?, ?)
                        """, (username, password_hash, is_admin, new_token))
                        flash('new user created.', 'info')
                    except sqlite3.DatabaseError as e:
                        flash(f'error creating user: {e}', 'error')
        else:
            flash('unsupported action', 'error')

        return redirect(url_for('usermgmt.account_manager'))

    # NOTE: only register blueprint _after_ adding routes!
    app.register_blueprint(usermgmt)