1
0
forked from ScoDoc/ScoDoc
ScoDoc/app/auth/models.py

548 lines
19 KiB
Python
Raw Normal View History

2021-05-29 18:22:51 +02:00
# -*- coding: UTF-8 -*
"""Users and Roles models for ScoDoc
"""
import base64
from datetime import datetime, timedelta
import os
2021-07-03 16:19:42 +02:00
import re
2021-05-29 18:22:51 +02:00
from time import time
2021-08-11 00:36:07 +02:00
from typing import Optional
2021-05-29 18:22:51 +02:00
import cracklib # pylint: disable=import-error
2022-01-03 12:33:27 +01:00
from flask import current_app, g
2021-05-29 18:22:51 +02:00
from flask_login import UserMixin, AnonymousUserMixin
2021-07-04 12:32:13 +02:00
2021-05-29 18:22:51 +02:00
from werkzeug.security import generate_password_hash, check_password_hash
import jwt
2022-05-03 13:35:17 +02:00
from app import db, log, login
from app.models import Departement
from app.models import SHORT_STR_LEN
2021-07-03 16:19:42 +02:00
from app.scodoc.sco_exceptions import ScoValueError
2021-05-29 18:22:51 +02:00
from app.scodoc.sco_permissions import Permission
from app.scodoc.sco_roles_default import SCO_ROLES_DEFAULTS
2021-06-28 10:45:00 +02:00
import app.scodoc.sco_utils as scu
from app.scodoc import sco_etud # a deplacer dans scu
2021-05-29 18:22:51 +02:00
2021-09-25 22:42:44 +02:00
VALID_LOGIN_EXP = re.compile(r"^[a-zA-Z0-9@\\\-_\.]+$")
2021-05-29 18:22:51 +02:00
def is_valid_password(cleartxt):
"""Check password.
returns True if OK.
"""
if (
hasattr(scu.CONFIG, "MIN_PASSWORD_LENGTH")
and scu.CONFIG.MIN_PASSWORD_LENGTH > 0
and len(cleartxt) < scu.CONFIG.MIN_PASSWORD_LENGTH
):
return False # invalid: too short
try:
_ = cracklib.FascistCheck(cleartxt)
return True
except ValueError:
return False
2021-05-29 18:22:51 +02:00
class User(UserMixin, db.Model):
"""ScoDoc users, handled by Flask / SQLAlchemy"""
id = db.Column(db.Integer, primary_key=True)
user_name = db.Column(db.String(64), index=True, unique=True)
email = db.Column(db.String(120))
nom = db.Column(db.String(64))
prenom = db.Column(db.String(64))
dept = db.Column(db.String(SHORT_STR_LEN), index=True)
2021-06-27 12:11:39 +02:00
active = db.Column(db.Boolean, default=True, index=True)
2021-05-29 18:22:51 +02:00
password_hash = db.Column(db.String(128))
2021-07-05 00:07:17 +02:00
password_scodoc7 = db.Column(db.String(42))
2021-05-29 18:22:51 +02:00
last_seen = db.Column(db.DateTime, default=datetime.utcnow)
date_modif_passwd = db.Column(db.DateTime, default=datetime.utcnow)
date_created = db.Column(db.DateTime, default=datetime.utcnow)
date_expiration = db.Column(db.DateTime, default=None)
passwd_temp = db.Column(db.Boolean, default=False)
2021-12-20 22:50:14 +01:00
token = db.Column(db.Text(), index=True, unique=True)
2021-05-29 18:22:51 +02:00
token_expiration = db.Column(db.DateTime)
2021-05-29 18:22:51 +02:00
roles = db.relationship("Role", secondary="user_role", viewonly=True)
Permission = Permission
_departement = db.relationship(
"Departement",
foreign_keys=[Departement.acronym],
primaryjoin=(dept == Departement.acronym),
2022-02-13 15:19:39 +01:00
lazy="select",
passive_deletes="all",
uselist=False,
)
2021-05-29 18:22:51 +02:00
def __init__(self, **kwargs):
self.roles = []
2021-06-27 12:11:39 +02:00
self.user_roles = []
# check login:
if kwargs.get("user_name") and not VALID_LOGIN_EXP.match(kwargs["user_name"]):
raise ValueError(f"invalid user_name: {kwargs['user_name']}")
2021-05-29 18:22:51 +02:00
super(User, self).__init__(**kwargs)
2021-07-04 12:32:13 +02:00
# Ajoute roles:
2021-05-29 18:22:51 +02:00
if (
not self.roles
and self.email
and self.email == current_app.config["SCODOC_ADMIN_MAIL"]
):
# super-admin
2021-07-03 16:19:42 +02:00
admin_role = Role.query.filter_by(name="SuperAdmin").first()
2021-05-29 18:22:51 +02:00
assert admin_role
self.add_role(admin_role, None)
db.session.commit()
2021-07-27 16:07:03 +02:00
# current_app.logger.info("creating user with roles={}".format(self.roles))
2021-05-29 18:22:51 +02:00
def __repr__(self):
2021-07-27 16:07:03 +02:00
return f"<User {self.user_name} id={self.id} dept={self.dept}{' (inactive)' if not self.active else ''}>"
2021-05-29 18:22:51 +02:00
def __str__(self):
return self.user_name
2021-05-29 18:22:51 +02:00
def set_password(self, password):
"Set password"
2021-07-27 16:07:03 +02:00
current_app.logger.info(f"set_password({self})")
2021-05-29 18:22:51 +02:00
if password:
self.password_hash = generate_password_hash(password)
else:
self.password_hash = None
self.passwd_temp = False
2021-05-29 18:22:51 +02:00
def check_password(self, password):
"""Check given password vs current one.
Returns `True` if the password matched, `False` otherwise.
"""
2021-06-27 12:11:39 +02:00
if not self.active: # inactived users can't login
return False
2021-07-05 00:07:17 +02:00
if (not self.password_hash) and self.password_scodoc7:
# Special case: user freshly migrated from ScoDoc7
if scu.check_scodoc7_password(self.password_scodoc7, password):
current_app.logger.warning(
2021-07-27 16:07:03 +02:00
f"migrating legacy ScoDoc7 password for {self}"
2021-07-05 00:07:17 +02:00
)
self.set_password(password)
self.password_scodoc7 = None
db.session.add(self)
db.session.commit()
return True
return False
2021-05-29 18:22:51 +02:00
if not self.password_hash: # user without password can't login
return False
return check_password_hash(self.password_hash, password)
def get_reset_password_token(self, expires_in=600):
2022-01-03 12:33:27 +01:00
"Un token pour réinitialiser son mot de passe"
2021-05-29 18:22:51 +02:00
return jwt.encode(
{"reset_password": self.id, "exp": time() + expires_in},
current_app.config["SECRET_KEY"],
algorithm="HS256",
)
2021-05-29 18:22:51 +02:00
@staticmethod
def verify_reset_password_token(token):
2022-01-03 12:33:27 +01:00
"Vérification du token de reéinitialisation du mot de passe"
2021-05-29 18:22:51 +02:00
try:
2022-05-03 13:35:17 +02:00
token = jwt.decode(
2021-05-29 18:22:51 +02:00
token, current_app.config["SECRET_KEY"], algorithms=["HS256"]
2022-05-03 13:35:17 +02:00
)
except jwt.exceptions.ExpiredSignatureError:
log(f"verify_reset_password_token: token expired")
2021-05-29 18:22:51 +02:00
except:
2022-05-03 13:35:17 +02:00
return None
try:
user_id = token["reset_password"]
# double check en principe inutile car déjà fait dans decode()
expire = float(token["exp"])
if time() > expire:
log(f"verify_reset_password_token: token expired for uid={user_id}")
return None
except (TypeError, KeyError):
return None
2022-01-03 12:33:27 +01:00
return User.query.get(user_id)
2021-05-29 18:22:51 +02:00
2021-06-27 12:11:39 +02:00
def to_dict(self, include_email=True):
2022-01-03 12:33:27 +01:00
"""l'utilisateur comme un dict, avec des champs supplémentaires"""
2021-05-29 18:22:51 +02:00
data = {
"date_expiration": self.date_expiration.isoformat() + "Z"
if self.date_expiration
else None,
"date_modif_passwd": self.date_modif_passwd.isoformat() + "Z"
if self.date_modif_passwd
else None,
"date_created": self.date_created.isoformat() + "Z"
if self.date_created
else None,
"dept": self.dept,
2021-05-29 18:22:51 +02:00
"id": self.id,
2021-06-27 12:11:39 +02:00
"active": self.active,
"status_txt": "actif" if self.active else "fermé",
"last_seen": self.last_seen.isoformat() + "Z" if self.last_seen else None,
2021-07-27 16:07:03 +02:00
"nom": (self.nom or ""), # sco8
"prenom": (self.prenom or ""), # sco8
2021-07-03 16:19:42 +02:00
"roles_string": self.get_roles_string(), # eg "Ens_RT, Ens_Info"
2021-07-27 16:07:03 +02:00
"user_name": self.user_name, # sco8
2021-07-04 12:32:13 +02:00
# Les champs calculés:
"nom_fmt": self.get_nom_fmt(),
"prenom_fmt": self.get_prenom_fmt(),
"nomprenom": self.get_nomprenom(),
"prenomnom": self.get_prenomnom(),
"nomplogin": self.get_nomplogin(),
"nomcomplet": self.get_nomcomplet(),
2021-05-29 18:22:51 +02:00
}
if include_email:
2021-06-27 12:11:39 +02:00
data["email"] = self.email or ""
2021-05-29 18:22:51 +02:00
return data
def from_dict(self, data, new_user=False):
2021-07-03 16:19:42 +02:00
"""Set users' attributes from given dict values.
2021-07-27 16:07:03 +02:00
Roles must be encoded as "roles_string", like "Ens_RT, Secr_CJ"
2021-07-03 16:19:42 +02:00
"""
2021-07-27 16:55:50 +02:00
for field in ["nom", "prenom", "dept", "active", "email", "date_expiration"]:
2021-05-29 18:22:51 +02:00
if field in data:
2021-08-09 14:29:03 +02:00
setattr(self, field, data[field] or None)
2021-07-03 16:19:42 +02:00
if new_user:
if "user_name" in data:
# never change name of existing users
self.user_name = data["user_name"]
if "password" in data:
self.set_password(data["password"])
if not VALID_LOGIN_EXP.match(self.user_name):
raise ValueError(f"invalid user_name: {self.user_name}")
2021-07-03 16:19:42 +02:00
# Roles: roles_string is "Ens_RT, Secr_RT, ..."
if "roles_string" in data:
self.user_roles = []
for r_d in data["roles_string"].split(","):
2021-09-29 10:27:49 +02:00
if r_d:
role, dept = UserRole.role_dept_from_string(r_d)
self.add_role(role, dept)
2021-05-29 18:22:51 +02:00
def get_token(self, expires_in=3600):
2022-05-03 13:35:17 +02:00
"Un jeton pour cet user. Stocké en base, non commité."
2021-05-29 18:22:51 +02:00
now = datetime.utcnow()
if self.token and self.token_expiration > now + timedelta(seconds=60):
return self.token
self.token = base64.b64encode(os.urandom(24)).decode("utf-8")
self.token_expiration = now + timedelta(seconds=expires_in)
db.session.add(self)
return self.token
def revoke_token(self):
2022-05-03 13:35:17 +02:00
"Révoque le jeton de cet utilisateur"
2021-05-29 18:22:51 +02:00
self.token_expiration = datetime.utcnow() - timedelta(seconds=1)
@staticmethod
def check_token(token):
"""Retreive user for given token, chek token's validity
and returns the user object.
"""
2021-05-29 18:22:51 +02:00
user = User.query.filter_by(token=token).first()
if user is None or user.token_expiration < datetime.utcnow():
return None
return user
def get_dept_id(self) -> int:
"returns user's department id, or None"
if self.dept:
2022-02-13 15:19:39 +01:00
return self._departement.id
return None
2021-05-29 18:22:51 +02:00
# Permissions management:
def has_permission(self, perm: int, dept=False):
2021-05-29 18:22:51 +02:00
"""Check if user has permission `perm` in given `dept`.
2021-06-27 12:11:39 +02:00
Similar to Zope ScoDoc7 `has_permission``
2021-05-29 18:22:51 +02:00
Args:
perm: integer, one of the value defined in Permission class.
2021-07-03 16:19:42 +02:00
dept: dept id (eg 'RT'), default to current departement.
2021-05-29 18:22:51 +02:00
"""
2021-06-27 12:11:39 +02:00
if not self.active:
return False
2021-06-15 15:38:38 +02:00
if dept is False:
dept = g.scodoc_dept
2021-05-29 18:22:51 +02:00
# les role liés à ce département, et les roles avec dept=None (super-admin)
roles_in_dept = (
UserRole.query.filter_by(user_id=self.id)
.filter((UserRole.dept == dept) | (UserRole.dept == None))
.all()
)
for user_role in roles_in_dept:
if user_role.role.has_permission(perm):
return True
return False
# Role management
2022-07-24 07:14:31 +02:00
def add_role(self, role: "Role", dept: str):
2021-05-29 18:22:51 +02:00
"""Add a role to this user.
:param role: Role to add.
"""
2022-03-14 10:55:39 +01:00
if not isinstance(role, Role):
raise ScoValueError("add_role: rôle invalide")
2021-05-29 18:22:51 +02:00
self.user_roles.append(UserRole(user=self, role=role, dept=dept))
2022-07-24 07:14:31 +02:00
def add_roles(self, roles: "list[Role]", dept: str):
2021-05-29 18:22:51 +02:00
"""Add roles to this user.
:param roles: Roles to add.
"""
for role in roles:
self.add_role(role, dept)
def set_roles(self, roles, dept):
2021-07-03 16:19:42 +02:00
"set roles in the given dept"
2022-03-14 10:55:39 +01:00
self.user_roles = [
UserRole(user=self, role=r, dept=dept) for r in roles if isinstance(r, Role)
]
2021-05-29 18:22:51 +02:00
def get_roles(self):
2021-07-03 16:19:42 +02:00
"iterator on my roles"
2021-05-29 18:22:51 +02:00
for role in self.roles:
yield role
def get_roles_string(self):
"""string repr. of user's roles (with depts)
2021-07-03 16:19:42 +02:00
e.g. "Ens_RT, Ens_Info, Secr_CJ"
"""
2022-03-14 10:55:39 +01:00
return ",".join(
f"{r.role.name or ''}_{r.dept or ''}"
for r in self.user_roles
if r is not None
)
def get_depts_with_permission(self, permission: int) -> list[str]:
"""Liste des acronymes de département dans lesquels cet utilisateur
possède la permission indiquée.
L'"acronyme" None signifie "tous les départements".
Si plusieurs permissions (plusieurs bits) sont indiquées, c'est un "ou":
les départements dans lesquels l'utilisateur a l'une des permissions.
"""
return [
user_role.dept
for user_role in UserRole.query.filter_by(user=self)
.join(Role)
.filter(Role.permissions.op("&")(permission) != 0)
]
2021-05-29 18:22:51 +02:00
def is_administrator(self):
2021-07-03 16:19:42 +02:00
"True if i'm an active SuperAdmin"
return self.active and self.has_permission(Permission.ScoSuperAdmin, dept=None)
2021-05-29 18:22:51 +02:00
2021-06-28 10:45:00 +02:00
# Some useful strings:
def get_nomplogin(self):
"""nomplogin est le nom en majuscules suivi du prénom et du login
e.g. Dupont Pierre (dupont)
"""
if self.nom:
n = sco_etud.format_nom(self.nom)
else:
n = self.user_name.upper()
2021-06-28 10:45:00 +02:00
return "%s %s (%s)" % (
n,
sco_etud.format_prenom(self.prenom),
self.user_name,
)
@staticmethod
2021-08-11 00:36:07 +02:00
def get_user_id_from_nomplogin(nomplogin: str) -> Optional[int]:
"""Returns id from the string "Dupont Pierre (dupont)"
or None if user does not exist
"""
m = re.match(r".*\((.*)\)", nomplogin.strip())
if m:
2021-08-11 00:36:07 +02:00
user_name = m.group(1)
u = User.query.filter_by(user_name=user_name).first()
if u:
return u.id
return None
2021-07-04 12:32:13 +02:00
def get_nom_fmt(self):
2022-05-10 20:32:25 +02:00
"""Nom formaté: "Martin" """
2021-07-04 12:32:13 +02:00
if self.nom:
return sco_etud.format_nom(self.nom, uppercase=False)
else:
return self.user_name
def get_prenom_fmt(self):
"""Prénom formaté (minuscule capitalisées)"""
return sco_etud.format_prenom(self.prenom)
def get_nomprenom(self):
"""Nom capitalisé suivi de l'initiale du prénom:
Viennet E.
"""
prenom_abbrv = scu.abbrev_prenom(sco_etud.format_prenom(self.prenom))
return (self.get_nom_fmt() + " " + prenom_abbrv).strip()
def get_prenomnom(self):
"""L'initiale du prénom suivie du nom: "J.-C. Dupont" """
prenom_abbrv = scu.abbrev_prenom(sco_etud.format_prenom(self.prenom))
return (prenom_abbrv + " " + self.get_nom_fmt()).strip()
def get_nomcomplet(self):
"Prénom et nom complets"
return sco_etud.format_prenom(self.prenom) + " " + self.get_nom_fmt()
# nomnoacc était le nom en minuscules sans accents (inutile)
2021-05-29 18:22:51 +02:00
class AnonymousUser(AnonymousUserMixin):
def has_permission(self, perm, dept=None):
return False
def is_administrator(self):
return False
login.anonymous_user = AnonymousUser
class Role(db.Model):
"""Roles for ScoDoc"""
id = db.Column(db.Integer, primary_key=True)
2022-08-07 11:08:12 +02:00
name = db.Column(db.String(64), unique=True, nullable=False, index=True)
2021-05-29 18:22:51 +02:00
default = db.Column(db.Boolean, default=False, index=True)
permissions = db.Column(db.BigInteger) # 64 bits
users = db.relationship("User", secondary="user_role", viewonly=True)
def __init__(self, **kwargs):
super(Role, self).__init__(**kwargs)
if self.permissions is None:
self.permissions = 0
def __repr__(self):
return "<Role {} perm={:0{w}b}>".format(
self.name,
self.permissions & ((1 << Permission.NBITS) - 1),
w=Permission.NBITS,
)
2022-07-24 07:14:31 +02:00
def __str__(self):
return f"{self.name}: perm={', '.join(Permission.permissions_names(self.permissions))}"
2022-08-06 22:31:41 +02:00
def to_dict(self) -> dict:
"As dict. Convert permissions to names."
return {
"id": self.id,
2022-08-07 11:08:12 +02:00
"role_name": self.name, # pour être cohérent avec partion_name, etc.
2022-08-06 22:31:41 +02:00
"permissions": Permission.permissions_names(self.permissions),
}
2022-08-07 11:08:12 +02:00
def add_permission(self, perm: int):
"Add permission to role"
2021-05-29 18:22:51 +02:00
self.permissions |= perm
2022-08-07 11:08:12 +02:00
def remove_permission(self, perm: int):
"Remove permission from role"
2021-05-29 18:22:51 +02:00
self.permissions = self.permissions & ~perm
def reset_permissions(self):
2022-08-07 11:08:12 +02:00
"Remove all permissions from role"
2021-05-29 18:22:51 +02:00
self.permissions = 0
2022-08-07 11:08:12 +02:00
def set_named_permissions(self, permission_names: list[str]):
"""Set permissions, given as a list of permissions names.
Raises ScoValueError if invalid permission."""
self.permissions = 0
for permission_name in permission_names:
permission = Permission.get_by_name(permission_name)
if permission is None:
raise ScoValueError("set_named_permissions: invalid permission name")
self.permissions |= permission
def has_permission(self, perm: int) -> bool:
"True if role as this permission"
2021-05-29 18:22:51 +02:00
return self.permissions & perm == perm
@staticmethod
def reset_standard_roles_permissions(reset_permissions=True):
"""Create default roles if missing, then, if reset_permissions,
reset their permissions to default values.
"""
2021-05-29 18:22:51 +02:00
default_role = "Observateur"
2021-07-05 00:07:17 +02:00
for role_name, permissions in SCO_ROLES_DEFAULTS.items():
role = Role.query.filter_by(name=role_name).first()
2021-05-29 18:22:51 +02:00
if role is None:
2021-07-05 00:07:17 +02:00
role = Role(name=role_name)
role.default = role.name == default_role
db.session.add(role)
if reset_permissions:
role.reset_permissions()
for perm in permissions:
role.add_permission(perm)
db.session.add(role)
2021-05-29 18:22:51 +02:00
db.session.commit()
@staticmethod
def ensure_standard_roles():
"""Create default roles if missing"""
Role.reset_standard_roles_permissions(reset_permissions=False)
2021-05-29 18:22:51 +02:00
@staticmethod
def get_named_role(name):
"""Returns existing role with given name, or None."""
return Role.query.filter_by(name=name).first()
class UserRole(db.Model):
"""Associate user to role, in a dept.
If dept is None, the role applies to all departments (eg super admin).
"""
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
role_id = db.Column(db.Integer, db.ForeignKey("role.id"))
2021-09-28 16:20:15 +02:00
dept = db.Column(db.String(64)) # dept acronym ou NULL
2021-05-29 18:22:51 +02:00
user = db.relationship(
User, backref=db.backref("user_roles", cascade="all, delete-orphan")
)
role = db.relationship(
Role, backref=db.backref("user_roles", cascade="all, delete-orphan")
)
def __repr__(self):
return "<UserRole u={} r={} dept={}>".format(self.user, self.role, self.dept)
2021-07-03 16:19:42 +02:00
@staticmethod
2021-08-30 16:55:07 +02:00
def role_dept_from_string(role_dept: str):
2021-07-03 16:19:42 +02:00
"""Return tuple (role, dept) from the string
role_dept, of the forme "Role_Dept".
2021-08-30 16:55:07 +02:00
role is a Role instance, dept is a string, or None.
2021-07-03 16:19:42 +02:00
"""
fields = role_dept.split("_", 1) # maxsplit=1, le dept peut contenir un "_"
if len(fields) != 2:
2021-09-29 10:27:49 +02:00
current_app.logger.warning(
f"role_dept_from_string: Invalid role_dept '{role_dept}'"
)
2021-07-03 16:19:42 +02:00
raise ScoValueError("Invalid role_dept")
role_name, dept = fields
2021-08-30 16:55:07 +02:00
if dept == "":
dept = None
2021-07-03 16:19:42 +02:00
role = Role.query.filter_by(name=role_name).first()
if role is None:
raise ScoValueError("role %s does not exists" % role_name)
return (role, dept)
2021-05-29 18:22:51 +02:00
def get_super_admin():
2021-09-29 14:15:12 +02:00
"""L'utilisateur admin (ou le premier, s'il y en a plusieurs).
Utilisé par les tests unitaires et le script de migration.
"""
admin_role = Role.query.filter_by(name="SuperAdmin").first()
assert admin_role
admin_user = (
User.query.join(UserRole)
.filter((UserRole.user_id == User.id) & (UserRole.role_id == admin_role.id))
.first()
)
assert admin_user
return admin_user