]> git.gir.st - subscriptionfeed.git/blob - app/common/user.py
reddit: textarea hack does't escape quotes
[subscriptionfeed.git] / app / common / user.py
1 from werkzeug.security import generate_password_hash, check_password_hash
2 from .common import cf
3 import sqlite3
4 import secrets
5 import json
6 from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
7 from flask import Blueprint, flash, redirect, render_template, url_for, request
8
9 class User(UserMixin): # TODO: to common
10 def __init__(self, id, name, passwd, token, is_admin):
11 self.id = id
12 self.name = name
13 self.passwd = passwd
14 self.token = token
15 self.admin = is_admin
16 def get_id(self):
17 return self.id
18 def set_password(self, passwd):
19 self.passwd = generate_password_hash(passwd)
20 with sqlite3.connect(cf['global']['database']) as conn:
21 c = conn.cursor()
22 c.execute("UPDATE users SET password = ? where id = ?", (self.passwd, self.id,))
23 def check_password(self, passwd):
24 return check_password_hash(self.passwd, passwd)
25 def get_settings(self):
26 settings = {} # fallback for guest user
27 if self.is_authenticated:
28 with sqlite3.connect(cf['global']['database']) as conn:
29 c = conn.cursor()
30 c.execute("""
31 SELECT setting, value
32 FROM user_settings
33 WHERE user_id = ?
34 """, (self.id,))
35 settings = {
36 setting: json.loads(value)
37 for setting, value in c.fetchall()
38 }
39 return settings
40 @classmethod
41 def from_id(self, id):
42 with sqlite3.connect(cf['global']['database']) as conn:
43 c = conn.cursor()
44 c.execute("SELECT name,password,token,is_admin FROM users WHERE id = ?", (id,))
45 try:
46 name, passwd, token, admin = c.fetchone()
47 except: return None # todo: ugly
48 return User(id, name, passwd, token, admin)
49 @classmethod
50 def from_name(self, name):
51 with sqlite3.connect(cf['global']['database']) as conn:
52 c = conn.cursor()
53 c.execute("SELECT id,password,token,is_admin FROM users WHERE name=?", (name,))
54 try:
55 id, passwd, token, admin = c.fetchone()
56 except: return None # todo: ugly
57 return User(id, name, passwd, token, admin)
58 @classmethod
59 def from_token(self, login_token):
60 # Note: this function reads the revocable token, not the internal one!
61 with sqlite3.connect(cf['global']['database']) as conn:
62 c = conn.cursor()
63 c.execute("""
64 SELECT id, name, password, users.token, is_admin
65 FROM users JOIN user_tokens ON users.id = user_tokens.user_id
66 WHERE user_tokens.token = ?
67 """, (login_token,))
68 try:
69 id, name, passwd, token, admin = c.fetchone()
70 return User(id, name, passwd, token, admin)
71 except:
72 return None
73
74
75 def init_login(app):
76 login = LoginManager()
77 login.login_view = 'usermgmt.login_form'
78 login.init_app(app)
79
80 @login.user_loader
81 def load_user(id):
82 # in the future tokens will be invalidable by users. -> https://flask-login.readthedocs.io/en/latest/#alternative-tokens
83 return User.from_id(id)
84
85 @login.request_loader
86 def querytoken_auth(request):
87 if request.args.get('token'):
88 user = User.from_token(request.args.get('token'))
89 if user:
90 login_user(user)
91 return user
92 return None
93
94 usermgmt = Blueprint('usermgmt', __name__,
95 template_folder='templates',
96 static_folder='static',
97 static_url_path='/static/usermgmt')
98
99 @usermgmt.route('/login')
100 def login_form():
101 return render_template('login.html.j2')
102
103 @usermgmt.route('/login', methods=['POST'])
104 def do_login():
105 action = request.form.get('action')
106 nexturl = request.args.get('next','/')
107 if not nexturl.startswith('/'):
108 nexturl = '/'
109 if action == 'login':
110 user = User.from_name(request.form.get('user'))
111 if user and user.check_password(request.form.get('password')):
112 login_user(user, remember=request.form.get('remember'))
113 return redirect(nexturl)
114 flash('wrong username and/or password', 'error')
115 elif action == 'register':
116 flash("open registration currently closed. ask <i>girst</i> on irc://irc.libera.chat/#invidious if you want an account.", 'info')
117 elif action == 'logout':
118 logout_user()
119 return redirect(nexturl)
120 else:
121 flash('unsupported action', 'error')
122 return redirect(url_for('usermgmt.login_form'))
123
124 @usermgmt.route('/manage/account')
125 @login_required
126 def account_manager():
127 with sqlite3.connect(cf['global']['database']) as conn:
128 c = conn.cursor()
129 c.execute("""
130 SELECT setting, value
131 FROM user_settings
132 WHERE user_id = ?
133 """, (current_user.id,))
134 result = c.fetchall()
135 settings = {
136 setting: json.loads(value)
137 for setting, value in result
138 }
139 c.execute("""
140 SELECT token
141 FROM user_tokens
142 WHERE user_id = ?
143 """, (current_user.id,))
144 result = c.fetchone()
145 if result:
146 (login_token,) = result
147 else:
148 login_token = ""
149 return render_template('account_mgmt.html.j2', settings=settings, login_token=login_token, random_pwd=secrets.token_hex(16))
150
151 @usermgmt.route('/manage/account', methods=['POST'])
152 @login_required
153 def manage_account():
154 token = current_user.token
155 action = request.form.get('action')
156 if action == 'chpwd':
157 if not current_user.check_password(request.form.get('oldpasswd')):
158 flash('current password incorrect.', 'error')
159 else:
160 current_user.set_password(request.form.get('newpasswd'))
161 flash('password updated.', 'info')
162 elif action == 'chtok':
163 with sqlite3.connect(cf['global']['database']) as conn:
164 new_token = secrets.token_urlsafe(16)
165 c = conn.cursor()
166 c.execute("""
167 INSERT OR REPLACE INTO user_tokens (user_id, token)
168 VALUES (?, ?)
169 """, (current_user.id, new_token))
170 flash('new token generated.', 'info')
171 elif action == 'chset':
172 with sqlite3.connect(cf['global']['database']) as conn:
173 noshorts = request.form.get('noshorts') == 'yes'
174 c = conn.cursor()
175 c.execute("""
176 INSERT OR REPLACE INTO user_settings (user_id, setting, value)
177 VALUES (?, ?, ?)
178 """, (current_user.id, "noshorts", json.dumps(noshorts)))
179 flash('settings saved.', 'info')
180 elif action == 'addusr':
181 if not current_user.admin:
182 return "only admins may do that!", 403
183 with sqlite3.connect(cf['global']['database']) as conn:
184 new_token = secrets.token_urlsafe(16)
185 username = request.form.get('user')
186 password = request.form.get('pass')
187 password = generate_password_hash(password)
188 is_admin = request.form.get('admin') == 'yes'
189 c = conn.cursor()
190 try:
191 c.execute("""
192 INSERT INTO users (name, password, is_admin, token)
193 VALUES (?, ?, ?, ?)
194 """, (username, password, is_admin, new_token));
195 flash('new user created.', 'info')
196 except sqlite3.DatabaseError as e:
197 flash('error creating user: {e}', 'error')
198 else:
199 flash('unsupported action', 'error')
200
201 return redirect(url_for('usermgmt.account_manager'))
202
203 # NOTE: only register blueprint _after_ adding routes!
204 app.register_blueprint(usermgmt)
Imprint / Impressum