forked from ScoDoc/ScoDoc
821 lines
30 KiB
Python
821 lines
30 KiB
Python
# -*- coding: UTF-8 -*
|
|
|
|
"""Users and Roles models for ScoDoc
|
|
"""
|
|
|
|
import base64
|
|
from datetime import datetime, timedelta
|
|
import os
|
|
import re
|
|
from time import time
|
|
from typing import Optional
|
|
|
|
import cracklib # pylint: disable=import-error
|
|
|
|
from flask import current_app, g
|
|
from flask_login import UserMixin, AnonymousUserMixin
|
|
from sqlalchemy.exc import (
|
|
IntegrityError,
|
|
DataError,
|
|
DatabaseError,
|
|
OperationalError,
|
|
ProgrammingError,
|
|
StatementError,
|
|
InterfaceError,
|
|
)
|
|
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
|
|
import jwt
|
|
|
|
from app import db, email, log, login
|
|
from app.models import Departement, ScoDocModel
|
|
from app.models import SHORT_STR_LEN, USERNAME_STR_LEN
|
|
from app.models.config import ScoDocSiteConfig
|
|
from app.scodoc.sco_exceptions import ScoValueError
|
|
from app.scodoc.sco_permissions import Permission
|
|
from app.scodoc.sco_roles_default import SCO_ROLES_DEFAULTS
|
|
import app.scodoc.sco_utils as scu
|
|
|
|
VALID_LOGIN_EXP = re.compile(r"^[a-zA-Z0-9@\\\-_\.]+$")
|
|
|
|
|
|
def is_valid_password(cleartxt) -> bool:
|
|
"""Check password.
|
|
returns True if OK.
|
|
"""
|
|
if (
|
|
hasattr(scu.CONFIG, "MIN_PASSWORD_LENGTH")
|
|
and scu.CONFIG.MIN_PASSWORD_LENGTH > 0
|
|
and len(cleartxt) < scu.CONFIG.MIN_PASSWORD_LENGTH
|
|
):
|
|
return False # invalid: too short
|
|
try:
|
|
_ = cracklib.FascistCheck(cleartxt)
|
|
return True
|
|
except ValueError:
|
|
return False
|
|
|
|
|
|
def is_valid_user_name(user_name: str) -> bool:
|
|
"Check that user_name (aka login) is valid"
|
|
return (
|
|
user_name
|
|
and (len(user_name) >= 2)
|
|
and (len(user_name) < USERNAME_STR_LEN)
|
|
and VALID_LOGIN_EXP.match(user_name)
|
|
)
|
|
|
|
|
|
class User(UserMixin, ScoDocModel):
|
|
"""ScoDoc users, handled by Flask / SQLAlchemy"""
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
user_name = db.Column(db.String(USERNAME_STR_LEN), index=True, unique=True)
|
|
"le login"
|
|
email = db.Column(db.String(120))
|
|
"email à utiliser par ScoDoc"
|
|
email_institutionnel = db.Column(db.String(120))
|
|
"email dans l'établissement, facultatif"
|
|
nom = db.Column(db.String(USERNAME_STR_LEN))
|
|
prenom = db.Column(db.String(USERNAME_STR_LEN))
|
|
dept = db.Column(db.String(SHORT_STR_LEN), index=True)
|
|
"acronyme du département de l'utilisateur"
|
|
active = db.Column(db.Boolean, default=True, index=True)
|
|
"si faux, compte utilisateur désactivé"
|
|
cas_id = db.Column(db.Text(), index=True, unique=True, nullable=True)
|
|
"uid sur le CAS (id, mail ou autre attribut, selon config.cas_attribute_id)"
|
|
cas_allow_login = db.Column(
|
|
db.Boolean, default=False, server_default="false", nullable=False
|
|
)
|
|
"Peut-on se logguer via le CAS ?"
|
|
cas_allow_scodoc_login = db.Column(
|
|
db.Boolean, default=False, server_default="false", nullable=False
|
|
)
|
|
"""Si CAS activé et cas_id renseigné, peut-on se logguer sur ScoDoc directement ?
|
|
(le rôle ScoSuperAdmin peut toujours, mettre à True pour les utilisateur API)
|
|
"""
|
|
cas_last_login = db.Column(db.DateTime, nullable=True)
|
|
"""date du dernier login via CAS"""
|
|
edt_id = db.Column(db.Text(), index=True, nullable=True)
|
|
"identifiant emplois du temps (unicité non imposée)"
|
|
password_hash = db.Column(db.Text()) # les hashs modernes peuvent être très longs
|
|
password_scodoc7 = db.Column(db.String(42))
|
|
last_seen = db.Column(db.DateTime, default=datetime.now)
|
|
date_modif_passwd = db.Column(db.DateTime, default=datetime.now)
|
|
date_created = db.Column(db.DateTime, default=datetime.now)
|
|
date_expiration = db.Column(db.DateTime, default=None)
|
|
passwd_must_be_changed = db.Column(
|
|
db.Boolean, nullable=False, server_default="false", default=False
|
|
)
|
|
passwd_temp = db.Column(db.Boolean, default=False)
|
|
"""champ obsolete. Si connexion alors que passwd_temp est vrai,
|
|
efface mot de passe et redirige vers accueil."""
|
|
token = db.Column(db.Text(), index=True, unique=True)
|
|
token_expiration = db.Column(db.DateTime)
|
|
|
|
# Define the back reference from User to ModuleImpl
|
|
modimpls = db.relationship("ModuleImpl", back_populates="responsable")
|
|
roles = db.relationship("Role", secondary="user_role", viewonly=True)
|
|
Permission = Permission
|
|
|
|
_departement = db.relationship(
|
|
"Departement",
|
|
foreign_keys=[Departement.acronym],
|
|
primaryjoin=(dept == Departement.acronym),
|
|
lazy="select",
|
|
passive_deletes="all",
|
|
uselist=False,
|
|
)
|
|
|
|
def __init__(self, **kwargs):
|
|
"user_name:str is mandatory"
|
|
self.roles = []
|
|
self.user_roles = []
|
|
# check login:
|
|
if "user_name" not in kwargs:
|
|
raise ValueError("missing user_name argument")
|
|
if not is_valid_user_name(kwargs["user_name"]):
|
|
raise ValueError(f"invalid user_name: {kwargs['user_name']}")
|
|
kwargs["nom"] = kwargs.get("nom", "") or ""
|
|
kwargs["prenom"] = kwargs.get("prenom", "") or ""
|
|
super().__init__(**kwargs)
|
|
# Ajoute roles:
|
|
if (
|
|
not self.roles
|
|
and self.email
|
|
and self.email == current_app.config["SCODOC_ADMIN_MAIL"]
|
|
):
|
|
# super-admin
|
|
admin_role = Role.query.filter_by(name="SuperAdmin").first()
|
|
assert admin_role
|
|
self.add_role(admin_role, None)
|
|
db.session.commit()
|
|
# current_app.logger.info("creating user with roles={}".format(self.roles))
|
|
|
|
def __repr__(self):
|
|
return f"""<User {self.user_name} id={self.id} dept={self.dept}{
|
|
' (inactive)' if not self.active else ''}>"""
|
|
|
|
def __str__(self):
|
|
return self.user_name
|
|
|
|
@classmethod
|
|
def get_user(cls, user_id: int | str, accept_none=False):
|
|
"""Get user by id, user_name or User instance, ou 404 (ou None si accept_none)
|
|
If user_id == -1, returns None (without exception)
|
|
"""
|
|
query = None
|
|
if isinstance(user_id, str):
|
|
query = db.session.query(cls).filter_by(user_name=user_id)
|
|
elif isinstance(user_id, int):
|
|
if user_id == -1:
|
|
return None
|
|
query = db.session.query(cls).filter_by(id=user_id)
|
|
elif isinstance(user_id, User):
|
|
return user_id
|
|
else:
|
|
raise ValueError("invalid user_id")
|
|
return query.first_or_404() if not accept_none else query.first()
|
|
|
|
def set_password(self, password):
|
|
"Set password"
|
|
current_app.logger.info(f"set_password({self})")
|
|
if password:
|
|
self.password_hash = generate_password_hash(password)
|
|
else:
|
|
self.password_hash = None
|
|
# La création d'un mot de passe efface l'éventuel mot de passe historique
|
|
self.password_scodoc7 = None
|
|
self.passwd_temp = False
|
|
# Retire le flag
|
|
self.passwd_must_be_changed = False
|
|
|
|
def check_password(self, password: str) -> bool:
|
|
"""Check given password vs current one.
|
|
Returns `True` if the password matched, `False` otherwise.
|
|
"""
|
|
if not self.active: # inactived users can't login
|
|
current_app.logger.warning(
|
|
f"auth: login attempt from inactive account {self}"
|
|
)
|
|
return False
|
|
if self.passwd_temp:
|
|
# Anciens comptes ScoDoc 7 non migrés
|
|
# désactive le compte par sécurité.
|
|
current_app.logger.warning(f"auth: desactivating legacy account {self}")
|
|
self.active = False
|
|
self.passwd_temp = True
|
|
db.session.add(self)
|
|
db.session.commit()
|
|
send_notif_desactivation_user(self)
|
|
return False
|
|
|
|
# if CAS activated and cas_id, allow only super-user and users with cas_allow_scodoc_login
|
|
cas_enabled = ScoDocSiteConfig.is_cas_enabled()
|
|
if cas_enabled:
|
|
if ScoDocSiteConfig.get("cas_force") and not self.is_administrator():
|
|
return False # si CAS forcé, n'accepte que super-admin
|
|
if self.cas_id and not self.cas_allow_scodoc_login:
|
|
return False
|
|
|
|
if not self.password_hash: # user without password can't login
|
|
if self.password_scodoc7:
|
|
# Special case: user freshly migrated from ScoDoc7
|
|
return self._migrate_scodoc7_password(password)
|
|
return False
|
|
|
|
return check_password_hash(self.password_hash, password)
|
|
|
|
def _migrate_scodoc7_password(self, password) -> bool:
|
|
"""After migration, rehash password."""
|
|
if scu.check_scodoc7_password(self.password_scodoc7, password):
|
|
current_app.logger.warning(
|
|
f"auth: migrating legacy ScoDoc7 password for {self}"
|
|
)
|
|
self.set_password(password)
|
|
self.password_scodoc7 = None
|
|
db.session.add(self)
|
|
db.session.commit()
|
|
return True
|
|
return False
|
|
|
|
def get_reset_password_token(self, expires_in=600):
|
|
"Un token pour réinitialiser son mot de passe"
|
|
return jwt.encode(
|
|
{"reset_password": self.id, "exp": time() + expires_in},
|
|
current_app.config["SECRET_KEY"],
|
|
algorithm="HS256",
|
|
)
|
|
|
|
@staticmethod
|
|
def verify_reset_password_token(token):
|
|
"Vérification du token de ré-initialisation du mot de passe"
|
|
try:
|
|
token = jwt.decode(
|
|
token, current_app.config["SECRET_KEY"], algorithms=["HS256"]
|
|
)
|
|
except jwt.exceptions.ExpiredSignatureError:
|
|
log("verify_reset_password_token: token expired")
|
|
except: # pylint: disable=bare-except
|
|
return None
|
|
try:
|
|
user_id = token["reset_password"]
|
|
# double check en principe inutile car déjà fait dans decode()
|
|
expire = float(token["exp"])
|
|
if time() > expire:
|
|
log(f"verify_reset_password_token: token expired for uid={user_id}")
|
|
return None
|
|
except (TypeError, KeyError):
|
|
return None
|
|
return db.session.get(User, user_id)
|
|
|
|
def sort_key(self) -> tuple:
|
|
"sort key"
|
|
return (
|
|
(self.nom or "").upper(),
|
|
(self.prenom or "").upper(),
|
|
(self.user_name or "").upper(),
|
|
)
|
|
|
|
def to_dict(self, include_email=True):
|
|
"""l'utilisateur comme un dict, avec des champs supplémentaires"""
|
|
data = {
|
|
"date_expiration": (
|
|
self.date_expiration.isoformat() + "Z" if self.date_expiration else None
|
|
),
|
|
"date_modif_passwd": (
|
|
self.date_modif_passwd.isoformat() + "Z"
|
|
if self.date_modif_passwd
|
|
else None
|
|
),
|
|
"passwd_must_be_changed": self.passwd_must_be_changed,
|
|
"date_created": (
|
|
self.date_created.isoformat() + "Z" if self.date_created else None
|
|
),
|
|
"dept": self.dept,
|
|
"id": self.id,
|
|
"active": self.active,
|
|
"cas_id": self.cas_id,
|
|
"cas_allow_login": self.cas_allow_login,
|
|
"cas_allow_scodoc_login": self.cas_allow_scodoc_login,
|
|
"cas_last_login": (
|
|
self.cas_last_login.isoformat() + "Z" if self.cas_last_login else None
|
|
),
|
|
"edt_id": self.edt_id,
|
|
"status_txt": "actif" if self.active else "fermé",
|
|
"last_seen": self.last_seen.isoformat() + "Z" if self.last_seen else None,
|
|
"nom": self.nom or "",
|
|
"prenom": self.prenom or "",
|
|
"roles_string": self.get_roles_string(), # eg "Ens_RT, Ens_Info"
|
|
"user_name": self.user_name,
|
|
# Les champs calculés:
|
|
"nom_fmt": self.get_nom_fmt(),
|
|
"prenom_fmt": self.get_prenom_fmt(),
|
|
"nomprenom": self.get_nomprenom(),
|
|
"prenomnom": self.get_prenomnom(),
|
|
"nomplogin": self.get_nomplogin(),
|
|
"nomcomplet": self.get_nomcomplet(),
|
|
}
|
|
if include_email:
|
|
data["email"] = self.email or ""
|
|
data["email_institutionnel"] = self.email_institutionnel or ""
|
|
return data
|
|
|
|
@classmethod
|
|
def convert_dict_fields(cls, args: dict) -> dict:
|
|
"""Convert fields in the given dict. No other side effect.
|
|
args: dict with args in application.
|
|
returns: dict to store in model's db.
|
|
Convert boolean values to bools.
|
|
"""
|
|
args_dict = args
|
|
# Dates
|
|
if "date_expiration" in args:
|
|
date_expiration = args.get("date_expiration")
|
|
if isinstance(date_expiration, str):
|
|
args["date_expiration"] = (
|
|
datetime.datetime.fromisoformat(date_expiration)
|
|
if date_expiration
|
|
else None
|
|
)
|
|
# booléens:
|
|
for field in ("active", "cas_allow_login", "cas_allow_scodoc_login"):
|
|
if field in args:
|
|
args_dict[field] = scu.to_bool(args.get(field))
|
|
|
|
# chaines ne devant pas être NULLs
|
|
for field in ("nom", "prenom"):
|
|
if field in args:
|
|
args[field] = args[field] or ""
|
|
|
|
# chaines ne devant pas être vides mais au contraire null (unicité)
|
|
if "cas_id" in args:
|
|
args["cas_id"] = args["cas_id"] or None
|
|
|
|
return args_dict
|
|
|
|
def from_dict(self, args: dict, new_user=False):
|
|
"""Set users' attributes from given dict values.
|
|
- roles_string : roles, encoded like "Ens_RT, Secr_CJ"
|
|
- date_expiration is a dateime object.
|
|
Does not check permissions here.
|
|
"""
|
|
if new_user:
|
|
if "user_name" in args:
|
|
# never change name of existing users
|
|
# (see change_user_name method to do that)
|
|
if not is_valid_user_name(args["user_name"]):
|
|
raise ValueError(f"invalid user_name: {args['user_name']}")
|
|
self.user_name = args["user_name"]
|
|
if "password" in args:
|
|
self.set_password(args["password"])
|
|
|
|
# Roles: roles_string is "Ens_RT, Secr_RT, ..."
|
|
if "roles_string" in args:
|
|
self.user_roles = []
|
|
for r_d in args["roles_string"].split(","):
|
|
if r_d:
|
|
role, dept = UserRole.role_dept_from_string(r_d)
|
|
self.add_role(role, dept)
|
|
|
|
super().from_dict(args, excluded={"user_name", "roles_string", "roles"})
|
|
|
|
if ScoDocSiteConfig.cas_uid_use_scodoc():
|
|
self.cas_id = self.user_name
|
|
else:
|
|
# Set cas_id using regexp if configured:
|
|
exp = ScoDocSiteConfig.get("cas_uid_from_mail_regexp")
|
|
if exp and self.email_institutionnel:
|
|
cas_id = ScoDocSiteConfig.extract_cas_id(self.email_institutionnel)
|
|
if cas_id:
|
|
self.cas_id = cas_id
|
|
|
|
def get_token(self, expires_in=3600):
|
|
"Un jeton pour cet user. Stocké en base, non commité."
|
|
now = datetime.now()
|
|
if self.token and self.token_expiration > now + timedelta(seconds=60):
|
|
return self.token
|
|
self.token = base64.b64encode(os.urandom(24)).decode("utf-8")
|
|
self.token_expiration = now + timedelta(seconds=expires_in)
|
|
db.session.add(self)
|
|
return self.token
|
|
|
|
def revoke_token(self):
|
|
"Révoque le jeton de cet utilisateur"
|
|
self.token_expiration = datetime.now() - timedelta(seconds=1)
|
|
|
|
@staticmethod
|
|
def check_token(token):
|
|
"""Retreive user for given token, check token's validity
|
|
and returns the user object.
|
|
"""
|
|
user = User.query.filter_by(token=token).first()
|
|
if user is None or user.token_expiration < datetime.now():
|
|
return None
|
|
return user
|
|
|
|
def get_dept_id(self) -> int:
|
|
"returns user's department id, or None"
|
|
if self.dept:
|
|
return self._departement.id
|
|
return None
|
|
|
|
def get_emails(self):
|
|
"List mail adresses to contact this user"
|
|
mails = []
|
|
if self.email:
|
|
mails.append(self.email)
|
|
if self.email_institutionnel:
|
|
mails.append(self.email_institutionnel)
|
|
return mails
|
|
|
|
# Permissions management:
|
|
def has_permission(self, perm: int, dept: str = False):
|
|
"""Check if user has permission `perm` in given `dept` (acronym).
|
|
Similar to Zope ScoDoc7 `has_permission``
|
|
|
|
Args:
|
|
perm: integer, one of the value defined in Permission class.
|
|
dept: dept id (eg 'RT'), default to current departement.
|
|
"""
|
|
if not self.active:
|
|
return False
|
|
if dept is False:
|
|
dept = g.scodoc_dept
|
|
# les role liés à ce département, et les roles avec dept=None (super-admin)
|
|
roles_in_dept = (
|
|
UserRole.query.filter_by(user_id=self.id)
|
|
.filter(
|
|
(UserRole.dept == dept)
|
|
| (UserRole.dept == None) # pylint: disable=C0121
|
|
)
|
|
.all()
|
|
)
|
|
for user_role in roles_in_dept:
|
|
if user_role.role.has_permission(perm):
|
|
return True
|
|
return False
|
|
|
|
# Role management
|
|
def add_role(self, role: "Role", dept: str):
|
|
"""Add a role to this user.
|
|
:param role: Role to add.
|
|
"""
|
|
if not isinstance(role, Role):
|
|
raise ScoValueError("add_role: rôle invalide")
|
|
user_role = UserRole(user=self, role=role, dept=dept)
|
|
db.session.add(user_role)
|
|
self.user_roles.append(user_role)
|
|
|
|
def add_roles(self, roles: "list[Role]", dept: str):
|
|
"""Add roles to this user.
|
|
:param roles: Roles to add.
|
|
"""
|
|
for role in roles:
|
|
self.add_role(role, dept)
|
|
|
|
def set_roles(self, roles, dept):
|
|
"set roles in the given dept"
|
|
self.user_roles = []
|
|
for r in roles:
|
|
if isinstance(r, Role):
|
|
self.add_role(r, dept)
|
|
|
|
def get_roles(self):
|
|
"iterator on my roles"
|
|
for role in self.roles:
|
|
yield role
|
|
|
|
def get_roles_string(self):
|
|
"""string repr. of user's roles (with depts)
|
|
e.g. "Ens_RT, Ens_Info, Secr_CJ"
|
|
"""
|
|
return ", ".join(
|
|
f"{r.role.name or ''}_{r.dept or ''}"
|
|
for r in self.user_roles
|
|
if r is not None
|
|
)
|
|
|
|
def get_depts_with_permission(self, permission: int) -> list[str]:
|
|
"""Liste des acronymes de département dans lesquels cet utilisateur
|
|
possède la permission indiquée.
|
|
L'"acronyme" None signifie "tous les départements".
|
|
Si plusieurs permissions (plusieurs bits) sont indiquées, c'est un "ou":
|
|
les départements dans lesquels l'utilisateur a l'une des permissions.
|
|
"""
|
|
return [
|
|
user_role.dept
|
|
for user_role in UserRole.query.filter_by(user=self)
|
|
.join(Role)
|
|
.filter(Role.permissions.op("&")(permission) != 0)
|
|
]
|
|
|
|
def is_administrator(self):
|
|
"True if i'm an active SuperAdmin"
|
|
return self.active and self.has_permission(Permission.ScoSuperAdmin, dept=None)
|
|
|
|
# Some useful strings:
|
|
def get_nomplogin(self):
|
|
"""nomplogin est le nom en majuscules suivi du prénom et du login
|
|
e.g. Dupont Pierre (dupont)
|
|
"""
|
|
nom = scu.format_nom(self.nom) if self.nom else self.user_name.upper()
|
|
return f"{nom} {scu.format_prenom(self.prenom)} ({self.user_name})"
|
|
|
|
@staticmethod
|
|
def get_user_from_nomplogin(nomplogin: str) -> Optional["User"]:
|
|
"""Returns User instance from the string "Dupont Pierre (dupont)"
|
|
or None if user does not exist
|
|
"""
|
|
match = re.match(r".*\((.*)\)", nomplogin.strip())
|
|
if match:
|
|
user_name = match.group(1)
|
|
u = User.query.filter_by(user_name=user_name).first()
|
|
if u:
|
|
return u
|
|
return None
|
|
|
|
def get_nom_fmt(self):
|
|
"""Nom formaté: "Martin" """
|
|
if self.nom:
|
|
return scu.format_nom(self.nom, uppercase=False)
|
|
else:
|
|
return self.user_name
|
|
|
|
def get_prenom_fmt(self):
|
|
"""Prénom formaté (minuscule capitalisées)"""
|
|
return scu.format_prenom(self.prenom)
|
|
|
|
def get_nomprenom(self):
|
|
"""Nom capitalisé suivi de l'initiale du prénom:
|
|
Viennet E.
|
|
"""
|
|
prenom_abbrv = scu.abbrev_prenom(scu.format_prenom(self.prenom))
|
|
return (self.get_nom_fmt() + " " + prenom_abbrv).strip()
|
|
|
|
def get_prenomnom(self):
|
|
"""L'initiale du prénom suivie du nom: "J.-C. Dupont" """
|
|
prenom_abbrv = scu.abbrev_prenom(scu.format_prenom(self.prenom))
|
|
return (prenom_abbrv + " " + self.get_nom_fmt()).strip()
|
|
|
|
def get_nomcomplet(self):
|
|
"Prénom et nom complets"
|
|
return scu.format_prenom(self.prenom) + " " + self.get_nom_fmt()
|
|
|
|
# nomnoacc était le nom en minuscules sans accents (inutile)
|
|
|
|
def change_user_name(self, new_user_name: str):
|
|
"""Modify user name, update all relevant tables.
|
|
commit session.
|
|
"""
|
|
# Safety check
|
|
new_user_name = new_user_name.strip()
|
|
if (
|
|
not is_valid_user_name(new_user_name)
|
|
or User.query.filter_by(user_name=new_user_name).count() > 0
|
|
):
|
|
raise ValueError("invalid user_name")
|
|
# Le user_name est utilisé dans d'autres tables (sans être une clé)
|
|
# BulAppreciations.author
|
|
# EntrepriseHistorique.authenticated_user
|
|
# EtudAnnotation.author
|
|
# ScolarNews.authenticated_user
|
|
# Scolog.authenticated_user
|
|
from app.models import (
|
|
BulAppreciations,
|
|
EtudAnnotation,
|
|
ScolarNews,
|
|
Scolog,
|
|
)
|
|
from app.entreprises.models import EntrepriseHistorique
|
|
|
|
try:
|
|
# Update all instances of EtudAnnotation
|
|
db.session.query(BulAppreciations).filter(
|
|
BulAppreciations.author == self.user_name
|
|
).update({BulAppreciations.author: new_user_name})
|
|
db.session.query(EntrepriseHistorique).filter(
|
|
EntrepriseHistorique.authenticated_user == self.user_name
|
|
).update({EntrepriseHistorique.authenticated_user: new_user_name})
|
|
db.session.query(EtudAnnotation).filter(
|
|
EtudAnnotation.author == self.user_name
|
|
).update({EtudAnnotation.author: new_user_name})
|
|
db.session.query(ScolarNews).filter(
|
|
ScolarNews.authenticated_user == self.user_name
|
|
).update({ScolarNews.authenticated_user: new_user_name})
|
|
db.session.query(Scolog).filter(
|
|
Scolog.authenticated_user == self.user_name
|
|
).update({Scolog.authenticated_user: new_user_name})
|
|
# And update ourself:
|
|
self.user_name = new_user_name
|
|
db.session.add(self)
|
|
db.session.commit()
|
|
except (
|
|
IntegrityError,
|
|
DataError,
|
|
DatabaseError,
|
|
OperationalError,
|
|
ProgrammingError,
|
|
StatementError,
|
|
InterfaceError,
|
|
) as exc:
|
|
db.session.rollback()
|
|
raise exc
|
|
|
|
|
|
class AnonymousUser(AnonymousUserMixin):
|
|
"Notre utilisateur anonyme"
|
|
|
|
def has_permission(self, perm, dept=None): # pylint: disable=unused-argument
|
|
"always false, anonymous has no permission"
|
|
return False
|
|
|
|
def is_administrator(self):
|
|
"always false, anonymous is not admin"
|
|
return False
|
|
|
|
|
|
login.anonymous_user = AnonymousUser
|
|
|
|
|
|
class Role(ScoDocModel):
|
|
"""Roles for ScoDoc"""
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
name = db.Column(db.String(64), unique=True, nullable=False, index=True)
|
|
default = db.Column(db.Boolean, default=False, index=True)
|
|
permissions = db.Column(db.BigInteger) # 64 bits
|
|
users = db.relationship("User", secondary="user_role", viewonly=True)
|
|
|
|
def __init__(self, **kwargs):
|
|
super(Role, self).__init__(**kwargs)
|
|
if self.permissions is None:
|
|
self.permissions = 0
|
|
|
|
def __repr__(self):
|
|
return "<Role {} perm={:0{w}b}>".format(
|
|
self.name,
|
|
self.permissions & ((1 << Permission.NBITS) - 1),
|
|
w=Permission.NBITS,
|
|
)
|
|
|
|
def __str__(self):
|
|
return f"{self.name}: perm={', '.join(Permission.permissions_names(self.permissions))}"
|
|
|
|
def to_dict(self) -> dict:
|
|
"As dict. Convert permissions to names."
|
|
return {
|
|
"id": self.id,
|
|
"role_name": self.name, # pour être cohérent avec partion_name, etc.
|
|
"permissions": Permission.permissions_names(self.permissions),
|
|
}
|
|
|
|
def add_permission(self, perm: int):
|
|
"Add permission to role"
|
|
self.permissions |= perm
|
|
|
|
def remove_permission(self, perm: int):
|
|
"Remove permission from role"
|
|
self.permissions = self.permissions & ~perm
|
|
|
|
def reset_permissions(self):
|
|
"Remove all permissions from role"
|
|
self.permissions = 0
|
|
|
|
def get_named_permissions(self) -> list[str]:
|
|
"List of the names of the permissions associated to this rôle"
|
|
return Permission.permissions_names(self.permissions)
|
|
|
|
def set_named_permissions(self, permission_names: list[str]):
|
|
"""Set permissions, given as a list of permissions names.
|
|
Raises ScoValueError if invalid permission."""
|
|
self.permissions = 0
|
|
for permission_name in permission_names:
|
|
permission = Permission.get_by_name(permission_name)
|
|
if permission is None:
|
|
raise ScoValueError("set_named_permissions: invalid permission name")
|
|
self.permissions |= permission
|
|
|
|
def has_permission(self, perm: int) -> bool:
|
|
"True if role as this permission"
|
|
return self.permissions & perm == perm
|
|
|
|
@staticmethod
|
|
def reset_standard_roles_permissions(reset_permissions=True):
|
|
"""Create default roles if missing, then, if reset_permissions,
|
|
reset their permissions to default values.
|
|
"""
|
|
Role.reset_roles_permissions(
|
|
SCO_ROLES_DEFAULTS, reset_permissions=reset_permissions
|
|
)
|
|
|
|
@staticmethod
|
|
def reset_roles_permissions(roles_perms: dict[str, tuple], reset_permissions=True):
|
|
"""Ajoute les permissions aux roles
|
|
roles_perms : { "role_name" : (permission, ...) }
|
|
reset_permissions : si vrai efface permissions déja existantes
|
|
Si le role n'existe pas, il est (re) créé.
|
|
"""
|
|
default_role = "Observateur"
|
|
for role_name, permissions in roles_perms.items():
|
|
role = Role.query.filter_by(name=role_name).first()
|
|
if role is None:
|
|
role = Role(name=role_name)
|
|
role.default = role.name == default_role
|
|
db.session.add(role)
|
|
if reset_permissions:
|
|
role.reset_permissions()
|
|
for perm in permissions:
|
|
role.add_permission(perm)
|
|
db.session.add(role)
|
|
|
|
db.session.commit()
|
|
|
|
@staticmethod
|
|
def ensure_standard_roles():
|
|
"""Create default roles if missing"""
|
|
Role.reset_standard_roles_permissions(reset_permissions=False)
|
|
|
|
@staticmethod
|
|
def get_named_role(name):
|
|
"""Returns existing role with given name, or None."""
|
|
return Role.query.filter_by(name=name).first()
|
|
|
|
|
|
class UserRole(ScoDocModel):
|
|
"""Associate user to role, in a dept.
|
|
If dept is None, the role applies to all departments (eg super admin).
|
|
"""
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
|
|
role_id = db.Column(db.Integer, db.ForeignKey("role.id"))
|
|
dept = db.Column(db.String(64)) # dept acronym ou NULL
|
|
user = db.relationship(
|
|
User, backref=db.backref("user_roles", cascade="all, delete-orphan")
|
|
)
|
|
role = db.relationship(
|
|
Role, backref=db.backref("user_roles", cascade="all, delete-orphan")
|
|
)
|
|
|
|
def __repr__(self):
|
|
return f"<UserRole u={self.user} r={self.role} dept={self.dept}>"
|
|
|
|
@staticmethod
|
|
def role_dept_from_string(role_dept: str):
|
|
"""Return tuple (role, dept) from the string
|
|
role_dept, of the forme "Role_Dept".
|
|
role is a Role instance, dept is a string, or None.
|
|
"""
|
|
fields = role_dept.strip().split("_", 1)
|
|
# maxsplit=1, le dept peut contenir un "_"
|
|
if len(fields) != 2:
|
|
current_app.logger.warning(
|
|
f"auth: role_dept_from_string: Invalid role_dept '{role_dept}'"
|
|
)
|
|
raise ScoValueError("Invalid role_dept")
|
|
role_name, dept = fields
|
|
dept = dept.strip() if dept else ""
|
|
if dept == "":
|
|
dept = None
|
|
|
|
role = Role.query.filter_by(name=role_name).first()
|
|
if role is None:
|
|
raise ScoValueError(f"role {role_name} does not exists")
|
|
return (role, dept)
|
|
|
|
|
|
def get_super_admin():
|
|
"""L'utilisateur admin (ou le premier, s'il y en a plusieurs).
|
|
Utilisé par les tests unitaires et le script de migration.
|
|
"""
|
|
admin_role = Role.query.filter_by(name="SuperAdmin").first()
|
|
assert admin_role
|
|
admin_user = (
|
|
User.query.join(UserRole)
|
|
.filter((UserRole.user_id == User.id) & (UserRole.role_id == admin_role.id))
|
|
.first()
|
|
)
|
|
assert admin_user
|
|
return admin_user
|
|
|
|
|
|
def send_notif_desactivation_user(user: User):
|
|
"""Envoi un message mail de notification à l'admin et à l'adresse du compte désactivé"""
|
|
recipients = user.get_emails() + [current_app.config.get("SCODOC_ADMIN_MAIL")]
|
|
txt = [
|
|
f"""Le compte ScoDoc '{user.user_name}' associé à votre adresse <{user.email}>""",
|
|
"""a été désactivé par le système car son mot de passe n'était pas valide.\n""",
|
|
"""Contactez votre responsable pour le ré-activer.\n""",
|
|
"""Ceci est un message automatique, ne pas répondre.""",
|
|
]
|
|
txt = "\n".join(txt)
|
|
email.send_email(
|
|
f"ScoDoc: désactivation automatique du compte {user.user_name}",
|
|
email.get_from_addr(),
|
|
recipients,
|
|
txt,
|
|
)
|
|
return txt
|