"""Flask application setup: login handling and form-based CSRF protection.

Creates the ``app_`` Flask application, wires up Flask-Login with a small
sqlite-backed ``User`` model, registers the youtube blueprint and installs
a pair of request hooks that inject a CSRF token into outgoing HTML forms
and verify it on incoming POST requests.
"""
import base64
import secrets
from flask import Flask
# NOTE(review): `cf` (the parsed configuration) and `flash` are not imported
# explicitly anywhere in this file; presumably they come from this star
# import -- confirm against common.common.
from .common.common import *

app_ = Flask(__name__)
app_.secret_key = base64.b64decode(cf['frontend'].get('secret_key', '')) or \
    secrets.token_bytes(16)  # development fallback; CSRF/cookies won't persist.

from contextlib import closing
from flask_login import LoginManager, UserMixin, current_user, login_user, logout_user, login_required
# TODO: into common
import sqlite3
from werkzeug.security import generate_password_hash, check_password_hash
from flask import redirect, render_template, url_for

login = LoginManager(app_)
login.login_view = 'login_form'


class User(UserMixin):  # TODO: to common
    """A row of the ``users`` table, usable as a Flask-Login user object.

    The session identifier handed to Flask-Login is the user's ``token``
    (see ``get_id``), not the numeric database id.
    """

    def __init__(self, id, name, passwd, token):
        self.id = id
        self.name = name
        self.passwd = passwd  # password hash as stored in the database
        self.token = token

    def get_id(self):
        """Return the value Flask-Login stores in the session (the token)."""
        return self.token

    def set_password(self, passwd):
        """Replace the in-memory password hash with a hash of ``passwd``."""
        self.passwd = generate_password_hash(passwd)
        # ^TODO: store changes to database

    def check_password(self, passwd):
        """Return True if ``passwd`` matches the stored password hash."""
        return check_password_hash(self.passwd, passwd)

    @staticmethod
    def _fetch_row(query, params):
        """Run ``query`` with ``params`` and return the first row or None.

        The connection is wrapped in ``closing()`` because sqlite3's own
        context manager only commits/rolls back; it does not close the
        connection.
        """
        with closing(sqlite3.connect(cf['global']['database'])) as conn:
            return conn.execute(query, params).fetchone()

    @classmethod
    def from_id(cls, id):
        """Load a user by database id; return None if no such user exists."""
        row = cls._fetch_row(
            "SELECT name,password,token FROM users WHERE id = ?", (id,))
        if row is None:
            return None
        name, passwd, token = row
        return cls(id, name, passwd, token)

    @classmethod
    def from_name(cls, name):
        """Load a user by login name; return None if no such user exists."""
        row = cls._fetch_row(
            "SELECT id,password,token FROM users WHERE name=?", (name,))
        if row is None:
            return None
        id, passwd, token = row
        return cls(id, name, passwd, token)

    @classmethod
    def from_token(cls, token):
        """Load a user by token; return None if no such user exists."""
        row = cls._fetch_row(
            "SELECT id,name,password FROM users WHERE token=?", (token,))
        if row is None:
            return None
        id, name, passwd = row
        return cls(id, name, passwd, token)


@app_.route('/login')  # TODO: to common
def login_form():
    """Render the login page."""
    return render_template('login.html.j2')


@app_.route('/login', methods=['POST'])  # TODO: to common
def do_login():
    """Handle the login form; the ``action`` field selects what to do.

    ``login`` and ``logout`` redirect to the youtube index on success; every
    other outcome flashes a message and redirects back to the login form.
    """
    action = request.form.get('action')
    if action == 'login':
        user = User.from_name(request.form.get('user'))
        # Default to '' so a request without a password field is rejected
        # as a wrong password instead of raising inside the hash check.
        if user and user.check_password(request.form.get('password', '')):
            login_user(user, remember=request.form.get('remember'))
            return redirect(url_for('youtube.index'))
        flash('wrong username and/or password', 'error')
    elif action == 'register':
        flash("open registration currently closed. ask girst on irc://chat.freenode.net/#invidious if you want an account.", 'info')
    elif action == 'logout':
        logout_user()
        return redirect(url_for('youtube.index'))
    else:
        flash('unsupported action', 'error')
    return redirect(url_for('login_form'))


@login.user_loader
def load_user(token):
    """Resolve the session identifier (a user token) to a User or None."""
    # In the future, users will be able to invalidate their tokens.
    # -> https://flask-login.readthedocs.io/en/latest/#alternative-tokens
    return User.from_token(token)


@login.request_loader
def querytoken_auth(request):
    """Authenticate a request via its ``token`` query parameter, if present."""
    token = request.args.get('token')
    if token:
        return User.from_token(token)
    return None


# Importing the submodule here (after app_ and login exist) also binds
# `youtube` in this module's namespace, assuming this file is the `app`
# package's __init__ -- TODO confirm.
import app.youtube
app_.register_blueprint(youtube.frontend)

# TODO: build a proper flask extension
# Magic CSRF protection: This modifies outgoing HTML responses and injects a
# csrf token into all forms. All POST requests are then checked for whether
# they contain the valid token.
# TODO:
# - don't use regex for injecting
# - inject a http header into all responses (that could be used by apis)
# - allow csrf token to be passed in http header, json, ...
# - a decorator on routes to opt out of verification or output munging
#   https://stackoverflow.com/questions/19574694/flask-hit-decorator-before-before-request-signal-fires
# - allow specifying hmac message contents (currently request.remote_addr)
import re
import hmac
import hashlib
from flask import request

# Matches opening form tags with any number of attributes and any type of
# quotes; group 1 is the complete tag.
_FORM_TAG_RE = re.compile(
    rb'''(<[Ff][Oo][Rr][Mm](\s+[a-zA-Z0-9-]+(=(\w*|'[^']*'|"[^"]*"))?)*>)''')


def _csrf_token():
    """Return the hex CSRF token for the client of the current request.

    The token is an HMAC-SHA256 of the client address keyed with the app's
    secret key.
    """
    # TODO: will fail behind reverse proxy (remote_addr always localhost)
    # remote_addr can be None; fall back to an empty message rather than
    # crashing every request with an AttributeError.
    remote_addr = request.remote_addr or ''
    return hmac.new(app_.secret_key, remote_addr.encode('ascii'),
                    hashlib.sha256).hexdigest()


@app_.after_request
def add_csrf_protection(response):
    """Inject a hidden ``csrf`` input into every form of an HTML response."""
    if response.mimetype == "text/html":
        token = _csrf_token()
        # Hackily append a hidden input with our csrf protection value
        # directly after each opening form tag. The token is hex, so it
        # needs no escaping in either the HTML or the regex template.
        hidden_input = (rb'\1<input type="hidden" name="csrf" value="'
                        + token.encode('ascii') + rb'">')
        response.set_data(_FORM_TAG_RE.sub(hidden_input, response.get_data()))
    return response


@app_.before_request
def verify_csrf_protection():
    """Reject POST requests lacking a valid ``csrf`` form field.

    Returns a 400 response on failure. On success (and for non-POST
    requests) the ``csrf`` field is stripped from ``request.form`` so that
    views never see it.
    """
    if request.method == "POST":
        submitted = request.form.get('csrf', '')
        # Compare as bytes in constant time: avoids leaking the token via
        # timing and tolerates non-ASCII garbage in the submitted value.
        if not hmac.compare_digest(submitted.encode('utf-8'),
                                   _csrf_token().encode('ascii')):
            return "CSRF validation failed!", 400
    request.form = request.form.copy()  # make it mutable
    request.form.poplist('csrf')  # remove our csrf again