git.gir.st - subscriptionfeed.git/blob - app/common/user.py
s/freenode/libera.chat/g
[subscriptionfeed.git] / app / common / user.py
1 from werkzeug.security import generate_password_hash, check_password_hash
2 from .common import cf
3 import sqlite3
4 import secrets
5 from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
6 from flask import Blueprint, flash, redirect, render_template, url_for, request
7
class User(UserMixin): # TODO: to common
    """A user account stored in the ``users`` table of the configured database.

    Instances carry the row's ``id``, ``name``, ``passwd`` (the value of the
    ``password`` column) and ``token`` (the value of ``users.token``).
    The ``from_*`` classmethods look a user up and return ``None`` when no
    matching row exists.
    """

    def __init__(self, id, name, passwd, token):
        """Store the given column values on the instance; performs no I/O."""
        self.id = id
        self.name = name
        self.passwd = passwd
        self.token = token

    def get_id(self):
        """Return the stored id as-is (overrides ``UserMixin.get_id``)."""
        return self.id

    def set_password(self, passwd):
        """Hash ``passwd``, keep the hash on the instance and persist it.

        The hash is written to ``users.password`` for the row whose id
        equals ``self.id``.
        """
        self.passwd = generate_password_hash(passwd)
        # The connection context manager commits the UPDATE on success.
        with sqlite3.connect(cf['global']['database']) as conn:
            c = conn.cursor()
            c.execute("UPDATE users SET password = ? where id = ?", (self.passwd, self.id,))

    def check_password(self, passwd):
        """Return whether ``passwd`` matches the stored password hash."""
        return check_password_hash(self.passwd, passwd)

    @classmethod
    def from_id(cls, id):
        """Load the user whose ``users.id`` equals ``id``, or return ``None``."""
        with sqlite3.connect(cf['global']['database']) as conn:
            c = conn.cursor()
            c.execute("SELECT name,password,token FROM users WHERE id = ?", (id,))
            row = c.fetchone()
        # fetchone() yields None when the query matched nothing; test for that
        # explicitly instead of hiding every possible error behind a bare except.
        if row is None:
            return None
        name, passwd, token = row
        return cls(id, name, passwd, token)

    @classmethod
    def from_name(cls, name):
        """Load the user whose ``users.name`` equals ``name``, or return ``None``."""
        with sqlite3.connect(cf['global']['database']) as conn:
            c = conn.cursor()
            c.execute("SELECT id,password,token FROM users WHERE name=?", (name,))
            row = c.fetchone()
        if row is None:
            return None
        id, passwd, token = row
        return cls(id, name, passwd, token)

    @classmethod
    def from_token(cls, login_token):
        """Load the user owning ``login_token``, or return ``None``.

        The lookup matches ``login_token`` against ``user_tokens.token``; the
        returned instance's ``token`` attribute is still ``users.token``.
        """
        # Note: this function reads the revocable token, not the internal one!
        with sqlite3.connect(cf['global']['database']) as conn:
            c = conn.cursor()
            c.execute("""
                SELECT id, name, password, users.token
                FROM users JOIN user_tokens ON users.id = user_tokens.user_id
                WHERE user_tokens.token = ?
            """, (login_token,))
            row = c.fetchone()
        if row is None:
            return None
        id, name, passwd, token = row
        return cls(id, name, passwd, token)
56
57
def init_login(app):
    """Wire login handling and the ``usermgmt`` blueprint into ``app``.

    Creates a ``LoginManager`` bound to ``app`` (with ``usermgmt.login_form``
    as its login view), installs a user loader and a ``?token=`` request
    loader, defines the ``/login`` and ``/manage/account`` routes on a new
    ``usermgmt`` blueprint, and finally registers that blueprint on ``app``.
    """
    login = LoginManager()
    login.login_view = 'usermgmt.login_form'
    login.init_app(app)

    @login.user_loader
    def load_user(id):
        """Resolve an id handed over by the login manager to a ``User`` (or ``None``)."""
        # in the future tokens will be invalidable by users. -> https://flask-login.readthedocs.io/en/latest/#alternative-tokens
        return User.from_id(id)

    @login.request_loader
    def querytoken_auth(request):
        """Authenticate via the ``token`` query parameter, if one is present.

        On a match the user is also passed to ``login_user`` before being
        returned; otherwise ``None`` is returned.
        """
        if request.args.get('token'):
            user = User.from_token(request.args.get('token'))
            if user:
                login_user(user)
                return user
        return None

    usermgmt = Blueprint('usermgmt', __name__,
        template_folder='templates',
        static_folder='static',
        static_url_path='/static/usermgmt')

    @usermgmt.route('/login')
    def login_form():
        """Render the login page."""
        return render_template('login.html.j2')

    @usermgmt.route('/login', methods=['POST'])
    def do_login():
        """Dispatch on the posted ``action``: ``login``, ``register`` or ``logout``.

        A successful login and a logout redirect to the ``next`` query
        parameter (default ``/``); every other outcome flashes a message and
        redirects back to the login form.
        """
        action = request.form.get('action')
        if action == 'login':
            user = User.from_name(request.form.get('user'))
            if user and user.check_password(request.form.get('password')):
                login_user(user, remember=request.form.get('remember'))
                return redirect(request.args.get('next','/')) # xxx: non-exploitable open redirect!
            flash('wrong username and/or password', 'error')
        elif action == 'register':
            flash("open registration currently closed. ask <i>girst</i> on irc://irc.libera.chat/#invidious if you want an account.", 'info')
        elif action == 'logout':
            logout_user()
            return redirect(request.args.get('next','/')) # xxx: non-exploitable open redirect!
        else:
            flash('unsupported action', 'error')
        return redirect(url_for('usermgmt.login_form'))

    @usermgmt.route('/manage/account')
    @login_required
    def account_manager():
        """Render the account page, passing the user's login token to the template.

        The token is the first ``user_tokens.token`` row found for the current
        user, or an empty string when there is none.
        """
        with sqlite3.connect(cf['global']['database']) as conn:
            c = conn.cursor()
            c.execute("""
                SELECT token
                FROM user_tokens
                WHERE user_id = ?
            """, (current_user.id,))
            result = c.fetchone()
            if result:
                (login_token,) = result
            else:
                login_token = ""
        return render_template('account_mgmt.html.j2', login_token=login_token)

    @usermgmt.route('/manage/account', methods=['POST'])
    @login_required
    def manage_account():
        """Dispatch on the posted ``action``: ``chpwd`` or ``chtok``.

        ``chpwd`` changes the password after verifying ``oldpasswd``;
        ``chtok`` stores a freshly generated login token for the current
        user. Always redirects back to the account page.
        """
        action = request.form.get('action')
        if action == 'chpwd':
            if not current_user.check_password(request.form.get('oldpasswd')):
                flash('current password incorrect.', 'error')
            else:
                current_user.set_password(request.form.get('newpasswd'))
                flash('password updated.', 'info')
        # elif, not if: a handled 'chpwd' must not fall through to the
        # 'unsupported action' branch below.
        elif action == 'chtok':
            with sqlite3.connect(cf['global']['database']) as conn:
                new_token = secrets.token_urlsafe(16)
                c = conn.cursor()
                c.execute("""
                    INSERT OR REPLACE INTO user_tokens (user_id, token)
                    VALUES (?, ?)
                """, (current_user.id, new_token))
            flash('new token generated.', 'info')
        else:
            flash('unsupported action', 'error')

        return redirect(url_for('usermgmt.account_manager'))

    # NOTE: only register blueprint _after_ adding routes!
    app.register_blueprint(usermgmt)
Imprint / Impressum