265 lines
8.2 KiB
Python
265 lines
8.2 KiB
Python
# -*- coding: UTF-8 -*-
|
|
|
|
"""Users and Roles models for ScoDoc
|
|
"""
|
|
|
|
import base64
|
|
from datetime import datetime, timedelta
|
|
from hashlib import md5
|
|
import json
|
|
import os
|
|
from time import time
|
|
|
|
from flask import current_app, url_for, g
|
|
from flask_login import UserMixin, AnonymousUserMixin
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
|
|
import jwt
|
|
|
|
from app import db, login
|
|
|
|
from app.scodoc.sco_permissions import Permission
|
|
from app.scodoc.sco_roles_default import SCO_ROLES_DEFAULTS
|
|
|
|
|
|
class User(UserMixin, db.Model):
    """ScoDoc users, handled by Flask / SQLAlchemy"""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), index=True, unique=True)
    email = db.Column(db.String(120), index=True, unique=True)
    password_hash = db.Column(db.String(128))
    about_me = db.Column(db.String(140))
    last_seen = db.Column(db.DateTime, default=datetime.utcnow)
    # API token: 24 random bytes, base64-encoded (exactly 32 characters)
    token = db.Column(db.String(32), index=True, unique=True)
    token_expiration = db.Column(db.DateTime)
    # Read-only view of the roles; roles are modified through `user_roles`
    roles = db.relationship("Role", secondary="user_role", viewonly=True)
    # Re-export the Permission class as an attribute of User
    Permission = Permission

    def __init__(self, **kwargs):
        """Create a user.

        If no role was given and the email is the one configured as
        ``SCODOC_ADMIN_MAIL``, the "Admin" role is attached for all
        departments (dept=None) and the session is committed.
        """
        self.roles = []
        super(User, self).__init__(**kwargs)
        if (
            not self.roles
            and self.email
            and self.email == current_app.config["SCODOC_ADMIN_MAIL"]
        ):
            # super-admin
            admin_role = Role.query.filter_by(name="Admin").first()
            assert admin_role, "default role 'Admin' does not exist"
            self.add_role(admin_role, None)
            db.session.commit()
        current_app.logger.info("creating user with roles={}".format(self.roles))

    def __repr__(self):
        return "<User {}>".format(self.username)

    def __str__(self):
        return self.username

    def set_password(self, password):
        """Set password.

        An empty (or None) password clears the stored hash, which makes
        `check_password` always fail for this user.
        """
        if password:
            self.password_hash = generate_password_hash(password)
        else:
            self.password_hash = None

    def check_password(self, password):
        """Check given password vs current one.

        Returns `True` if the password matched, `False` otherwise.
        """
        if not self.password_hash:  # user without password can't login
            return False
        return check_password_hash(self.password_hash, password)

    def get_reset_password_token(self, expires_in=600):
        """Return a signed JWT (as text) allowing to reset this user's password.

        Args:
            expires_in: validity of the token, in seconds.
        """
        token = jwt.encode(
            {"reset_password": self.id, "exp": time() + expires_in},
            current_app.config["SECRET_KEY"],
            algorithm="HS256",
        )
        # PyJWT < 2.0 returns bytes, PyJWT >= 2.0 already returns text:
        # only decode when needed, so that both versions work.
        if isinstance(token, bytes):
            token = token.decode("utf-8")
        return token

    @staticmethod
    def verify_reset_password_token(token):
        """Return the user designated by a reset-password token.

        Returns None if the token cannot be decoded (bad signature,
        expired, malformed) or carries no "reset_password" claim.
        """
        try:
            user_id = jwt.decode(
                token, current_app.config["SECRET_KEY"], algorithms=["HS256"]
            )["reset_password"]
        except Exception:
            # Any decoding problem means "invalid token"; unlike a bare
            # `except:`, this lets KeyboardInterrupt / SystemExit propagate.
            return None
        return User.query.get(user_id)

    def to_dict(self, include_email=False):
        """Return a dict representation of this user.

        Args:
            include_email: if true, add the "email" key.

        "last_seen" is an ISO 8601 string suffixed with "Z", or None when
        the attribute is not set (e.g. object not yet flushed).
        """
        last_seen = self.last_seen
        data = {
            "id": self.id,
            "username": self.username,
            "last_seen": (last_seen.isoformat() + "Z")
            if last_seen is not None
            else None,
            "about_me": self.about_me,
        }
        if include_email:
            data["email"] = self.email
        return data

    def from_dict(self, data, new_user=False):
        """Update this user from `data`.

        Only "username", "email" and "about_me" are copied. The "password"
        key is taken into account only when `new_user` is true.
        """
        for field in ["username", "email", "about_me"]:
            if field in data:
                setattr(self, field, data[field])
        if new_user and "password" in data:
            self.set_password(data["password"])

    def get_token(self, expires_in=3600):
        """Return the API token of this user, creating a new one if needed.

        The current token is reused while it stays valid for more than
        60 seconds. Otherwise a new random token is generated and the user
        is added to the session (the caller is responsible for the commit).

        Args:
            expires_in: validity of a newly created token, in seconds.
        """
        now = datetime.utcnow()
        if (
            self.token
            and self.token_expiration is not None
            and self.token_expiration > now + timedelta(seconds=60)
        ):
            return self.token
        self.token = base64.b64encode(os.urandom(24)).decode("utf-8")
        self.token_expiration = now + timedelta(seconds=expires_in)
        db.session.add(self)
        return self.token

    def revoke_token(self):
        """Invalidate the current token by moving its expiration in the past.

        The session is not touched here.
        """
        self.token_expiration = datetime.utcnow() - timedelta(seconds=1)

    @staticmethod
    def check_token(token):
        """Return the user owning `token`, or None if unknown or expired."""
        if not token:
            # An empty token would match users having no token at all
            return None
        user = User.query.filter_by(token=token).first()
        if (
            user is None
            or user.token_expiration is None
            or user.token_expiration < datetime.utcnow()
        ):
            return None
        return user

    # Permissions management:
    def has_permission(self, perm, dept=False):
        """Check if user has permission `perm` in given `dept`.
        Emulate Zope `has_permission`

        Args:
            perm: integer, one of the values defined in Permission class.
            dept: dept id (eg 'RT'). If False (default), use the current
                department `g.scodoc_dept`. If None, only the roles valid
                in all departments are considered.
        """
        if dept is False:
            dept = g.scodoc_dept
        # roles attached to this department, plus roles with dept=None
        # (super-admin)
        roles_in_dept = (
            UserRole.query.filter_by(user_id=self.id)
            .filter((UserRole.dept == dept) | UserRole.dept.is_(None))
            .all()
        )
        return any(user_role.role.has_permission(perm) for user_role in roles_in_dept)

    # Role management
    def add_role(self, role, dept):
        """Add a role to this user.

        :param role: Role to add.
        :param dept: dept id, or None if the role applies to all departments.
        """
        self.user_roles.append(UserRole(user=self, role=role, dept=dept))

    def add_roles(self, roles, dept):
        """Add roles to this user.

        :param roles: Roles to add.
        :param dept: dept id, or None if the roles apply to all departments.
        """
        for role in roles:
            self.add_role(role, dept)

    def set_roles(self, roles, dept):
        """Replace all the role associations of this user by `roles` in `dept`."""
        self.user_roles = [UserRole(user=self, role=r, dept=dept) for r in roles]

    def get_roles(self):
        """Iterate over the roles of this user (generator)."""
        for role in self.roles:
            yield role

    def is_administrator(self):
        """True if the user has the ScoSuperAdmin permission in all departments."""
        return self.has_permission(Permission.ScoSuperAdmin, None)
|
|
|
|
|
|
class AnonymousUser(AnonymousUserMixin):
    """User object for non-authenticated visitors: never granted anything."""

    def has_permission(self, perm, dept=None):
        """Anonymous users have no permission, whatever `perm` and `dept`."""
        return False

    def is_administrator(self):
        """Anonymous users are never administrators."""
        return False
|
|
|
|
|
|
# Register AnonymousUser as the class used for non-authenticated users
# (presumably `login` is the Flask-Login LoginManager created in `app`).
login.anonymous_user = AnonymousUser
|
|
|
|
|
|
class Role(db.Model):
    """Roles for ScoDoc.

    The permissions of a role are stored as a bit field in `permissions`.
    """

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    default = db.Column(db.Boolean, default=False, index=True)
    permissions = db.Column(db.BigInteger)  # 64 bits
    users = db.relationship("User", secondary="user_role", viewonly=True)
    # NOTE: a unique constraint on (name, dept) was sketched here but is
    # disabled:
    # __table_args__ = (db.UniqueConstraint("name", "dept", name="_rolename_dept_uc"),)

    def __init__(self, **kwargs):
        """Create a role; `permissions` is 0 unless given."""
        super(Role, self).__init__(**kwargs)
        if self.permissions is None:
            self.permissions = 0

    def __repr__(self):
        # Show only the NBITS low-order bits, zero-padded, in binary
        nbits = Permission.NBITS
        mask = (1 << nbits) - 1
        return "<Role {} perm={:0{w}b}>".format(
            self.name, self.permissions & mask, w=nbits
        )

    def add_permission(self, perm):
        """Set the bit(s) of `perm`."""
        self.permissions = self.permissions | perm

    def remove_permission(self, perm):
        """Clear the bit(s) of `perm`."""
        self.permissions &= ~perm

    def reset_permissions(self):
        """Remove all permissions."""
        self.permissions = 0

    def has_permission(self, perm):
        """True if all the bits of `perm` are set for this role."""
        return (self.permissions & perm) == perm

    @staticmethod
    def insert_roles():
        """Create (or reset) the default roles listed in SCO_ROLES_DEFAULTS,
        then commit."""
        default_role = "Observateur"
        for role_name, role_perms in SCO_ROLES_DEFAULTS.items():
            role = Role.get_named_role(role_name)
            if role is None:
                role = Role(name=role_name)
            # Rebuild the bit field from scratch
            role.reset_permissions()
            for one_perm in role_perms:
                role.add_permission(one_perm)
            role.default = role.name == default_role
            db.session.add(role)
        db.session.commit()

    @staticmethod
    def get_named_role(name):
        """Returns existing role with given name, or None."""
        return Role.query.filter_by(name=name).first()
|
|
|
|
|
|
class UserRole(db.Model):
    """Association of a user and a role, within a department.

    A None `dept` means that the role is valid in all departments
    (eg super admin).
    """

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
    role_id = db.Column(db.Integer, db.ForeignKey("role.id"))
    dept = db.Column(db.String(64))
    # Both relationships create a `user_roles` backref (on User and on Role)
    # which owns the association objects (delete-orphan cascade).
    user = db.relationship(
        User,
        backref=db.backref("user_roles", cascade="all, delete-orphan"),
    )
    role = db.relationship(
        Role,
        backref=db.backref("user_roles", cascade="all, delete-orphan"),
    )

    def __repr__(self):
        template = "<UserRole u={} r={} dept={}>"
        return template.format(self.user, self.role, self.dept)
|
|
|
|
|
|
@login.user_loader
def load_user(id):
    """User loader callback: return the user with the given id.

    `id` is converted to an integer before the lookup. Returns None
    (instead of raising) when it is not a valid integer, and whatever
    `User.query.get` returns otherwise.
    """
    try:
        user_id = int(id)
    except (TypeError, ValueError):
        # Malformed id: treat it as "no such user"
        return None
    return User.query.get(user_id)
|