diff --git a/app/auth/models.py b/app/auth/models.py index eb278dd13..7e9642ddf 100644 --- a/app/auth/models.py +++ b/app/auth/models.py @@ -14,6 +14,15 @@ import cracklib # pylint: disable=import-error from flask import current_app, g from flask_login import UserMixin, AnonymousUserMixin +from sqlalchemy.exc import ( + IntegrityError, + DataError, + DatabaseError, + OperationalError, + ProgrammingError, + StatementError, + InterfaceError, +) from werkzeug.security import generate_password_hash, check_password_hash @@ -48,13 +57,13 @@ def is_valid_password(cleartxt) -> bool: return False -def invalid_user_name(user_name: str) -> bool: - "Check that user_name (aka login) is invalid" +def is_valid_user_name(user_name: str) -> bool: + "Check that user_name (aka login) is valid" return ( - not user_name - or (len(user_name) < 2) - or (len(user_name) >= USERNAME_STR_LEN) - or not VALID_LOGIN_EXP.match(user_name) + user_name + and (len(user_name) >= 2) + and (len(user_name) < USERNAME_STR_LEN) + and VALID_LOGIN_EXP.match(user_name) ) @@ -123,7 +132,7 @@ class User(UserMixin, ScoDocModel): # check login: if not "user_name" in kwargs: raise ValueError("missing user_name argument") - if invalid_user_name(kwargs["user_name"]): + if not is_valid_user_name(kwargs["user_name"]): raise ValueError(f"invalid user_name: {kwargs['user_name']}") kwargs["nom"] = kwargs.get("nom", "") or "" kwargs["prenom"] = kwargs.get("prenom", "") or "" @@ -329,7 +338,8 @@ class User(UserMixin, ScoDocModel): if new_user: if "user_name" in data: # never change name of existing users - if invalid_user_name(data["user_name"]): + # (see change_user_name method to do that) + if not is_valid_user_name(data["user_name"]): raise ValueError(f"invalid user_name: {data['user_name']}") self.user_name = data["user_name"] if "password" in data: @@ -522,6 +532,64 @@ class User(UserMixin, ScoDocModel): # nomnoacc était le nom en minuscules sans accents (inutile) + def change_user_name(self, new_user_name: str): + """Modify user name, update all relevant tables. + commit session. + """ + # Safety check + new_user_name = new_user_name.strip() + if ( + not is_valid_user_name(new_user_name) + or User.query.filter_by(user_name=new_user_name).count() > 0 + ): + raise ValueError("invalid user_name") + # Le user_name est utilisé dans d'autres tables (sans être une clé) + # BulAppreciations.author + # EntrepriseHistorique.authenticated_user + # EtudAnnotation.author + # ScolarNews.authenticated_user + # Scolog.authenticated_user + from app.models import ( + BulAppreciations, + EtudAnnotation, + ScolarNews, + Scolog, + ) + from app.entreprises.models import EntrepriseHistorique + + try: + # Update all instances of EtudAnnotation + db.session.query(BulAppreciations).filter( + BulAppreciations.author == self.user_name + ).update({BulAppreciations.author: new_user_name}) + db.session.query(EntrepriseHistorique).filter( + EntrepriseHistorique.authenticated_user == self.user_name + ).update({EntrepriseHistorique.authenticated_user: new_user_name}) + db.session.query(EtudAnnotation).filter( + EtudAnnotation.author == self.user_name + ).update({EtudAnnotation.author: new_user_name}) + db.session.query(ScolarNews).filter( + ScolarNews.authenticated_user == self.user_name + ).update({ScolarNews.authenticated_user: new_user_name}) + db.session.query(Scolog).filter( + Scolog.authenticated_user == self.user_name + ).update({Scolog.authenticated_user: new_user_name}) + # And update ourself: + self.user_name = new_user_name + db.session.add(self) + db.session.commit() + except ( + IntegrityError, + DataError, + DatabaseError, + OperationalError, + ProgrammingError, + StatementError, + InterfaceError, + ) as exc: + db.session.rollback() + raise exc + class AnonymousUser(AnonymousUserMixin): "Notre utilisateur anonyme" diff --git a/app/auth/routes.py b/app/auth/routes.py index c9f5f7a0a..778cf8e5e 100644 --- a/app/auth/routes.py +++ b/app/auth/routes.py @@ -18,7 +18,7 @@ from app.auth.forms import ( ResetPasswordRequestForm, UserCreationForm, ) -from app.auth.models import Role, User, invalid_user_name +from app.auth.models import Role, User, is_valid_user_name from app.auth.email import send_password_reset_email from app.decorators import admin_required from app.forms.generic import SimpleConfirmationForm @@ -35,10 +35,12 @@ def _login_form(): form = LoginForm() if form.validate_on_submit(): # note: ceci est la première requête SQL déclenchée par un utilisateur arrivant - if invalid_user_name(form.user_name.data): - user = None - else: - user = User.query.filter_by(user_name=form.user_name.data).first() + user = ( + User.query.filter_by(user_name=form.user_name.data).first() + if is_valid_user_name(form.user_name.data) + else None + ) + if user is None or not user.check_password(form.password.data): current_app.logger.info("login: invalid (%s)", form.user_name.data) flash(_("Nom ou mot de passe invalide")) diff --git a/app/entreprises/models.py b/app/entreprises/models.py index 2dc825b82..b41d6b5ce 100644 --- a/app/entreprises/models.py +++ b/app/entreprises/models.py @@ -151,7 +151,7 @@ class EntrepriseHistorique(db.Model): __tablename__ = "are_historique" id = db.Column(db.Integer, primary_key=True) date = db.Column(db.DateTime(timezone=True), server_default=db.func.now()) - authenticated_user = db.Column(db.Text) + authenticated_user = db.Column(db.Text) # user_name login sans contrainte entreprise_id = db.Column(db.Integer) object = db.Column(db.Text) object_id = db.Column(db.Integer) diff --git a/app/models/events.py b/app/models/events.py index f8fd64ceb..659bf2600 100644 --- a/app/models/events.py +++ b/app/models/events.py @@ -27,7 +27,7 @@ class Scolog(db.Model): method = db.Column(db.Text) msg = db.Column(db.Text) etudid = db.Column(db.Integer) # sans contrainte pour garder logs après suppression - authenticated_user = db.Column(db.Text) # login, sans contrainte + authenticated_user = db.Column(db.Text) # user_name login, sans contrainte # zope_remote_addr suppressed @classmethod @@ -76,7 +76,9 @@ class ScolarNews(db.Model): date = db.Column( db.DateTime(timezone=True), server_default=db.func.now(), index=True ) - authenticated_user = db.Column(db.Text, index=True) # login, sans contrainte + authenticated_user = db.Column( + db.Text, index=True + ) # user_name login, sans contrainte # type in 'INSCR', 'NOTES', 'FORM', 'SEM', 'MISC' type = db.Column(db.String(SHORT_STR_LEN), index=True) object = db.Column( diff --git a/sco_version.py b/sco_version.py index 0fcfea845..336790981 100644 --- a/sco_version.py +++ b/sco_version.py @@ -1,7 +1,7 @@ # -*- mode: python -*- # -*- coding: utf-8 -*- -SCOVERSION = "9.6.979" +SCOVERSION = "9.6.980" SCONAME = "ScoDoc" diff --git a/scodoc.py b/scodoc.py index 1b5b8eccf..14e8b2425 100755 --- a/scodoc.py +++ b/scodoc.py @@ -385,6 +385,18 @@ def user_role(username, dept_acronym=None, add_role_name=None, remove_role_name= db.session.commit() +@app.cli.command() +@click.argument("user_name") +@click.argument("new_user_name") +def user_change_login(user_name, new_user_name): + """Change user's login (user_name)""" + user: User = User.query.filter_by(user_name=user_name).first() + if not user: + sys.stderr.write(f"user_change_login: user {user_name} does not exists\n") + return 1 + user.change_user_name(new_user_name) + + def abort_if_false(ctx, param, value): if not value: ctx.abort() diff --git a/tools/anonymize_db.py b/tools/anonymize_db.py index 494c61a00..2c416038a 100755 --- a/tools/anonymize_db.py +++ b/tools/anonymize_db.py @@ -185,6 +185,7 @@ def anonymize_users(cursor): ) # Change les username: utilisés en référence externe # dans diverses tables: + # NB: la méthode User.change_user_name fait la même chose for table, field in ( ("etud_annotations", "author"), ("scolog", "authenticated_user"),