]> git.gir.st - subscriptionfeed.git/blob - app/common/user.py
[DATABASE CHANGE: Migration below] allow setting user settings from profile page
[subscriptionfeed.git] / app / common / user.py
1 from werkzeug.security import generate_password_hash, check_password_hash
2 from .common import cf
3 import sqlite3
4 import secrets
5 import json
6 from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
7 from flask import Blueprint, flash, redirect, render_template, url_for, request
8
class User(UserMixin): # TODO: to common
    """An account row from the ``users`` table, usable as a Flask-Login user.

    Attributes:
        id: value of ``users.id``.
        name: value of ``users.name``.
        passwd: password hash as stored in ``users.password``.
        token: value of ``users.token`` (the internal token; the revocable
            login token lives in ``user_tokens`` and is not kept here).
        admin: value of ``users.is_admin``.
    """

    def __init__(self, id, name, passwd, token, is_admin):
        self.id = id
        self.name = name
        self.passwd = passwd
        self.token = token
        self.admin = is_admin

    @staticmethod
    def _run(query, params):
        """Execute ``query`` in its own transaction and return the first row.

        Returns ``None`` when the statement yields no row (always the case
        for an UPDATE). The connection is closed explicitly because
        ``with conn:`` only commits or rolls back; it never closes.
        """
        conn = sqlite3.connect(cf['global']['database'])
        try:
            with conn:
                return conn.execute(query, params).fetchone()
        finally:
            conn.close()

    def get_id(self):
        """Return the identifier Flask-Login stores in the session."""
        return self.id

    def set_password(self, passwd):
        """Hash ``passwd``, keep the hash on this object and persist it."""
        self.passwd = generate_password_hash(passwd)
        self._run("UPDATE users SET password = ? where id = ?", (self.passwd, self.id,))

    def check_password(self, passwd):
        """Return whether ``passwd`` matches the stored password hash."""
        return check_password_hash(self.passwd, passwd)

    @classmethod
    def from_id(cls, id):
        """Load the user with primary key ``id``; ``None`` if there is none."""
        row = cls._run("SELECT name,password,token,is_admin FROM users WHERE id = ?", (id,))
        if row is None:
            return None
        name, passwd, token, admin = row
        return cls(id, name, passwd, token, admin)

    @classmethod
    def from_name(cls, name):
        """Load the user called ``name``; ``None`` if there is none."""
        row = cls._run("SELECT id,password,token,is_admin FROM users WHERE name=?", (name,))
        if row is None:
            return None
        id, passwd, token, admin = row
        return cls(id, name, passwd, token, admin)

    @classmethod
    def from_token(cls, login_token):
        """Load the user owning ``login_token``; ``None`` if it is unknown.

        Note: this function reads the revocable token, not the internal one!
        """
        row = cls._run("""
            SELECT id, name, password, users.token, is_admin
            FROM users JOIN user_tokens ON users.id = user_tokens.user_id
            WHERE user_tokens.token = ?
        """, (login_token,))
        if row is None:
            return None
        id, name, passwd, token, admin = row
        return cls(id, name, passwd, token, admin)
58
59
def init_login(app):
    """Attach Flask-Login and the ``usermgmt`` blueprint to ``app``.

    Registers the session and query-token user loaders, then the login and
    account-management routes. Returns nothing; the blueprint is registered
    on ``app`` as the last step.
    """
    # Function-scope import so this block brings its own dependency along.
    from contextlib import closing

    def open_db():
        # ``with conn:`` only commits/rolls back; closing() also releases
        # the connection. Use as: ``with open_db() as conn, conn:``.
        return closing(sqlite3.connect(cf['global']['database']))

    login = LoginManager()
    login.login_view = 'usermgmt.login_form'
    login.init_app(app)

    @login.user_loader
    def load_user(id):
        # in the future tokens will be invalidable by users. -> https://flask-login.readthedocs.io/en/latest/#alternative-tokens
        return User.from_id(id)

    @login.request_loader
    def querytoken_auth(request):
        """Authenticate via the ``token`` query parameter, if present.

        On a match the user is also logged in with ``login_user``.
        """
        if request.args.get('token'):
            user = User.from_token(request.args.get('token'))
            if user:
                login_user(user)
                return user
        return None

    usermgmt = Blueprint('usermgmt', __name__,
        template_folder='templates',
        static_folder='static',
        static_url_path='/static/usermgmt')

    @usermgmt.route('/login')
    def login_form():
        """Render the login page."""
        return render_template('login.html.j2')

    @usermgmt.route('/login', methods=['POST'])
    def do_login():
        """Dispatch the login form's ``action``: login, register or logout."""
        action = request.form.get('action')
        if action == 'login':
            user = User.from_name(request.form.get('user'))
            if user and user.check_password(request.form.get('password')):
                login_user(user, remember=request.form.get('remember'))
                return redirect(request.args.get('next','/')) # xxx: non-exploitable open redirect!
            flash('wrong username and/or password', 'error')
        elif action == 'register':
            flash("open registration currently closed. ask <i>girst</i> on irc://irc.libera.chat/#invidious if you want an account.", 'info')
        elif action == 'logout':
            logout_user()
            return redirect(request.args.get('next','/')) # xxx: non-exploitable open redirect!
        else:
            flash('unsupported action', 'error')
        return redirect(url_for('usermgmt.login_form'))

    @usermgmt.route('/manage/account')
    @login_required
    def account_manager():
        """Render the account page with the user's settings and login token."""
        with open_db() as conn, conn:
            c = conn.cursor()
            c.execute("""
                SELECT setting, value
                FROM user_settings
                WHERE user_id = ?
            """, (current_user.id,))
            # Setting values are stored JSON-encoded (see 'chset' below).
            settings = {
                setting: json.loads(value)
                for setting, value in c.fetchall()
            }
            c.execute("""
                SELECT token
                FROM user_tokens
                WHERE user_id = ?
            """, (current_user.id,))
            result = c.fetchone()
            # No row means no login token has been generated yet.
            login_token = result[0] if result else ""
        return render_template('account_mgmt.html.j2', settings=settings, login_token=login_token, random_pwd=secrets.token_hex(16))

    @usermgmt.route('/manage/account', methods=['POST'])
    @login_required
    def manage_account():
        """Apply one account action and redirect back to the account page.

        Supported ``action`` values: ``chpwd`` (change password), ``chtok``
        (generate a new login token), ``chset`` (save settings) and
        ``addusr`` (create a user; answers 403 for non-admins).
        """
        action = request.form.get('action')
        if action == 'chpwd':
            if not current_user.check_password(request.form.get('oldpasswd')):
                flash('current password incorrect.', 'error')
            else:
                current_user.set_password(request.form.get('newpasswd'))
                flash('password updated.', 'info')
        elif action == 'chtok':
            new_token = secrets.token_urlsafe(16)
            with open_db() as conn, conn:
                conn.execute("""
                    INSERT OR REPLACE INTO user_tokens (user_id, token)
                    VALUES (?, ?)
                """, (current_user.id, new_token))
            flash('new token generated.', 'info')
        elif action == 'chset':
            noshorts = request.form.get('noshorts') == 'yes'
            with open_db() as conn, conn:
                conn.execute("""
                    INSERT OR REPLACE INTO user_settings (user_id, setting, value)
                    VALUES (?, ?, ?)
                """, (current_user.id, "noshorts", json.dumps(noshorts)))
            flash('settings saved.', 'info')
        elif action == 'addusr':
            if not current_user.admin:
                return "only admins may do that!", 403
            new_token = secrets.token_urlsafe(16)
            username = request.form.get('user')
            password = generate_password_hash(request.form.get('pass'))
            is_admin = request.form.get('admin') == 'yes'
            with open_db() as conn, conn:
                try:
                    conn.execute("""
                        INSERT INTO users (name, password, is_admin, token)
                        VALUES (?, ?, ?, ?)
                    """, (username, password, is_admin, new_token))
                    flash('new user created.', 'info')
                except sqlite3.DatabaseError as e:
                    # f-string so the actual database error is shown,
                    # not the literal text "{e}".
                    flash(f'error creating user: {e}', 'error')
        else:
            flash('unsupported action', 'error')

        return redirect(url_for('usermgmt.account_manager'))

    # NOTE: only register blueprint _after_ adding routes!
    app.register_blueprint(usermgmt)
Imprint / Impressum