From ef96a6f2032c3b7c8ba553f466c1fba60c598aaa Mon Sep 17 00:00:00 2001 From: girst Date: Fri, 5 Mar 2021 16:47:56 +0100 Subject: [PATCH] move anticsrf out of __init__, provide decorator for opting out this also fixes running the webhooks blueprint on the same flask instance as the frontend, which couldn't have worked before this. oops! --- app/__init__.py | 50 ++------------------------------ app/common/anticsrf.py | 62 ++++++++++++++++++++++++++++++++++++++++ app/webhooks/__init__.py | 2 ++ 3 files changed, 66 insertions(+), 48 deletions(-) create mode 100644 app/common/anticsrf.py diff --git a/app/__init__.py b/app/__init__.py index 343ba1e..78de744 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -42,51 +42,5 @@ def log_errors(e): app.logger.error(g.api_requests) return e -# TODO: build a proper flask extension -# Magic CSRF protection: This modifies outgoing HTML responses and injects a csrf token into all forms. -# All post requests are then checked if they contain the valid token. -# TODO: -# - knobs: mimetypes, http methods, form field name, token generator -# - inject a http header into all responses (that could be used by apis) -# - allow csrf token to be passed in http header, json, ... -# - a decorator on routes to opt out of verification or output munging -# https://stackoverflow.com/questions/19574694/flask-hit-decorator-before-before-request-signal-fires -# - allow specifying hmac message contents (currently request.remote_addr) -import hmac -import hashlib -from flask import request -from werkzeug.exceptions import BadRequest -from html.parser import HTMLParser -@app.template_global() -def csrf_token(): - # TODO: will fail behind reverse proxy (remote_addr always localhost) - return hmac.new(app.secret_key, request.remote_addr.encode('ascii'), hashlib.sha256).hexdigest() -@app.after_request -def add_csrf_protection(response): - if response.mimetype == "text/html": - csrf_elem = f'' - new_response = add_csrf(response.get_data().decode('utf-8'), csrf_elem) - response.set_data(new_response.encode('utf-8')) - return response -@app.before_request -def verify_csrf_protection(): - if request.method == "POST" and request.form.get('csrf') != csrf_token(): - raise BadRequest("CSRF validation failed") - request.form = request.form.copy() # make it mutable - request.form.poplist('csrf') # remove our csrf again -def add_csrf(html_in, csrf_elem): - class FindForms(HTMLParser): - def __init__(self, html): - super().__init__() - self.forms = [] # tuples of (line_number, tag_offset, tag_length) - super().feed(html) - def handle_starttag(self, tag, attrs): - line, offset = self.getpos() - if tag == "form" and dict(attrs).get('method','').upper() == "POST": - self.forms.append((line, offset, len(self.get_starttag_text()))) - lines = html_in.splitlines(keepends=True) - # Note: going in reverse, to not invalidate offsets: - for line, offset, length in reversed(FindForms(html_in).forms): - l = lines[line-1] - lines[line-1] = l[:offset+length] + csrf_elem + l[offset+length:] - return "".join(lines) +from .common import anticsrf +anticsrf.init(app) diff --git a/app/common/anticsrf.py b/app/common/anticsrf.py new file mode 100644 index 0000000..a102c87 --- /dev/null +++ b/app/common/anticsrf.py @@ -0,0 +1,62 @@ +# TODO: build a proper flask extension +# Magic CSRF protection: This modifies outgoing HTML responses and injects a csrf token into all forms. +# All post requests are then checked if they contain the valid token. +# TODO: +# - knobs: mimetypes, http methods, form field name, token generator +# - inject a http header into all responses (that could be used by apis) +# - allow csrf token to be passed in http header, json, ... +# - allow specifying hmac message contents (currently request.remote_addr) +import hmac +import hashlib +from flask import request, current_app +from werkzeug.exceptions import BadRequest +from html.parser import HTMLParser + +def init(app): + app.template_global(csrf_token) + app.after_request(add_csrf_protection) + app.before_request(verify_csrf_protection) + +def no_csrf_protection(func): + # add this decorator below @app.route + func._no_csrf_protection = True + return func + +def csrf_token(): + # TODO: will fail behind reverse proxy (remote_addr always localhost) + return hmac.new(current_app.secret_key, request.remote_addr.encode('ascii'), hashlib.sha256).hexdigest() + +def add_csrf_protection(response): + if response.mimetype == "text/html": + csrf_elem = f'' + new_response = add_csrf(response.get_data().decode('utf-8'), csrf_elem) + response.set_data(new_response.encode('utf-8')) + return response + +def verify_csrf_protection(): + skip = getattr(current_app.view_functions.get(request.endpoint), '_no_csrf_protection', False) + #^xxx: doesn't take fallback_routes into account! + if skip: return + + if request.method == "POST" and request.form.get('csrf') != csrf_token(): + raise BadRequest("CSRF validation failed") + + request.form = request.form.copy() # make it mutable + request.form.poplist('csrf') # remove our csrf again + +def add_csrf(html_in, csrf_elem): + class FindForms(HTMLParser): + def __init__(self, html): + super().__init__() + self.forms = [] # tuples of (line_number, tag_offset, tag_length) + super().feed(html) + def handle_starttag(self, tag, attrs): + line, offset = self.getpos() + if tag == "form" and dict(attrs).get('method','').upper() == "POST": + self.forms.append((line, offset, len(self.get_starttag_text()))) + lines = html_in.splitlines(keepends=True) + # Note: going in reverse, to not invalidate offsets: + for line, offset, length in reversed(FindForms(html_in).forms): + l = lines[line-1] + lines[line-1] = l[:offset+length] + csrf_elem + l[offset+length:] + return "".join(lines) diff --git a/app/webhooks/__init__.py b/app/webhooks/__init__.py index 0a951d6..8fcaef8 100755 --- a/app/webhooks/__init__.py +++ b/app/webhooks/__init__.py @@ -10,6 +10,7 @@ from flask import Flask, Blueprint, request from urllib.parse import parse_qs, urlparse from ..common.common import * +from ..common.anticsrf import no_csrf_protection frontend = Blueprint('webhooks', __name__) @@ -49,6 +50,7 @@ def websub(timestamp, nonce, subject, sig): return challenge, 200 @frontend.route('/websub/v1////', methods=["POST"]) +@no_csrf_protection def websub_post(timestamp, nonce, subject, sig): lease = cf['websub']['lease'] hmackey = cf['websub']['hmac_key'] -- 2.39.3