# -*- coding: UTF-8 -*-

"""Users and Roles models for ScoDoc
"""

import base64
from datetime import datetime, timedelta
import os
import re
from time import time
from typing import Optional

import cracklib  # pylint: disable=import-error

from flask import current_app, g
from flask_login import UserMixin, AnonymousUserMixin
from sqlalchemy.exc import (
    IntegrityError,
    DataError,
    DatabaseError,
    OperationalError,
    ProgrammingError,
    StatementError,
    InterfaceError,
)
from werkzeug.security import generate_password_hash, check_password_hash

import jwt

from app import db, email, log, login
from app.models import Departement, ScoDocModel
from app.models import SHORT_STR_LEN, USERNAME_STR_LEN
from app.models.config import ScoDocSiteConfig
from app.scodoc.sco_exceptions import ScoValueError
from app.scodoc.sco_permissions import Permission
from app.scodoc.sco_roles_default import SCO_ROLES_DEFAULTS
import app.scodoc.sco_utils as scu

VALID_LOGIN_EXP = re.compile(r"^[a-zA-Z0-9@\\\-_\.]+$")


def is_valid_password(cleartxt) -> bool:
    """Check password strength.

    Returns True if the password is acceptable: at least
    `scu.CONFIG.MIN_PASSWORD_LENGTH` characters (when that setting exists
    and is positive) and accepted by `cracklib.FascistCheck`.
    """
    min_length = getattr(scu.CONFIG, "MIN_PASSWORD_LENGTH", 0)
    if min_length > 0 and len(cleartxt) < min_length:
        return False  # invalid: too short
    try:
        cracklib.FascistCheck(cleartxt)
    except ValueError:
        # cracklib signals a weak password by raising ValueError
        return False
    return True


def is_valid_user_name(user_name: str) -> bool:
    """Check that user_name (aka login) is valid.

    A valid login is non-empty, at least 2 characters long, strictly shorter
    than USERNAME_STR_LEN and made only of characters allowed by
    VALID_LOGIN_EXP.
    """
    # fullmatch: "$" alone would accept a login ending with a newline.
    # bool(): honour the annotated return type instead of leaking a Match.
    return bool(
        user_name
        and 2 <= len(user_name) < USERNAME_STR_LEN
        and VALID_LOGIN_EXP.fullmatch(user_name)
    )


def _iso_z(date_time: Optional[datetime]) -> Optional[str]:
    """ISO 8601 representation of `date_time` suffixed with "Z", or None."""
    return date_time.isoformat() + "Z" if date_time else None


class User(UserMixin, ScoDocModel):
    """ScoDoc users, handled by Flask / SQLAlchemy"""

    id = db.Column(db.Integer, primary_key=True)
    user_name = db.Column(db.String(USERNAME_STR_LEN), index=True, unique=True)
    "the login"
    email = db.Column(db.String(120))
    "email address used by ScoDoc"
    email_institutionnel = db.Column(db.String(120))
    "institutional email address, optional"
    nom = db.Column(db.String(USERNAME_STR_LEN))
    prenom = db.Column(db.String(USERNAME_STR_LEN))
    dept = db.Column(db.String(SHORT_STR_LEN), index=True)
    "acronym of the user's department"
    active = db.Column(db.Boolean, default=True, index=True)
    "if false, the user account is disabled"
    cas_id = db.Column(db.Text(), index=True, unique=True, nullable=True)
    "uid on the CAS (id, mail or other attribute, see config.cas_attribute_id)"
    cas_allow_login = db.Column(
        db.Boolean, default=False, server_default="false", nullable=False
    )
    "May the user log in through the CAS?"
    cas_allow_scodoc_login = db.Column(
        db.Boolean, default=False, server_default="false", nullable=False
    )
    """If CAS is forced (cas_force), may the user log in on ScoDoc directly?
    (the ScoSuperAdmin role always can; set to True for API users)
    """
    cas_last_login = db.Column(db.DateTime, nullable=True)
    """date of the last login through the CAS"""
    edt_id = db.Column(db.Text(), index=True, nullable=True)
    "timetable identifier (uniqueness not enforced)"
    password_hash = db.Column(db.Text())  # modern hashes can be very long
    password_scodoc7 = db.Column(db.String(42))
    last_seen = db.Column(db.DateTime, default=datetime.utcnow)
    date_modif_passwd = db.Column(db.DateTime, default=datetime.utcnow)
    date_created = db.Column(db.DateTime, default=datetime.utcnow)
    date_expiration = db.Column(db.DateTime, default=None)
    passwd_temp = db.Column(db.Boolean, default=False)
    """obsolete field: check_password() deactivates any account for which
    it is still true."""
    token = db.Column(db.Text(), index=True, unique=True)
    token_expiration = db.Column(db.DateTime)

    # Define the back reference from User to ModuleImpl
    modimpls = db.relationship("ModuleImpl", back_populates="responsable")
    roles = db.relationship("Role", secondary="user_role", viewonly=True)
    Permission = Permission

    _departement = db.relationship(
        "Departement",
        foreign_keys=[Departement.acronym],
        primaryjoin=(dept == Departement.acronym),
        lazy="select",
        passive_deletes="all",
        uselist=False,
    )

    def __init__(self, **kwargs):
        """Create a user. `user_name` (str) is mandatory.

        Raises ValueError if user_name is missing or invalid.
        If the email equals the configured SCODOC_ADMIN_MAIL, the SuperAdmin
        role is added and the session is committed.
        """
        self.roles = []
        self.user_roles = []
        # check login:
        if "user_name" not in kwargs:
            raise ValueError("missing user_name argument")
        if not is_valid_user_name(kwargs["user_name"]):
            raise ValueError(f"invalid user_name: {kwargs['user_name']}")
        # nom and prenom must never be NULL
        kwargs["nom"] = kwargs.get("nom", "") or ""
        kwargs["prenom"] = kwargs.get("prenom", "") or ""
        super().__init__(**kwargs)
        # Add roles:
        if (
            not self.roles
            and self.email
            and self.email == current_app.config["SCODOC_ADMIN_MAIL"]
        ):
            # super-admin
            admin_role = Role.query.filter_by(name="SuperAdmin").first()
            assert admin_role
            self.add_role(admin_role, None)
            db.session.commit()

    def __repr__(self):
        return (
            f"<{self.__class__.__name__}(id={self.id}, "
            f"user_name={self.user_name!r}, dept={self.dept!r}, "
            f"nom={self.nom!r}, prenom={self.prenom!r})>"
        )

    def __str__(self):
        return self.user_name

    @classmethod
    def get_user(cls, user_id: "int | str | User", accept_none=False):
        """Get user by id, user_name or User instance.

        - str: looked up as user_name;
        - int: looked up as id; -1 returns None (without exception);
        - User: returned unchanged.
        If no user is found: 404, or None if accept_none.
        Raises ValueError for any other argument type.
        """
        if isinstance(user_id, str):
            query = db.session.query(cls).filter_by(user_name=user_id)
        elif isinstance(user_id, int):
            if user_id == -1:
                return None
            query = db.session.query(cls).filter_by(id=user_id)
        elif isinstance(user_id, User):
            return user_id
        else:
            raise ValueError("invalid user_id")
        return query.first() if accept_none else query.first_or_404()

    def set_password(self, password):
        """Set password (stored hashed). A false password clears the hash.

        Does not add to the session nor commit.
        """
        current_app.logger.info(f"set_password({self})")
        if password:
            self.password_hash = generate_password_hash(password)
        else:
            self.password_hash = None
        # Setting a password erases the legacy (ScoDoc 7) password, if any
        self.password_scodoc7 = None
        self.passwd_temp = False

    def check_password(self, password: str) -> bool:
        """Check given password vs current one.

        Returns `True` if the password matched, `False` otherwise.
        Side effects: a legacy account (passwd_temp set) is deactivated and
        notified; a ScoDoc7 password is re-hashed on first successful login.
        """
        if not self.active:  # inactive users can't login
            current_app.logger.warning(
                f"auth: login attempt from inactive account {self}"
            )
            return False
        if self.passwd_temp:
            # Old ScoDoc 7 accounts that were never migrated:
            # deactivate the account for safety.
            current_app.logger.warning(f"auth: desactivating legacy account {self}")
            self.active = False
            # NOTE(review): no-op in this branch (flag is already true)
            self.passwd_temp = True
            db.session.add(self)
            db.session.commit()
            send_notif_desactivation_user(self)
            return False

        # if CAS activated and forced, allow only super-user
        # and users with cas_allow_scodoc_login
        cas_enabled = ScoDocSiteConfig.is_cas_enabled()
        if cas_enabled and ScoDocSiteConfig.get("cas_force"):
            if (not self.is_administrator()) and not self.cas_allow_scodoc_login:
                return False

        if not self.password_hash:  # user without password can't login
            if self.password_scodoc7:
                # Special case: user freshly migrated from ScoDoc7
                return self._migrate_scodoc7_password(password)
            return False

        return check_password_hash(self.password_hash, password)

    def _migrate_scodoc7_password(self, password) -> bool:
        """After migration, rehash password.

        Returns True (and commits the new hash) if `password` matches the
        stored ScoDoc7 password, False otherwise.
        """
        if scu.check_scodoc7_password(self.password_scodoc7, password):
            current_app.logger.warning(
                f"auth: migrating legacy ScoDoc7 password for {self}"
            )
            self.set_password(password)
            self.password_scodoc7 = None
            db.session.add(self)
            db.session.commit()
            return True
        return False

    def get_reset_password_token(self, expires_in=600):
        """A signed (HS256) JWT allowing this user to reset their password.

        expires_in: validity, in seconds.
        """
        return jwt.encode(
            {"reset_password": self.id, "exp": time() + expires_in},
            current_app.config["SECRET_KEY"],
            algorithm="HS256",
        )

    @staticmethod
    def verify_reset_password_token(token):
        """Check a password-reset token.

        Returns the User designated by the token, or None if the token is
        invalid, expired or malformed.
        """
        try:
            payload = jwt.decode(
                token, current_app.config["SECRET_KEY"], algorithms=["HS256"]
            )
        except jwt.exceptions.ExpiredSignatureError:
            log("verify_reset_password_token: token expired")
            return None
        except Exception:  # pylint: disable=broad-except
            # any other decoding problem: the token is not usable
            return None
        try:
            user_id = payload["reset_password"]
            # double check, in principle useless: already done by decode()
            expire = float(payload["exp"])
        except (TypeError, KeyError, ValueError):
            return None
        if time() > expire:
            log(f"verify_reset_password_token: token expired for uid={user_id}")
            return None
        return db.session.get(User, user_id)

    def sort_key(self) -> tuple:
        "sort key: (NOM, PRENOM, USER_NAME), upper-cased"
        return (
            (self.nom or "").upper(),
            (self.prenom or "").upper(),
            (self.user_name or "").upper(),
        )

    def to_dict(self, include_email=True):
        """The user as a dict, with additional computed fields.

        Dates are ISO formatted with a "Z" suffix (or None).
        Email addresses are included only if include_email.
        """
        data = {
            "date_expiration": _iso_z(self.date_expiration),
            "date_modif_passwd": _iso_z(self.date_modif_passwd),
            "date_created": _iso_z(self.date_created),
            "dept": self.dept,
            "id": self.id,
            "active": self.active,
            "cas_id": self.cas_id,
            "cas_allow_login": self.cas_allow_login,
            "cas_allow_scodoc_login": self.cas_allow_scodoc_login,
            "cas_last_login": _iso_z(self.cas_last_login),
            "edt_id": self.edt_id,
            "status_txt": "actif" if self.active else "fermé",
            "last_seen": _iso_z(self.last_seen),
            "nom": self.nom or "",
            "prenom": self.prenom or "",
            "roles_string": self.get_roles_string(),  # eg "Ens_RT, Ens_Info"
            "user_name": self.user_name,
            # Computed fields:
            "nom_fmt": self.get_nom_fmt(),
            "prenom_fmt": self.get_prenom_fmt(),
            "nomprenom": self.get_nomprenom(),
            "prenomnom": self.get_prenomnom(),
            "nomplogin": self.get_nomplogin(),
            "nomcomplet": self.get_nomcomplet(),
        }
        if include_email:
            data["email"] = self.email or ""
            data["email_institutionnel"] = self.email_institutionnel or ""
        return data

    @classmethod
    def convert_dict_fields(cls, args: dict) -> dict:
        """Convert fields in the given dict.

        args: dict with args in application.
        returns: dict to store in model's db (the same dict object, which is
        converted in place).

        - date_expiration: ISO string converted to datetime (empty -> None);
        - boolean fields converted to bools;
        - nom, prenom: never None (empty string instead);
        - cas_id: never empty (None instead, because of the unique constraint).
        """
        # Dates
        date_expiration = args.get("date_expiration")
        if isinstance(date_expiration, str):
            # `datetime` is the class here (from datetime import datetime)
            args["date_expiration"] = (
                datetime.fromisoformat(date_expiration) if date_expiration else None
            )
        # booleans:
        for field in ("active", "cas_allow_login", "cas_allow_scodoc_login"):
            if field in args:
                args[field] = scu.to_bool(args.get(field))
        # strings that must not be NULL
        for field in ("nom", "prenom"):
            if field in args:
                args[field] = args[field] or ""
        # strings that must not be empty but NULL instead (uniqueness)
        if "cas_id" in args:
            args["cas_id"] = args["cas_id"] or None
        return args

    def from_dict(self, data: dict, new_user=False):
        """Set users' attributes from given dict values.

        - roles_string : roles, encoded like "Ens_RT, Secr_CJ"
        - date_expiration is a datetime object.
        Does not check permissions here.
        Raises ValueError if new_user and the given user_name is invalid.
        """
        if new_user:
            if "user_name" in data:
                # never change name of existing users
                # (see change_user_name method to do that)
                if not is_valid_user_name(data["user_name"]):
                    raise ValueError(f"invalid user_name: {data['user_name']}")
                self.user_name = data["user_name"]
            # NOTE(review): nesting under `new_user` reconstructed from a
            # flattened source - confirm that the password is meant to be
            # set only for new users.
            if "password" in data:
                self.set_password(data["password"])

        # Roles: roles_string is "Ens_RT, Secr_RT, ..."
        if "roles_string" in data:
            self.user_roles = []
            for r_d in data["roles_string"].split(","):
                if r_d:
                    role, dept = UserRole.role_dept_from_string(r_d)
                    self.add_role(role, dept)

        super().from_dict(data, excluded={"user_name", "roles_string", "roles"})

        if ScoDocSiteConfig.cas_uid_use_scodoc():
            self.cas_id = self.user_name
        else:
            # Set cas_id using regexp if configured:
            exp = ScoDocSiteConfig.get("cas_uid_from_mail_regexp")
            if exp and self.email_institutionnel:
                cas_id = ScoDocSiteConfig.extract_cas_id(self.email_institutionnel)
                if cas_id:
                    self.cas_id = cas_id

    def get_token(self, expires_in=3600):
        """A token for this user. Stored in the session, not committed.

        The current token is reused if it is still valid for more than
        60 seconds; otherwise a new random one is generated, valid for
        `expires_in` seconds.
        """
        now = datetime.utcnow()
        if (
            self.token
            and self.token_expiration is not None  # NULL expiration: renew
            and self.token_expiration > now + timedelta(seconds=60)
        ):
            return self.token
        self.token = base64.b64encode(os.urandom(24)).decode("utf-8")
        self.token_expiration = now + timedelta(seconds=expires_in)
        db.session.add(self)
        return self.token

    def revoke_token(self):
        "Revoke this user's token (by making it expired)"
        self.token_expiration = datetime.utcnow() - timedelta(seconds=1)

    @staticmethod
    def check_token(token):
        """Retrieve user for given token, check token's validity
        and returns the user object, or None if the token is unknown,
        empty or expired.
        """
        if not token:
            # filter_by(token=None) would match users having no token at all
            return None
        user = User.query.filter_by(token=token).first()
        if (
            user is None
            or user.token_expiration is None
            or user.token_expiration < datetime.utcnow()
        ):
            return None
        return user

    def get_dept_id(self) -> int | None:
        """Returns user's department id, or None if the user has no
        department or its acronym matches no Departement."""
        if self.dept and self._departement is not None:
            return self._departement.id
        return None

    def get_emails(self):
        "List of mail addresses to contact this user (may be empty)"
        return [
            address for address in (self.email, self.email_institutionnel) if address
        ]

    # Permissions management:
    def has_permission(self, perm: int, dept: str | None | bool = False):
        """Check if user has permission `perm` in given `dept` (acronym).
        Similar to Zope ScoDoc7 `has_permission`

        Args:
            perm: integer, one of the value defined in Permission class.
            dept: dept id (eg 'RT'), default to current departement
                (`g.scodoc_dept`). None restricts the check to roles
                applying to all departments.

        Inactive users have no permission.
        """
        if not self.active:
            return False
        if dept is False:  # sentinel: None is a meaningful value
            dept = g.scodoc_dept
        # roles bound to this department, and roles with dept=None (super-admin)
        roles_in_dept = (
            UserRole.query.filter_by(user_id=self.id)
            .filter((UserRole.dept == dept) | UserRole.dept.is_(None))
            .all()
        )
        return any(user_role.role.has_permission(perm) for user_role in roles_in_dept)

    # Role management
    def add_role(self, role: "Role", dept: str):
        """Add a role to this user.

        :param role: Role to add.
        :param dept: department acronym, or None for all departments.
        Raises ScoValueError if role is not a Role.
        """
        if not isinstance(role, Role):
            raise ScoValueError("add_role: rôle invalide")
        user_role = UserRole(user=self, role=role, dept=dept)
        db.session.add(user_role)
        self.user_roles.append(user_role)

    def add_roles(self, roles: "list[Role]", dept: str):
        """Add roles to this user.

        :param roles: Roles to add.
        :param dept: department acronym, or None for all departments.
        """
        for role in roles:
            self.add_role(role, dept)

    def set_roles(self, roles, dept):
        """Replace all roles of this user by the given ones, in the given
        dept. Items that are not Role instances are ignored."""
        self.user_roles = []
        for role in roles:
            if isinstance(role, Role):
                self.add_role(role, dept)

    def get_roles(self):
        "iterator on my roles"
        yield from self.roles

    def get_roles_string(self):
        """string repr. of user's roles (with depts)
        e.g. "Ens_RT, Ens_Info, Secr_CJ"
        """
        return ", ".join(
            f"{r.role.name or ''}_{r.dept or ''}"
            for r in self.user_roles
            if r is not None
        )

    def get_depts_with_permission(self, permission: int) -> list[str]:
        """List of the department acronyms in which this user has the given
        permission. The "acronym" None means "all departments".

        If several permissions (several bits) are given, this is an "or":
        the departments in which the user has any of the permissions.
        """
        return [
            user_role.dept
            for user_role in UserRole.query.filter_by(user=self)
            .join(Role)
            .filter(Role.permissions.op("&")(permission) != 0)
        ]

    def is_administrator(self):
        "True if i'm an active SuperAdmin"
        return self.active and self.has_permission(Permission.ScoSuperAdmin, dept=None)

    # Some useful strings:
    def get_nomplogin(self):
        """nomplogin is the formatted name (or the upper-cased login when
        there is no name) followed by the first name and the login
        e.g. Dupont Pierre (dupont)
        """
        nom = scu.format_nom(self.nom) if self.nom else self.user_name.upper()
        return f"{nom} {scu.format_prenom(self.prenom)} ({self.user_name})"

    @staticmethod
    def get_user_from_nomplogin(nomplogin: str) -> Optional["User"]:
        """Returns User instance from the string "Dupont Pierre (dupont)"
        or None if user does not exist
        """
        match = re.match(r".*\((.*)\)", nomplogin.strip())
        if not match:
            return None
        return User.query.filter_by(user_name=match.group(1)).first()

    def get_nom_fmt(self):
        """Formatted name: "Martin" (the login if the user has no name)"""
        if self.nom:
            return scu.format_nom(self.nom, uppercase=False)
        return self.user_name

    def get_prenom_fmt(self):
        """Formatted first name (as done by scu.format_prenom)"""
        return scu.format_prenom(self.prenom)

    def get_nomprenom(self):
        """Formatted name followed by the abbreviated first name:
        Viennet E.
        """
        prenom_abbrv = scu.abbrev_prenom(scu.format_prenom(self.prenom))
        return (self.get_nom_fmt() + " " + prenom_abbrv).strip()

    def get_prenomnom(self):
        """Abbreviated first name followed by the name:
        "J.-C. Dupont"
        """
        prenom_abbrv = scu.abbrev_prenom(scu.format_prenom(self.prenom))
        return (prenom_abbrv + " " + self.get_nom_fmt()).strip()

    def get_nomcomplet(self):
        "Full first name and name"
        return scu.format_prenom(self.prenom) + " " + self.get_nom_fmt()

    # nomnoacc used to be the lower-cased name without accents (useless)

    def change_user_name(self, new_user_name: str):
        """Modify user name, update all relevant tables.
        commit session.

        Raises ValueError if the new name is invalid or already used.
        On database error, the session is rolled back and the exception
        re-raised.
        """
        # Safety check
        new_user_name = new_user_name.strip()
        if (
            not is_valid_user_name(new_user_name)
            or User.query.filter_by(user_name=new_user_name).count() > 0
        ):
            raise ValueError("invalid user_name")
        # The user_name is used in other tables (without being a key):
        #   BulAppreciations.author
        #   EntrepriseHistorique.authenticated_user
        #   EtudAnnotation.author
        #   ScolarNews.authenticated_user
        #   Scolog.authenticated_user
        # Imported here, not at module level (kept local as in the original)
        from app.models import (
            BulAppreciations,
            EtudAnnotation,
            ScolarNews,
            Scolog,
        )
        from app.entreprises.models import EntrepriseHistorique

        try:
            # Update every table referencing the old user_name
            db.session.query(BulAppreciations).filter(
                BulAppreciations.author == self.user_name
            ).update({BulAppreciations.author: new_user_name})
            db.session.query(EntrepriseHistorique).filter(
                EntrepriseHistorique.authenticated_user == self.user_name
            ).update({EntrepriseHistorique.authenticated_user: new_user_name})
            db.session.query(EtudAnnotation).filter(
                EtudAnnotation.author == self.user_name
            ).update({EtudAnnotation.author: new_user_name})
            db.session.query(ScolarNews).filter(
                ScolarNews.authenticated_user == self.user_name
            ).update({ScolarNews.authenticated_user: new_user_name})
            db.session.query(Scolog).filter(
                Scolog.authenticated_user == self.user_name
            ).update({Scolog.authenticated_user: new_user_name})
            # And update ourself:
            self.user_name = new_user_name
            db.session.add(self)
            db.session.commit()
        except (
            IntegrityError,
            DataError,
            DatabaseError,
            OperationalError,
            ProgrammingError,
            StatementError,
            InterfaceError,
        ):
            db.session.rollback()
            raise


class AnonymousUser(AnonymousUserMixin):
    "Our anonymous user: no permission at all"

    def has_permission(self, perm, dept=None):
        "Always False"
        return False

    def is_administrator(self):
        "Always False"
        return False


login.anonymous_user = AnonymousUser


class Role(db.Model):
    """Roles for ScoDoc"""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, nullable=False, index=True)
    default = db.Column(db.Boolean, default=False, index=True)
    permissions = db.Column(db.BigInteger)  # 64 bits
    users = db.relationship("User", secondary="user_role", viewonly=True)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        if self.permissions is None:
            self.permissions = 0

    def __repr__(self):
        # permissions shown in binary, zero-padded to Permission.NBITS digits
        return "<Role {} perm={:0{w}b}>".format(
            self.name,
            self.permissions & ((1 << Permission.NBITS) - 1),
            w=Permission.NBITS,
        )

    def __str__(self):
        return f"{self.name}: perm={', '.join(Permission.permissions_names(self.permissions))}"

    def to_dict(self) -> dict:
        "As dict. Convert permissions to names."
        return {
            "id": self.id,
            "role_name": self.name,  # for consistency with partition_name, etc.
            "permissions": Permission.permissions_names(self.permissions),
        }

    def add_permission(self, perm: int):
        "Add permission to role"
        self.permissions |= perm

    def remove_permission(self, perm: int):
        "Remove permission from role"
        self.permissions = self.permissions & ~perm

    def reset_permissions(self):
        "Remove all permissions from role"
        self.permissions = 0

    def get_named_permissions(self) -> list[str]:
        "List of the names of the permissions associated to this role"
        return Permission.permissions_names(self.permissions)

    def set_named_permissions(self, permission_names: list[str]):
        """Set permissions, given as a list of permissions names.
        Raises ScoValueError if invalid permission (the role is then left
        unchanged)."""
        permissions = 0
        for permission_name in permission_names:
            permission = Permission.get_by_name(permission_name)
            if permission is None:
                raise ScoValueError("set_named_permissions: invalid permission name")
            permissions |= permission
        # assign only once all names are validated
        self.permissions = permissions

    def has_permission(self, perm: int) -> bool:
        "True if role has this permission (all bits of perm)"
        return (self.permissions & perm) == perm

    @staticmethod
    def reset_standard_roles_permissions(reset_permissions=True):
        """Create default roles if missing, then, if reset_permissions,
        reset their permissions to default values.
        """
        Role.reset_roles_permissions(
            SCO_ROLES_DEFAULTS, reset_permissions=reset_permissions
        )

    @staticmethod
    def reset_roles_permissions(roles_perms: dict[str, tuple], reset_permissions=True):
        """Set the permissions of the given roles.

        roles_perms : { "role_name" : (permission, ...) }
        reset_permissions : if true, erase existing permissions and set the
            given ones.
        If a role does not exist, it is (re)created.
        Commits the session.
        """
        default_role = "Observateur"
        for role_name, permissions in roles_perms.items():
            role = Role.query.filter_by(name=role_name).first()
            if role is None:
                role = Role(name=role_name)
                role.default = role.name == default_role
                db.session.add(role)
            # NOTE(review): nesting reconstructed from a flattened source,
            # following the docstring of reset_standard_roles_permissions
            # ("if reset_permissions, reset their permissions to default
            # values") - confirm that permissions are only added on reset.
            if reset_permissions:
                role.reset_permissions()
                for perm in permissions:
                    role.add_permission(perm)
                db.session.add(role)

        db.session.commit()

    @staticmethod
    def ensure_standard_roles():
        """Create default roles if missing"""
        Role.reset_standard_roles_permissions(reset_permissions=False)

    @staticmethod
    def get_named_role(name):
        """Returns existing role with given name, or None."""
        return Role.query.filter_by(name=name).first()


class UserRole(db.Model):
    """Associate user to role, in a dept.
    If dept is None, the role applies to all departments (eg super admin).
    """

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
    role_id = db.Column(db.Integer, db.ForeignKey("role.id"))
    dept = db.Column(db.String(64))  # dept acronym or NULL
    user = db.relationship(
        User, backref=db.backref("user_roles", cascade="all, delete-orphan")
    )
    role = db.relationship(
        Role, backref=db.backref("user_roles", cascade="all, delete-orphan")
    )

    def __repr__(self):
        return f"<UserRole u={self.user} r={self.role} dept={self.dept}>"

    @staticmethod
    def role_dept_from_string(role_dept: str):
        """Return tuple (role, dept) from the string
        role_dept, of the form "Role_Dept".
        role is a Role instance, dept is a string, or None.

        Raises ScoValueError if the string has no "_" or if the role does
        not exist.
        """
        fields = role_dept.strip().split("_", 1)
        # maxsplit=1, the dept may contain a "_"
        if len(fields) != 2:
            current_app.logger.warning(
                f"auth: role_dept_from_string: Invalid role_dept '{role_dept}'"
            )
            raise ScoValueError("Invalid role_dept")
        role_name, dept = fields
        dept = dept.strip() or None  # empty dept means "all departments"

        role = Role.query.filter_by(name=role_name).first()
        if role is None:
            raise ScoValueError(f"role {role_name} does not exists")
        return (role, dept)


def get_super_admin():
    """The admin user (or the first one, if there are several).
    Used by unit tests and by the migration script.
    """
    admin_role = Role.query.filter_by(name="SuperAdmin").first()
    assert admin_role
    admin_user = (
        User.query.join(UserRole)
        .filter((UserRole.user_id == User.id) & (UserRole.role_id == admin_role.id))
        .first()
    )
    assert admin_user
    return admin_user


def send_notif_desactivation_user(user: User):
    """Send a notification mail to the admin and to the addresses of the
    deactivated account. Returns the text of the message."""
    recipients = user.get_emails()
    admin_mail = current_app.config.get("SCODOC_ADMIN_MAIL")
    if admin_mail:  # do not pass None as a recipient if not configured
        recipients.append(admin_mail)
    txt = [
        f"""Le compte ScoDoc '{user.user_name}' associé à votre adresse <{user.email}>""",
        """a été désactivé par le système car son mot de passe n'était pas valide.\n""",
        """Contactez votre responsable pour le ré-activer.\n""",
        """Ceci est un message automatique, ne pas répondre.""",
    ]
    txt = "\n".join(txt)
    email.send_email(
        f"ScoDoc: désactivation automatique du compte {user.user_name}",
        email.get_from_addr(),
        recipients,
        txt,
    )
    return txt