git.gir.st - subscriptionfeed.git/blob - app/common/user.py
support (hiding) youtube shorts everywhere
[subscriptionfeed.git] / app / common / user.py
1 from werkzeug.security import generate_password_hash, check_password_hash
2 from .common import cf
3 import sqlite3
4 import secrets
5 import json
6 from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
7 from flask import Blueprint, flash, redirect, render_template, url_for, request
8
class User(UserMixin):  # TODO: to common
    """An account row from the ``users`` table, usable as a Flask-Login user.

    Instances are normally obtained through one of the alternate
    constructors (`from_id`, `from_name`, `from_token`), each of which
    returns ``None`` when no matching row exists.
    """

    def __init__(self, id, name, passwd, token, is_admin):
        self.id = id
        self.name = name
        # Password *hash* (see set_password / check_password), never plaintext.
        self.passwd = passwd
        # Value of users.token, i.e. the internal token -- not the revocable
        # login token stored in user_tokens (see from_token).
        self.token = token
        self.admin = is_admin

    def get_id(self):
        """Return the identifier Flask-Login stores for this user."""
        return self.id

    def set_password(self, passwd):
        """Hash *passwd* and store the hash on the instance and in the database."""
        self.passwd = generate_password_hash(passwd)
        with sqlite3.connect(cf['global']['database']) as conn:
            c = conn.cursor()
            c.execute("UPDATE users SET password = ? where id = ?", (self.passwd, self.id,))

    def check_password(self, passwd):
        """Return whether *passwd* matches the stored password hash."""
        return check_password_hash(self.passwd, passwd)

    def get_settings(self):
        """Return this user's settings as a ``{setting: decoded_value}`` dict.

        Values are stored JSON-encoded in ``user_settings`` and are decoded
        here. An empty dict is returned when the user is not authenticated.
        """
        settings = {}  # fallback for guest user
        if self.is_authenticated:
            with sqlite3.connect(cf['global']['database']) as conn:
                c = conn.cursor()
                c.execute("""
                    SELECT setting, value
                      FROM user_settings
                     WHERE user_id = ?
                """, (self.id,))
                settings = {
                    setting: json.loads(value)
                    for setting, value in c.fetchall()
                }
        return settings

    @classmethod
    def from_id(cls, id):
        """Load the user with primary key *id*; return ``None`` if there is none."""
        with sqlite3.connect(cf['global']['database']) as conn:
            c = conn.cursor()
            c.execute("SELECT name,password,token,is_admin FROM users WHERE id = ?", (id,))
            row = c.fetchone()
        # fetchone() yields None when the query matched nothing; checking for
        # that explicitly avoids hiding unrelated errors behind a bare except.
        if row is None:
            return None
        name, passwd, token, admin = row
        return cls(id, name, passwd, token, admin)

    @classmethod
    def from_name(cls, name):
        """Load the user called *name*; return ``None`` if there is none."""
        with sqlite3.connect(cf['global']['database']) as conn:
            c = conn.cursor()
            c.execute("SELECT id,password,token,is_admin FROM users WHERE name=?", (name,))
            row = c.fetchone()
        if row is None:
            return None
        id, passwd, token, admin = row
        return cls(id, name, passwd, token, admin)

    @classmethod
    def from_token(cls, login_token):
        """Load the user owning *login_token*; return ``None`` if there is none."""
        # Note: this function reads the revocable token, not the internal one!
        with sqlite3.connect(cf['global']['database']) as conn:
            c = conn.cursor()
            c.execute("""
                SELECT id, name, password, users.token, is_admin
                  FROM users JOIN user_tokens ON users.id = user_tokens.user_id
                 WHERE user_tokens.token = ?
            """, (login_token,))
            row = c.fetchone()
        if row is None:
            return None
        id, name, passwd, token, admin = row
        return cls(id, name, passwd, token, admin)
73
74
def init_login(app):
    """Set up Flask-Login on *app* and register the ``usermgmt`` blueprint.

    Installs a session user loader and a ``?token=`` request loader, then
    defines the login and account-management routes on a blueprint which is
    registered on *app* as the very last step.
    """
    login = LoginManager()
    login.login_view = 'usermgmt.login_form'
    login.init_app(app)

    @login.user_loader
    def load_user(id):
        # in the future tokens will be invalidable by users. -> https://flask-login.readthedocs.io/en/latest/#alternative-tokens
        return User.from_id(id)

    @login.request_loader
    def querytoken_auth(request):
        """Authenticate via the revocable login token in the ``token`` query arg."""
        if request.args.get('token'):
            user = User.from_token(request.args.get('token'))
            if user:
                login_user(user)
            return user
        return None

    usermgmt = Blueprint('usermgmt', __name__,
        template_folder='templates',
        static_folder='static',
        static_url_path='/static/usermgmt')

    @usermgmt.route('/login')
    def login_form():
        """Render the login page."""
        return render_template('login.html.j2')

    @usermgmt.route('/login', methods=['POST'])
    def do_login():
        """Handle the login form; the ``action`` field selects login/register/logout."""
        action = request.form.get('action')
        if action == 'login':
            user = User.from_name(request.form.get('user'))
            if user and user.check_password(request.form.get('password')):
                login_user(user, remember=request.form.get('remember'))
                return redirect(request.args.get('next','/')) # xxx: non-exploitable open redirect!
            flash('wrong username and/or password', 'error')
        elif action == 'register':
            flash("open registration currently closed. ask <i>girst</i> on irc://irc.libera.chat/#invidious if you want an account.", 'info')
        elif action == 'logout':
            logout_user()
            return redirect(request.args.get('next','/')) # xxx: non-exploitable open redirect!
        else:
            flash('unsupported action', 'error')
        return redirect(url_for('usermgmt.login_form'))

    @usermgmt.route('/manage/account')
    @login_required
    def account_manager():
        """Render the account page with the user's settings and login token."""
        with sqlite3.connect(cf['global']['database']) as conn:
            c = conn.cursor()
            c.execute("""
                SELECT setting, value
                  FROM user_settings
                 WHERE user_id = ?
            """, (current_user.id,))
            result = c.fetchall()
            settings = {
                setting: json.loads(value)
                for setting, value in result
            }
            c.execute("""
                SELECT token
                  FROM user_tokens
                 WHERE user_id = ?
            """, (current_user.id,))
            result = c.fetchone()
            if result:
                (login_token,) = result
            else:
                # no login token has been generated for this user
                login_token = ""
        return render_template('account_mgmt.html.j2', settings=settings, login_token=login_token, random_pwd=secrets.token_hex(16))

    @usermgmt.route('/manage/account', methods=['POST'])
    @login_required
    def manage_account():
        """Apply an account change; the ``action`` field selects which one.

        Supported actions: ``chpwd`` (change password), ``chtok`` (generate a
        new login token), ``chset`` (save settings) and ``addusr`` (create a
        user, admins only -- answers 403 otherwise).
        """
        action = request.form.get('action')
        if action == 'chpwd':
            if not current_user.check_password(request.form.get('oldpasswd')):
                flash('current password incorrect.', 'error')
            else:
                current_user.set_password(request.form.get('newpasswd'))
                flash('password updated.', 'info')
        elif action == 'chtok':
            with sqlite3.connect(cf['global']['database']) as conn:
                new_token = secrets.token_urlsafe(16)
                c = conn.cursor()
                c.execute("""
                    INSERT OR REPLACE INTO user_tokens (user_id, token)
                    VALUES (?, ?)
                """, (current_user.id, new_token))
                flash('new token generated.', 'info')
        elif action == 'chset':
            with sqlite3.connect(cf['global']['database']) as conn:
                noshorts = request.form.get('noshorts') == 'yes'
                c = conn.cursor()
                c.execute("""
                    INSERT OR REPLACE INTO user_settings (user_id, setting, value)
                    VALUES (?, ?, ?)
                """, (current_user.id, "noshorts", json.dumps(noshorts)))
                flash('settings saved.', 'info')
        elif action == 'addusr':
            if not current_user.admin:
                return "only admins may do that!", 403
            with sqlite3.connect(cf['global']['database']) as conn:
                new_token = secrets.token_urlsafe(16)
                username = request.form.get('user')
                password = request.form.get('pass')
                password = generate_password_hash(password)
                is_admin = request.form.get('admin') == 'yes'
                c = conn.cursor()
                try:
                    c.execute("""
                        INSERT INTO users (name, password, is_admin, token)
                        VALUES (?, ?, ?, ?)
                    """, (username, password, is_admin, new_token))
                    flash('new user created.', 'info')
                except sqlite3.DatabaseError as e:
                    # f-string so the actual database error is shown, rather
                    # than the literal placeholder text "{e}".
                    flash(f'error creating user: {e}', 'error')
        else:
            flash('unsupported action', 'error')

        return redirect(url_for('usermgmt.account_manager'))

    # NOTE: only register blueprint _after_ adding routes!
    app.register_blueprint(usermgmt)
Imprint / Impressum