import base64
import secrets
from flask import Flask
from .common.common import *
# The Flask application object; the routes and request hooks defined further
# down in this file are registered on it.
app_ = Flask(__name__)
# Session-signing key: the base64-encoded 'secret_key' from the 'frontend'
# config section. A missing or empty value decodes to b'' (falsy), in which
# case a random 16-byte key is generated for this process instead.
# NOTE(review): `cf` is not defined in this file; presumably it is provided by
# the star import above -- verify.
app_.secret_key = base64.b64decode(cf['frontend'].get('secret_key','')) or \
secrets.token_bytes(16) # development fallback; CSRF/cookies won't persist.
from flask_login import LoginManager, UserMixin, current_user, login_user, logout_user, login_required # TODO: into common
import sqlite3
from werkzeug.security import generate_password_hash, check_password_hash
from flask import redirect, render_template, url_for
# Flask-Login manager bound to the application; the user_loader and
# request_loader callbacks further down are registered on it.
login = LoginManager(app_)
# Endpoint name of the view that shows the login page (login_form below);
# Flask-Login redirects there when a login is required.
login.login_view = 'login_form'
class User(UserMixin): # TODO: to common
    """An account loaded from the ``users`` table.

    Flask-Login identifies a session through :meth:`get_id`, which returns
    the account's ``token`` rather than its numeric row id.
    """

    def __init__(self, id, name, passwd, token):
        self.id = id
        self.name = name
        self.passwd = passwd  # password hash (see check_password)
        self.token = token

    def get_id(self):
        """Return the token that Flask-Login stores as the session identifier."""
        return self.token

    def set_password(self, passwd):
        """Replace the in-memory password hash with a hash of ``passwd``."""
        self.passwd = generate_password_hash(passwd)
        # ^TODO: store changes to database

    def check_password(self, passwd):
        """Return whether ``passwd`` matches the stored password hash."""
        return check_password_hash(self.passwd, passwd)

    @staticmethod
    def _fetch_row(query, value):
        """Run a one-parameter SELECT and return its first row, or None."""
        conn = sqlite3.connect(cf['global']['database'])
        try:
            return conn.execute(query, (value,)).fetchone()
        finally:
            # Using the connection as a context manager only scopes a
            # transaction; it has to be closed explicitly.
            conn.close()

    @classmethod
    def from_id(cls, id):
        """Return the user with row id ``id``, or None if there is none."""
        row = cls._fetch_row(
            "SELECT name,password,token FROM users WHERE id = ?", id)
        if row is None:
            return None
        name, passwd, token = row
        return cls(id, name, passwd, token)

    @classmethod
    def from_name(cls, name):
        """Return the user called ``name``, or None if there is none."""
        row = cls._fetch_row(
            "SELECT id,password,token FROM users WHERE name=?", name)
        if row is None:
            return None
        id, passwd, token = row
        return cls(id, name, passwd, token)

    @classmethod
    def from_token(cls, token):
        """Return the user owning ``token``, or None if there is none."""
        row = cls._fetch_row(
            "SELECT id,name,password FROM users WHERE token=?", token)
        if row is None:
            return None
        id, name, passwd = row
        return cls(id, name, passwd, token)
@app_.route('/login') # TODO: to common
def login_form():
    """Render the login page template."""
    page = render_template('login.html.j2')
    return page
@app_.route('/login', methods=['POST']) # TODO: to common
def do_login():
    """Handle a POST to /login; the form's ``action`` field picks the operation.

    ``login`` checks the submitted credentials and, on success, logs the
    user in and redirects to ``youtube.index``. ``logout`` logs the user out
    and redirects there as well. ``register`` and unknown actions only flash
    a message. Every path that does not return early redirects back to the
    login form.
    """
    action = request.form.get('action')
    if action == 'login':
        user = User.from_name(request.form.get('user'))
        # Default to '' so a request without a password field is rejected
        # like any wrong password instead of passing None to the hash check.
        password = request.form.get('password', '')
        if user and user.check_password(password):
            login_user(user, remember=request.form.get('remember'))
            return redirect(url_for('youtube.index'))
        flash('wrong username and/or password', 'error')
    elif action == 'register':
        flash("open registration currently closed. ask girst on irc://chat.freenode.net/#invidious if you want an account.", 'info')
    elif action == 'logout':
        logout_user()
        return redirect(url_for('youtube.index'))
    else:
        flash('unsupported action', 'error')
    return redirect(url_for('login_form'))
@login.user_loader
def load_user(token):
    """Flask-Login user loader: look up the user owning ``token``.

    Returns whatever ``User.from_token`` yields for that token.
    """
    # in the future tokens will be invalidable by users. -> https://flask-login.readthedocs.io/en/latest/#alternative-tokens
    account = User.from_token(token)
    return account
@login.request_loader
def querytoken_auth(request):
    """Flask-Login request loader: authenticate via a ``token`` query parameter.

    Returns None when the parameter is missing or empty; otherwise the
    result of looking that token up with ``User.from_token``.
    """
    supplied = request.args.get('token')
    if not supplied:
        return None
    return User.from_token(supplied)
# Imported after the app and login setup rather than at the top of the file.
# NOTE(review): presumably app.youtube needs `app_` and the login setup above
# to exist at import time -- verify before moving this import.
import app.youtube
# Mount the youtube blueprint on the application.
# NOTE(review): `import app.youtube` binds only the name `app`; the bare name
# `youtube` presumably resolves because this file is the `app` package's
# __init__, where importing a submodule sets it as an attribute of the package
# (i.e. a global of this module) -- verify.
app_.register_blueprint(youtube.frontend)
# TODO: build a proper flask extension
# Magic CSRF protection: This modifies outgoing HTML responses and injects a csrf token into all forms.
# All post requests are then checked if they contain the valid token.
# TODO:
# - don't use regex for injecting
# - inject a http header into all responses (that could be used by apis)
# - allow csrf token to be passed in http header, json, ...
# - a decorator on routes to opt out of verification or output munging
# https://stackoverflow.com/questions/19574694/flask-hit-decorator-before-before-request-signal-fires
# - allow specifying hmac message contents (currently request.remote_addr)
import re
import hmac
import hashlib
from flask import request
@app_.after_request
def add_csrf_protection(response):
    """Inject the CSRF token into every form of an outgoing HTML response.

    Only responses whose mimetype is ``text/html`` are rewritten. The token
    is an HMAC-SHA256 of the client's address keyed with the app's secret
    key; it is appended after each opening form tag as a hidden input named
    ``csrf``, the field that verify_csrf_protection checks on POST requests.
    Returns the (possibly modified) response.
    """
    if response.mimetype == "text/html":
        token = hmac.new(app_.secret_key, request.remote_addr.encode('ascii'), hashlib.sha256).hexdigest() # TODO: will fail behind reverse proxy (remote_addr always localhost)
        # The token is a hex digest, so it needs no escaping in the HTML or
        # in the regex replacement template.
        hidden_input = b'<input type="hidden" name="csrf" value="' + token.encode('ascii') + b'">'
        response.set_data(re.sub(
            rb'''(<[Ff][Oo][Rr][Mm](\s+[a-zA-Z0-9-]+(=(\w*|'[^']*'|"[^"]*"))?)*>)''', # match form tags with any number of attributes and any type of quotes
            rb'\1' + hidden_input, # hackily append a hidden input with our csrf protection value
            response.get_data()))
    return response
@app_.before_request
def verify_csrf_protection():
    """Reject POST requests whose ``csrf`` form field is not the expected token.

    The expected token is an HMAC-SHA256 of the client's address keyed with
    the app's secret key. On a mismatch (or a missing field) a 400 response
    is returned; otherwise nothing is returned and the ``csrf`` field is
    removed from ``request.form`` before the request is handled further.
    """
    token = hmac.new(app_.secret_key, request.remote_addr.encode('ascii'), hashlib.sha256).hexdigest() # TODO: will fail behind reverse proxy (remote_addr always localhost)
    if request.method == "POST":
        submitted = request.form.get('csrf', '')
        # Constant-time comparison; compared as bytes because compare_digest
        # rejects non-ASCII str arguments.
        if not hmac.compare_digest(submitted.encode('utf-8'), token.encode('ascii')):
            return "CSRF validation failed!", 400
    request.form = request.form.copy() # make it mutable
    request.form.poplist('csrf') # remove our csrf again