ScoDoc/app/decorators.py

288 lines
9.8 KiB
Python
Raw Normal View History

2021-05-29 18:22:51 +02:00
# -*- coding: UTF-8 -*
"""Decorators for permissions, roles and ScoDoc7 Zope compatibility
"""
import functools
from functools import wraps
import inspect
2021-06-16 12:02:43 +02:00
import types
import logging
2021-05-29 18:22:51 +02:00
import werkzeug
from werkzeug.exceptions import BadRequest
2021-05-29 18:22:51 +02:00
import flask
from flask import g
from flask import abort, current_app
from flask import request
from flask_login import current_user
from flask_login import login_required
from flask import current_app
import flask_login
import app
from app.auth.models import User
2021-05-29 18:22:51 +02:00
class ZUser(object):
    """Minimal stand-in for a Zope user object."""

    def __init__(self):
        """Build the user from `flask_login.current_user`."""
        # Only the login name of the current user is kept.
        login_name = current_user.user_name
        self.username = login_name

    def __str__(self):
        """Return the user name."""
        return self.username

    def has_permission(self, perm, dept=None):
        """Check whether this user has the permission `perm`
        in the department given by `g.scodoc_dept`.

        Not implemented here: always raises NotImplementedError.
        """
        raise NotImplementedError()
class ZRequest(object):
    """Emulation of the Zope 2 REQUEST object, built from the current Flask request.

    Attributes set by the constructor:
      URL, URL0          -- `request.base_url`
      BASE0              -- `request.url_root`
      QUERY_STRING       -- query string decoded as UTF-8
      REQUEST_METHOD     -- HTTP method of the current request
      AUTHENTICATED_USER -- `flask_login.current_user`
      REMOTE_ADDR        -- `request.remote_addr`
      form               -- request parameters (see below); always defined
      RESPONSE           -- a fresh `ZResponse`

    `form` holds the posted form (plus uploaded files) for POST requests and
    the query-string arguments for GET/HEAD requests. For any other method
    it is an empty dict. Parameters whose name ends with ":list" are also
    made available, as a list of values, under the name without that suffix
    (for GET/HEAD only the stripped name is stored).
    """

    def __init__(self):
        # The ReverseProxied middleware now takes care of setting the right
        # scheme (http or https), so the URLs are used unchanged.
        self.URL = request.base_url
        self.BASE0 = request.url_root
        self.URL0 = self.URL
        # query_string is bytes:
        self.QUERY_STRING = request.query_string.decode("utf-8")
        self.REQUEST_METHOD = request.method
        self.AUTHENTICATED_USER = current_user
        self.REMOTE_ADDR = request.remote_addr
        # Always define `form`, so that __str__ and callers reading
        # REQUEST.form never hit an AttributeError, whatever the method.
        self.form = {}
        if request.method == "POST":
            # request.form is a werkzeug.datastructures.ImmutableMultiDict:
            # copy it to get a mutable version (needed by TrivialFormulator)
            self.form = request.form.copy()
            if request.files:
                # Add uploaded files to the form:
                self.form.update(request.files)
            for k in request.form:
                if k.endswith(":list"):
                    self.form[k[:-5]] = request.form.getlist(k)
        elif request.method in ("GET", "HEAD"):
            # HEAD is handled like GET: Flask routes declared for GET also
            # receive HEAD requests, which carry the same query string.
            for k in request.args:
                if k.endswith(":list"):
                    self.form[k[:-5]] = request.args.getlist(k)
                else:
                    self.form[k] = request.args[k]
        self.RESPONSE = ZResponse()

    def __str__(self):
        """Return a multi-line, human-readable dump of the main request fields."""
        return """REQUEST
URL={r.URL}
QUERY_STRING={r.QUERY_STRING}
REQUEST_METHOD={r.REQUEST_METHOD}
AUTHENTICATED_USER={r.AUTHENTICATED_USER}
form={r.form}
""".format(
            r=self
        )
class ZResponse(object):
    """Minimal emulation of the Zope 2 RESPONSE object."""

    def __init__(self):
        # Maps lower-cased header names to their values (see setHeader).
        self.headers = dict()

    def redirect(self, url):
        """Return a Flask redirect response to `url` (http 302)."""
        response = flask.redirect(url)
        return response

    def setHeader(self, header, value):
        """Store `value` under the lower-cased `header` name."""
        key = header.lower()
        self.headers[key] = value
2021-05-29 18:22:51 +02:00
2021-08-13 00:34:58 +02:00
def scodoc(func):
    """Decorator for all ScoDoc functions.

    When the keyword argument `scodoc_dept` is present (routes like
    `/<scodoc_dept>/Scolarite/sco_exemple`), it is passed to
    `app.set_sco_dept` and removed from the keyword arguments before the
    wrapped function is called. The original notes state that this assigns
    the department to `g` (`g.scodoc_dept`, `g.scodoc_dept_id`) and opens
    the database connection.

    When it is absent and `g` has no `scodoc_dept` attribute yet,
    `g.scodoc_dept` is set to None and `g.scodoc_dept_id` to -1.
    """

    @wraps(func)
    def scodoc_function(*args, **kwargs):
        try:
            dept_acronym = kwargs.pop("scodoc_dept")
        except KeyError:
            # No department in the route: install defaults only if nothing
            # was set on `g` before.
            if not hasattr(g, "scodoc_dept"):
                g.scodoc_dept = None
                g.scodoc_dept_id = -1  # invalid id
        else:
            app.set_sco_dept(dept_acronym)
        return func(*args, **kwargs)

    return scodoc_function
def permission_required(permission):
    """Decorator factory: require a logged-in user holding `permission`.

    The returned decorator wraps the view with `login_required`; once the
    login check has passed, the current user's permission is checked for
    the department stored in `g.scodoc_dept` (None when unset) and the
    request is aborted with 403 if the user lacks it.
    """

    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            dept = getattr(g, "scodoc_dept", None)
            allowed = current_user.has_permission(permission, dept)
            if not allowed:
                abort(403)
            return f(*args, **kwargs)

        protected = login_required(decorated_function)
        return protected

    return decorator
def permission_required_compat_scodoc7(permission):
    """Decorator for functions used as an API in ScoDoc 7.

    Like @permission_required, but also accepts the credentials passed
    directly as request parameters: `__ac_name` and `__ac_password`
    (query string for GET, form fields for POST). Any other HTTP method is
    rejected with 405.
    """

    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Pick the parameter source matching the HTTP method.
            method = request.method
            if method == "GET":
                params = request.args
            elif method == "POST":
                params = request.form
            else:
                abort(405)  # method not allowed
            user_name = params.get("__ac_name")
            user_password = params.get("__ac_password")

            # Log the user in when valid credentials were supplied.
            auth_ok = False
            if user_name and user_password:
                u = User.query.filter_by(user_name=user_name).first()
                if u and u.check_password(user_password):
                    flask_login.login_user(u)
                    auth_ok = True

            # Back to the usual path: permission check for the current dept.
            scodoc_dept = getattr(g, "scodoc_dept", None)
            if not current_user.has_permission(permission, scodoc_dept):
                abort(403)
            if not auth_ok:
                # No in-request credentials: rely on the regular login check.
                return login_required(f)(*args, **kwargs)
            return f(*args, **kwargs)

        return decorated_function

    return decorator
def admin_required(f):
    """Decorator: require the `Permission.ScoSuperAdmin` permission.

    Shortcut for `permission_required(Permission.ScoSuperAdmin)`.
    """
    # Function-level import, kept as in the original code.
    from app.auth.models import Permission

    require_super_admin = permission_required(Permission.ScoSuperAdmin)
    return require_super_admin(f)
def _scodoc7_to_int(value):
    """Return `value` converted to int when possible, else `value` unchanged.

    Needed for db ids and boolean values, which arrive as strings.
    Non-numeric strings (ValueError) and non-scalar values such as lists
    (TypeError) are passed through untouched.
    """
    try:
        return int(value)
    except (ValueError, TypeError):
        return value


def scodoc7func(func):
    """Decorator integrating the Zope 2 functions of ScoDoc 7.

    Adds the REQUEST argument if it is in the function's signature.
    The request parameters (query string for GET, form for POST) become
    the (positional or keyword) arguments of the function.
    """

    @wraps(func)
    def scodoc7func_decorator(*args, **kwargs):
        """Allow legacy Zope published methods to be called via Flask
        routes without modification.

        There are two cases: the function can be called
        1. via a Flask route ("top level call")
        2. or directly from Python.

        A call is considered top level when `g` has no `zrequest` attribute
        yet. In that case a REQUEST object (emulating Zope 2 REQUEST) is
        created, stored in `g.zrequest`, and the function arguments are
        taken from the request parameters. Otherwise the function is called
        with the given arguments, unchanged.

        Raises:
            BadRequest: a top-level call lacks a required argument.
        """
        if hasattr(g, "zrequest"):
            # Plain Python call from inside a request: do not "re-decorate".
            return func(*args, **kwargs)

        # --- Emulate Zope's REQUEST
        REQUEST = ZRequest()
        g.zrequest = REQUEST
        req_args = REQUEST.form  # args from query string (get) or form (post)

        # --- Split the signature into required and defaulted parameters
        argspec = inspect.getfullargspec(func)
        nb_default_args = len(argspec.defaults) if argspec.defaults else 0
        if nb_default_args:
            required_names = argspec.args[:-nb_default_args]
            optional_names = argspec.args[-nb_default_args:]
        else:
            required_names = argspec.args
            optional_names = []

        # --- Positional arguments: every parameter without default value
        pos_arg_values = []
        for arg_name in required_names:
            if arg_name == "REQUEST":  # special case
                pos_arg_values.append(REQUEST)
                continue
            try:
                raw_value = req_args[arg_name]
            except KeyError as exc:
                # A missing required parameter is a client error (400),
                # not a server error.
                raise BadRequest("missing argument: %s" % arg_name) from exc
            pos_arg_values.append(_scodoc7_to_int(raw_value))

        # --- Keyword arguments: optional parameters present in the request
        for arg_name in optional_names:
            if arg_name == "REQUEST":  # special case
                kwargs[arg_name] = REQUEST
            elif arg_name in req_args:
                kwargs[arg_name] = _scodoc7_to_int(req_args[arg_name])

        value = func(*pos_arg_values, **kwargs)

        if isinstance(value, werkzeug.wrappers.response.Response):
            return value  # redirected
        # Build response, adding collected http headers:
        headers = g.zrequest.RESPONSE.headers if g.zrequest else []
        if not headers:
            # no customized header, speedup:
            return value
        kw = {"response": value, "status": 200}
        if "content-type" in headers:
            kw["mimetype"] = headers["content-type"]
        r = flask.Response(**kw)
        for h in headers:
            r.headers[h] = headers[h]
        return r

    return scodoc7func_decorator