import base64
import secrets
import importlib
from flask import Flask
from .common.common import *
from .common.user import init_login

app = Flask(__name__)
# NOTE(review): `cf` is not defined in this file; presumably it is the parsed
# configuration provided by the star import above -- confirm.
app.secret_key = base64.b64decode(cf['frontend'].get('secret_key', '')) or \
    secrets.token_bytes(16)  # development fallback; CSRF/cookies won't persist.
init_login(app)

# Import every module named in the comma-separated `modules` setting
# (relative to this package) and register its `frontend` blueprint.
for name in cf['frontend']['modules'].split(','):
    name = name.strip()
    if not name:
        # Tolerate "a, b", trailing commas and an empty setting.
        continue
    blueprint = importlib.import_module('.' + name, __name__)
    app.register_blueprint(blueprint.frontend)


# TODO: move this somewhere else
@app.template_global()
def querystring_page(fields):
    """Return the current request's query string with *fields* applied.

    Each value in *fields* decides what happens to that query parameter:

    - a ``(plusminus, default)`` tuple: the parameter's current value (or
      ``default`` when it is absent) is converted to ``int`` and
      ``plusminus`` is added to it;
    - ``None``: the parameter is removed;
    - anything else: the parameter is set to that value.

    Raises ``ValueError`` if a parameter that should be incremented does not
    hold an integer.
    """
    tmp = dict(request.args)
    for field, what in fields.items():
        if isinstance(what, tuple):
            plusminus, default = what
            tmp[field] = int(tmp.get(field, str(default))) + plusminus
        elif what is None:
            tmp.pop(field, None)
        else:
            tmp[field] = what
    try:
        from werkzeug.urls import url_encode
    except ImportError:
        # Werkzeug 3.0 removed werkzeug.urls.url_encode; fall back to the
        # standard library (doseq=True expands list/tuple values).
        from urllib.parse import urlencode
        return urlencode(tmp, doseq=True)
    return url_encode(tmp)


# TODO: should this go somewhere else?
# This error handler logs requests to external APIs, and POST data. This makes
# debugging of API responses easier, as the request can be reconstructed and
# replayed.
from flask import g, request
from werkzeug.exceptions import InternalServerError


@app.errorhandler(InternalServerError)
def log_errors(e):
    """Log debugging context for an internal server error.

    Logs the raw request body of POST requests and, when present,
    ``g.api_requests``. The exception *e* is returned unchanged so it is
    still used as the response.
    """
    if request.method == "POST":
        # NOTE(review): request.data may be empty once the form has been
        # parsed -- confirm this logs what is needed for replaying.
        app.logger.error(request.data)
    if 'api_requests' in g:
        app.logger.error(g.api_requests)
    return e


# TODO: build a proper flask extension
# Magic CSRF protection: This modifies outgoing HTML responses and injects a
# CSRF token into all forms.
# All POST requests are then checked if they contain the valid token.
# TODO:
#   - knobs: mimetypes, http methods, form field name, token generator
#   - inject a http header into all responses (that could be used by apis)
#   - allow csrf token to be passed in http header, json, ...
#   - a decorator on routes to opt out of verification or output munging
#     https://stackoverflow.com/questions/19574694/flask-hit-decorator-before-before-request-signal-fires
#   - allow specifying hmac message contents (currently request.remote_addr)
import hmac
import hashlib
from flask import request
from werkzeug.exceptions import BadRequest
from html.parser import HTMLParser


@app.template_global()
def csrf_token():
    """Return the CSRF token for the current request.

    The token is the hex HMAC-SHA256 of the client's remote address, keyed
    with ``app.secret_key``.
    """
    # TODO: will fail behind reverse proxy (remote_addr always localhost)
    # remote_addr may be None; fall back to an empty message instead of
    # crashing with AttributeError.
    remote_addr = request.remote_addr or ''
    return hmac.new(app.secret_key, remote_addr.encode('ascii'), hashlib.sha256).hexdigest()


@app.after_request
def add_csrf_protection(response):
    """Inject a hidden ``csrf`` field into every POST form of HTML responses.

    Responses with another mimetype are returned untouched, as are
    direct-passthrough responses, whose body cannot be read with
    ``get_data()``.
    """
    if response.mimetype == "text/html" and not response.direct_passthrough:
        # The field name must match the one read in verify_csrf_protection().
        # The token is a hex digest, so it needs no HTML escaping.
        csrf_elem = f'<input type="hidden" name="csrf" value="{csrf_token()}">'
        new_response = add_csrf(response.get_data().decode('utf-8'), csrf_elem)
        response.set_data(new_response.encode('utf-8'))
    return response


@app.before_request
def verify_csrf_protection():
    """Reject POST requests whose ``csrf`` form field is not the valid token.

    Afterwards the ``csrf`` field is removed from ``request.form`` so views
    never see it.

    Raises:
        BadRequest: on a POST request with a missing or wrong token.
    """
    if request.method == "POST":
        supplied = request.form.get('csrf') or ''
        # Constant-time comparison; compare bytes because compare_digest()
        # rejects non-ASCII str input.
        if not hmac.compare_digest(supplied.encode('utf-8'), csrf_token().encode('utf-8')):
            raise BadRequest("CSRF validation failed")
    request.form = request.form.copy()  # make it mutable
    request.form.poplist('csrf')  # remove our csrf again


def add_csrf(html_in, csrf_elem):
    """Return *html_in* with *csrf_elem* inserted into every POST form.

    The element is inserted directly after the start tag of each ``<form>``
    whose ``method`` attribute is ``POST`` (case-insensitive). Start tags
    that span several lines are handled: the element always goes after the
    closing ``>`` of the tag.
    """
    class FindForms(HTMLParser):
        def __init__(self, html):
            super().__init__()
            self.forms = []  # tuples of (line_number, tag_offset, tag_length)
            super().feed(html)

        def handle_starttag(self, tag, attrs):
            # While a start tag is handled, getpos() is the tag's start.
            line, offset = self.getpos()
            # A valueless `method` attribute is reported as None.
            method = dict(attrs).get('method') or ''
            if tag == "form" and method.upper() == "POST":
                self.forms.append((line, offset, len(self.get_starttag_text())))

    # Absolute index at which each line starts. HTMLParser only counts "\n"
    # as a line break, so str.splitlines() (which also splits on "\r",
    # "\x0c", ...) must not be used here.
    line_starts = [0]
    for segment in html_in.split("\n")[:-1]:
        line_starts.append(line_starts[-1] + len(segment) + 1)

    # Forms are recorded in document order, so the insertion points are
    # ascending and the output can be assembled in a single pass.
    pieces = []
    previous = 0
    for line, offset, length in FindForms(html_in).forms:
        insert_at = line_starts[line - 1] + offset + length
        pieces.append(html_in[previous:insert_at])
        pieces.append(csrf_elem)
        previous = insert_at
    pieces.append(html_in[previous:])
    return "".join(pieces)