]> git.gir.st - subscriptionfeed.git/blob - app/common/user.py
use continuation token instead of manually paginated results
[subscriptionfeed.git] / app / common / user.py
1 from werkzeug.security import generate_password_hash, check_password_hash
2 from .common import cf
3 import sqlite3
4 import secrets
5 from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
6 from flask import Blueprint, flash, redirect, render_template, url_for, request
7
8 class User(UserMixin): # TODO: to common
9 def __init__(self, id, name, passwd, token, is_admin):
10 self.id = id
11 self.name = name
12 self.passwd = passwd
13 self.token = token
14 self.admin = is_admin
15 def get_id(self):
16 return self.id
17 def set_password(self, passwd):
18 self.passwd = generate_password_hash(passwd)
19 with sqlite3.connect(cf['global']['database']) as conn:
20 c = conn.cursor()
21 c.execute("UPDATE users SET password = ? where id = ?", (self.passwd, self.id,))
22 def check_password(self, passwd):
23 return check_password_hash(self.passwd, passwd)
24 @classmethod
25 def from_id(self, id):
26 with sqlite3.connect(cf['global']['database']) as conn:
27 c = conn.cursor()
28 c.execute("SELECT name,password,token,is_admin FROM users WHERE id = ?", (id,))
29 try:
30 name, passwd, token, admin = c.fetchone()
31 except: return None # todo: ugly
32 return User(id, name, passwd, token, admin)
33 @classmethod
34 def from_name(self, name):
35 with sqlite3.connect(cf['global']['database']) as conn:
36 c = conn.cursor()
37 c.execute("SELECT id,password,token,is_admin FROM users WHERE name=?", (name,))
38 try:
39 id, passwd, token, admin = c.fetchone()
40 except: return None # todo: ugly
41 return User(id, name, passwd, token, admin)
42 @classmethod
43 def from_token(self, login_token):
44 # Note: this function reads the revocable token, not the internal one!
45 with sqlite3.connect(cf['global']['database']) as conn:
46 c = conn.cursor()
47 c.execute("""
48 SELECT id, name, password, users.token, is_admin
49 FROM users JOIN user_tokens ON users.id = user_tokens.user_id
50 WHERE user_tokens.token = ?
51 """, (login_token,))
52 try:
53 id, name, passwd, token, admin = c.fetchone()
54 return User(id, name, passwd, token, admin)
55 except:
56 return None
57
58
59 def init_login(app):
60 login = LoginManager()
61 login.login_view = 'usermgmt.login_form'
62 login.init_app(app)
63
64 @login.user_loader
65 def load_user(id):
66 # in the future tokens will be invalidable by users. -> https://flask-login.readthedocs.io/en/latest/#alternative-tokens
67 return User.from_id(id)
68
69 @login.request_loader
70 def querytoken_auth(request):
71 if request.args.get('token'):
72 user = User.from_token(request.args.get('token'))
73 if user:
74 login_user(user)
75 return user
76 return None
77
78 usermgmt = Blueprint('usermgmt', __name__,
79 template_folder='templates',
80 static_folder='static',
81 static_url_path='/static/usermgmt')
82
83 @usermgmt.route('/login')
84 def login_form():
85 return render_template('login.html.j2')
86
87 @usermgmt.route('/login', methods=['POST'])
88 def do_login():
89 action = request.form.get('action')
90 if action == 'login':
91 user = User.from_name(request.form.get('user'))
92 if user and user.check_password(request.form.get('password')):
93 login_user(user, remember=request.form.get('remember'))
94 return redirect(request.args.get('next','/')) # xxx: non-exploitable open redirect!
95 flash('wrong username and/or password', 'error')
96 elif action == 'register':
97 flash("open registration currently closed. ask <i>girst</i> on irc://irc.libera.chat/#invidious if you want an account.", 'info')
98 elif action == 'logout':
99 logout_user()
100 return redirect(request.args.get('next','/')) # xxx: non-exploitable open redirect!
101 else:
102 flash('unsupported action', 'error')
103 return redirect(url_for('usermgmt.login_form'))
104
105 @usermgmt.route('/manage/account')
106 @login_required
107 def account_manager():
108 with sqlite3.connect(cf['global']['database']) as conn:
109 c = conn.cursor()
110 c.execute("""
111 SELECT token
112 FROM user_tokens
113 WHERE user_id = ?
114 """, (current_user.id,))
115 result = c.fetchone()
116 if result:
117 (login_token,) = result
118 else:
119 login_token = ""
120 return render_template('account_mgmt.html.j2', login_token=login_token, random_pwd=secrets.token_hex(16))
121
122 @usermgmt.route('/manage/account', methods=['POST'])
123 @login_required
124 def manage_account():
125 token = current_user.token
126 action = request.form.get('action')
127 if action == 'chpwd':
128 if not current_user.check_password(request.form.get('oldpasswd')):
129 flash('current password incorrect.', 'error')
130 else:
131 current_user.set_password(request.form.get('newpasswd'))
132 flash('password updated.', 'info')
133 elif action == 'chtok':
134 with sqlite3.connect(cf['global']['database']) as conn:
135 new_token = secrets.token_urlsafe(16)
136 c = conn.cursor()
137 c.execute("""
138 INSERT OR REPLACE INTO user_tokens (user_id, token)
139 VALUES (?, ?)
140 """, (current_user.id, new_token))
141 flash('new token generated.', 'info')
142 elif action == 'addusr':
143 if not current_user.admin:
144 return "only admins may do that!", 403
145 with sqlite3.connect(cf['global']['database']) as conn:
146 new_token = secrets.token_urlsafe(16)
147 username = request.form.get('user')
148 password = request.form.get('pass')
149 password = generate_password_hash(password)
150 is_admin = request.form.get('admin') == 'yes'
151 c = conn.cursor()
152 try:
153 c.execute("""
154 INSERT INTO users (name, password, is_admin, token)
155 VALUES (?, ?, ?, ?)
156 """, (username, password, is_admin, new_token));
157 flash('new user created.', 'info')
158 except sqlite3.DatabaseError as e:
159 flash('error creating user: {e}', 'error')
160 else:
161 flash('unsupported action', 'error')
162
163 return redirect(url_for('usermgmt.account_manager'))
164
165 # NOTE: only register blueprint _after_ adding routes!
166 app.register_blueprint(usermgmt)
Imprint / Impressum