# Source: git.gir.st - subscriptionfeed.git/blob - app/common/user.py
import secrets
import sqlite3
from contextlib import closing

from flask import (
    Blueprint,
    flash,
    redirect,
    render_template,
    url_for,
    request,
)
from flask_login import (
    LoginManager,
    UserMixin,
    login_user,
    logout_user,
    login_required,
    current_user,
)
from werkzeug.security import generate_password_hash, check_password_hash
class User(UserMixin):  # TODO: to common
    """A row of the ``users`` table, usable as a Flask-Login user object.

    Instances are normally obtained through the alternate constructors
    ``from_id``, ``from_name`` and ``from_token``, each of which returns
    ``None`` when no matching row exists.
    """

    def __init__(self, id, name, passwd, token, is_admin):
        """Store the column values of one ``users`` row on the instance."""
        # NOTE(review): the constructor body is not visible in this extract of
        # the file. The attribute names are reconstructed from how the rest of
        # the file reads them (``self.id``, ``self.passwd``,
        # ``current_user.token``, ``current_user.admin``); ``self.name`` is an
        # assumption -- confirm against the original.
        self.id = id
        self.name = name
        self.passwd = passwd  # password hash, not the clear-text password
        self.token = token
        self.admin = is_admin

    @staticmethod
    def _fetch_row(query, params):
        """Run *query* with *params* and return its first row, or None."""
        # closing() is needed because a sqlite3 connection used as a context
        # manager only manages the transaction; it does not close itself.
        with closing(sqlite3.connect(cf['global']['database'])) as conn:
            return conn.execute(query, params).fetchone()

    def set_password(self, passwd):
        """Hash *passwd*, keep the hash on this object and persist it."""
        self.passwd = generate_password_hash(passwd)
        # The inner ``conn`` context commits on success / rolls back on error.
        with closing(sqlite3.connect(cf['global']['database'])) as conn, conn:
            conn.execute(
                "UPDATE users SET password = ? where id = ?",
                (self.passwd, self.id),
            )

    def check_password(self, passwd):
        """Return whether *passwd* matches the stored password hash."""
        return check_password_hash(self.passwd, passwd)

    @classmethod
    def from_id(cls, id):
        """Return the user with primary key *id*, or None if there is none."""
        row = cls._fetch_row(
            "SELECT name,password,token,is_admin FROM users WHERE id = ?",
            (id,),
        )
        if row is None:
            return None
        name, passwd, token, admin = row
        return cls(id, name, passwd, token, admin)

    @classmethod
    def from_name(cls, name):
        """Return the user called *name*, or None if there is none."""
        row = cls._fetch_row(
            "SELECT id,password,token,is_admin FROM users WHERE name=?",
            (name,),
        )
        if row is None:
            return None
        id, passwd, token, admin = row
        return cls(id, name, passwd, token, admin)

    @classmethod
    def from_token(cls, login_token):
        """Return the user owning the login token *login_token*, or None."""
        # Note: this function reads the revocable token, not the internal one!
        row = cls._fetch_row(
            """
            SELECT id, name, password, users.token, is_admin
            FROM users JOIN user_tokens ON users.id = user_tokens.user_id
            WHERE user_tokens.token = ?
            """,
            (login_token,),
        )
        # NOTE(review): the no-match handling of this method is not visible in
        # this extract; returning None mirrors from_id()/from_name() -- confirm.
        if row is None:
            return None
        id, name, passwd, token, admin = row
        return cls(id, name, passwd, token, admin)
# Flask-Login manager for this application. ``login_view`` names the endpoint
# Flask-Login redirects to when a login is required.
login = LoginManager()
login.login_view = 'usermgmt.login_form'
    # In the future, tokens will be revocable by users. -> https://flask-login.readthedocs.io/en/latest/#alternative-tokens
67 return User
.from_id(id)
70 def querytoken_auth(request
):
71 if request
.args
.get('token'):
72 user
= User
.from_token(request
.args
.get('token'))
# Blueprint that groups the user-management routes together with their
# templates and static files.
usermgmt = Blueprint(
    'usermgmt',
    __name__,
    template_folder='templates',
    static_folder='static',
    static_url_path='/static/usermgmt',
)
@usermgmt.route('/login')
def login_form():
    """Render the login page.

    The endpoint name ``usermgmt.login_form`` is what ``login.login_view`` and
    the ``url_for('usermgmt.login_form')`` redirect elsewhere in this file
    refer to, so the function name must not change.
    """
    return render_template('login.html.j2')
87 @usermgmt.route('/login', methods
=['POST'])
89 action
= request
.form
.get('action')
91 user
= User
.from_name(request
.form
.get('user'))
92 if user
and user
.check_password(request
.form
.get('password')):
93 login_user(user
, remember
=request
.form
.get('remember'))
94 return redirect(request
.args
.get('next','/')) # xxx: non-exploitable open redirect!
95 flash('wrong username and/or password', 'error')
96 elif action
== 'register':
97 flash("open registration currently closed. ask <i>girst</i> on irc://irc.libera.chat/#invidious if you want an account.", 'info')
98 elif action
== 'logout':
100 return redirect(request
.args
.get('next','/')) # xxx: non-exploitable open redirect!
102 flash('unsupported action', 'error')
103 return redirect(url_for('usermgmt.login_form'))
105 @usermgmt.route('/manage/account')
107 def account_manager():
108 with sqlite3
.connect(cf
['global']['database']) as conn
:
114 """, (current_user
.id,))
115 result
= c
.fetchone()
117 (login_token
,) = result
120 return render_template('account_mgmt.html.j2', login_token
=login_token
, random_pwd
=secrets
.token_hex(16))
@usermgmt.route('/manage/account', methods=['POST'])
@login_required  # every branch below dereferences current_user
def manage_account():
    """Handle the account-management form and redirect back to the account page.

    The form field ``action`` selects what happens:

    * ``chpwd``  -- change the password, after verifying ``oldpasswd``.
    * ``chtok``  -- replace the user's login token with a freshly generated one.
    * ``addusr`` -- create a new user (admins only; anyone else gets a 403).

    The outcome is reported through ``flash()``. Every path except the 403
    ends in a redirect to ``usermgmt.account_manager``.
    """
    action = request.form.get('action')

    if action == 'chpwd':
        if not current_user.check_password(request.form.get('oldpasswd')):
            flash('current password incorrect.', 'error')
        else:
            current_user.set_password(request.form.get('newpasswd'))
            flash('password updated.', 'info')

    elif action == 'chtok':
        new_token = secrets.token_urlsafe(16)
        # closing() closes the connection; the inner ``conn`` context commits
        # on success and rolls back on error.
        with closing(sqlite3.connect(cf['global']['database'])) as conn, conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO user_tokens (user_id, token)
                VALUES (?, ?)
                """,
                (current_user.id, new_token),
            )
        flash('new token generated.', 'info')

    elif action == 'addusr':
        if not current_user.admin:
            return "only admins may do that!", 403
        username = request.form.get('user')
        password = request.form.get('pass')
        if not username or not password:
            # A missing field would otherwise reach generate_password_hash()
            # as None and surface as a server error.
            flash('username and password are required.', 'error')
        else:
            new_token = secrets.token_urlsafe(16)
            password_hash = generate_password_hash(password)
            is_admin = request.form.get('admin') == 'yes'
            try:
                with closing(sqlite3.connect(cf['global']['database'])) as conn, conn:
                    conn.execute(
                        """
                        INSERT INTO users (name, password, is_admin, token)
                        VALUES (?, ?, ?, ?)
                        """,
                        (username, password_hash, is_admin, new_token),
                    )
                flash('new user created.', 'info')
            except sqlite3.DatabaseError as e:
                # f-string so the actual error text is shown, not a literal "{e}".
                flash(f'error creating user: {e}', 'error')

    else:
        flash('unsupported action', 'error')

    return redirect(url_for('usermgmt.account_manager'))
# NOTE: only register blueprint _after_ adding routes!
# (routes added to a blueprint after registration are not picked up by the app)
app.register_blueprint(usermgmt)