288 lines
9.8 KiB
Python
288 lines
9.8 KiB
Python
# -*- coding: UTF-8 -*-
|
|
"""Decorators for permissions, roles and ScoDoc7 Zope compatibility
|
|
"""
|
|
import functools
|
|
from functools import wraps
|
|
import inspect
|
|
import types
|
|
import logging
|
|
|
|
import werkzeug
|
|
from werkzeug.exceptions import BadRequest
|
|
import flask
|
|
from flask import g
|
|
from flask import abort, current_app
|
|
from flask import request
|
|
from flask_login import current_user
|
|
from flask_login import login_required
|
|
from flask import current_app
|
|
import flask_login
|
|
|
|
import app
|
|
from app.auth.models import User
|
|
|
|
|
|
class ZUser:
    """Minimal stand-in for a Zope user object."""

    def __init__(self):
        """Capture the user name of `flask_login.current_user`."""
        self.username = current_user.user_name

    def __str__(self):
        """Return the stored user name."""
        return self.username

    def has_permission(self, perm, dept=None):
        """Placeholder for a permission check; not implemented.

        Meant to tell whether this user has the permission `perm`
        (the original note refers to the department in `g.scodoc_dept`).

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError
|
|
|
|
|
|
class ZRequest(object):
    """Emulate the Zope 2 REQUEST object from the current Flask request.

    The constructor reads `flask.request` and `flask_login.current_user`
    and exposes Zope-style attributes: URL, URL0, BASE0, QUERY_STRING,
    REQUEST_METHOD, AUTHENTICATED_USER, REMOTE_ADDR, form and RESPONSE.

    `form` holds the request parameters: the posted form (plus uploaded
    files) for POST, the query-string arguments for every other method.
    A parameter named `xxx:list` is additionally/alternatively exposed as
    `xxx` holding the list of all its values.
    """

    def __init__(self):
        # The reverse proxy layer (ReverseProxied) is now in charge of
        # setting the right protocol (http or https), so URLs are used as is.
        self.URL = request.base_url
        self.BASE0 = request.url_root
        self.URL0 = self.URL
        # query_string is bytes:
        self.QUERY_STRING = request.query_string.decode("utf-8")
        self.REQUEST_METHOD = request.method
        self.AUTHENTICATED_USER = current_user
        self.REMOTE_ADDR = request.remote_addr
        if request.method == "POST":
            # request.form is a werkzeug.datastructures.ImmutableMultiDict
            # must copy to get a mutable version (needed by TrivialFormulator)
            self.form = request.form.copy()
            if request.files:
                # Add files in form:
                self.form.update(request.files)
            for k in request.form:
                if k.endswith(":list"):
                    # strip the ":list" suffix (5 characters)
                    self.form[k[:-5]] = request.form.getlist(k)
        else:
            # GET, and any other method (e.g. HEAD): parameters come from
            # the query string. Always defining `form` avoids an
            # AttributeError later on for methods other than GET/POST.
            self.form = {}
            for k in request.args:
                if k.endswith(":list"):
                    self.form[k[:-5]] = request.args.getlist(k)
                else:
                    self.form[k] = request.args[k]
        self.RESPONSE = ZResponse()

    def __str__(self):
        """Return a multi-line debug dump of the main request fields."""
        # The lines of the literal are kept flush left, as in the source.
        return """REQUEST
URL={r.URL}
QUERY_STRING={r.QUERY_STRING}
REQUEST_METHOD={r.REQUEST_METHOD}
AUTHENTICATED_USER={r.AUTHENTICATED_USER}
form={r.form}
""".format(
            r=self
        )
|
|
|
|
|
|
class ZResponse:
    """Stand-in for the Zope 2 RESPONSE object.

    Collects HTTP headers in `headers`, a dict keyed by lower-cased
    header name.
    """

    def __init__(self):
        """Start with no collected header."""
        self.headers = dict()

    def redirect(self, url):
        """Return the result of `flask.redirect(url)` (http 302)."""
        return flask.redirect(url)

    def setHeader(self, header, value):
        """Record `value` for `header`; the header name is lower-cased."""
        key = header.lower()
        self.headers[key] = value
|
|
|
|
|
|
def scodoc(func):
    """Decorator for all ScoDoc functions: set up the current department.

    When the call carries a `scodoc_dept` keyword argument (routes like
    `/<scodoc_dept>/Scolarite/sco_exemple`), it is handed to
    `app.set_sco_dept` and removed from the keyword arguments before
    `func` is called. According to the original note, this sets
    `g.scodoc_dept` / `g.scodoc_dept_id` and opens the database
    connection (done inside `app.set_sco_dept`, not visible here).

    Otherwise, if `g` has no `scodoc_dept` attribute yet, `g.scodoc_dept`
    is set to None and `g.scodoc_dept_id` to -1 (invalid id).
    """

    @wraps(func)
    def scodoc_function(*args, **kwargs):
        if "scodoc_dept" not in kwargs:
            if not hasattr(g, "scodoc_dept"):
                g.scodoc_dept = None
                g.scodoc_dept_id = -1  # invalid
        else:
            # the wrapped function must not receive `scodoc_dept`
            app.set_sco_dept(kwargs.pop("scodoc_dept"))
        return func(*args, **kwargs)

    return scodoc_function
|
|
|
|
|
|
def permission_required(permission):
    """Build a decorator restricting a view to users holding `permission`.

    The decorated view is wrapped with `login_required`. When called, it
    asks `current_user.has_permission(permission, dept)`, where `dept` is
    `g.scodoc_dept` (None if unset), and aborts with HTTP 403 when the
    permission is not granted.
    """

    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            dept = getattr(g, "scodoc_dept", None)
            granted = current_user.has_permission(permission, dept)
            if not granted:
                abort(403)
            return f(*args, **kwargs)

        protected = login_required(decorated_function)
        return protected

    return decorator
|
|
|
|
|
|
def permission_required_compat_scodoc7(permission):
    """Decorator for functions used as API in ScoDoc 7.

    Like `permission_required`, but the credentials may also be passed
    directly as request parameters `__ac_name` and `__ac_password`
    (query string for GET, form for POST). Any other HTTP method is
    rejected with 405.

    When both parameters are given and match a `User`, that user is
    logged in with `flask_login.login_user`. The permission is then
    checked against `current_user` (HTTP 403 if refused). If no
    credentials were validated, the view goes through `login_required`.
    """

    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # look for the authentication parameters
            if request.method == "GET":
                params = request.args
            elif request.method == "POST":
                params = request.form
            else:
                abort(405)  # method not allowed
            user_name = params.get("__ac_name")
            user_password = params.get("__ac_password")

            authenticated = False
            if user_name and user_password:
                candidate = User.query.filter_by(user_name=user_name).first()
                if candidate and candidate.check_password(user_password):
                    authenticated = True
                    flask_login.login_user(candidate)

            # back to the regular path:
            dept = getattr(g, "scodoc_dept", None)
            if not current_user.has_permission(permission, dept):
                abort(403)

            view = f if authenticated else login_required(f)
            return view(*args, **kwargs)

        return decorated_function

    return decorator
|
|
|
|
|
|
def admin_required(f):
    """Restrict `f` to users holding `Permission.ScoSuperAdmin`.

    Shortcut for `permission_required(Permission.ScoSuperAdmin)(f)`.
    """
    # Imported at call time, as in the original
    # (NOTE(review): presumably to avoid an import cycle; confirm).
    from app.auth.models import Permission

    require_super_admin = permission_required(Permission.ScoSuperAdmin)
    return require_super_admin(f)
|
|
|
|
|
|
def _int_if_possible(value):
    """Return `int(value)` when the conversion works, else `value` unchanged.

    Request parameters arrive as strings; integers are needed for db ids
    and boolean values. Non-convertible values (text, lists coming from
    `:list` parameters, uploaded files...) are passed through as is.
    """
    try:
        return int(value)
    except (ValueError, TypeError):
        return value


def scodoc7func(func):
    """Decorator integrating ScoDoc 7 Zope 2 functions into Flask.

    Adds the REQUEST argument if it is in the function signature.
    The parameters of the query string (GET) or of the form (POST)
    become (keyword) parameters of the function.

    Arguments of `func` without a default value are required: they are
    read from the request and passed positionally. Arguments with a
    default value are passed as keywords when present in the request.
    All values are converted to int when possible.

    Raises (at call time, top-level calls only):
        werkzeug.exceptions.BadRequest: a required argument is missing
            from the request.
    """
    # The signature does not change between calls: introspect it once.
    argspec = inspect.getfullargspec(func)
    nb_default_args = len(argspec.defaults) if argspec.defaults else 0
    if nb_default_args:
        required_names = argspec.args[:-nb_default_args]
        optional_names = argspec.args[-nb_default_args:]
    else:
        required_names = argspec.args
        optional_names = []

    @wraps(func)
    def scodoc7func_decorator(*args, **kwargs):
        """Decorator allowing legacy Zope published methods to be called via Flask
        routes without modification.

        There are two cases: the function can be called
        1. via a Flask route ("top level call")
        2. or be called directly from Python.

        If called via a route, this decorator setups a REQUEST object (emulating Zope2 REQUEST)
        """
        # `g.zrequest` already set: we are inside a top-level call, this is
        # a plain Python call; do not decorate again.
        if hasattr(g, "zrequest"):
            return func(*args, **kwargs)

        # --- Emulate Zope's REQUEST
        REQUEST = ZRequest()
        g.zrequest = REQUEST
        req_args = REQUEST.form  # args from query string (get) or form (post)

        # --- Positional (required) arguments
        pos_arg_values = []
        for arg_name in required_names:
            if arg_name == "REQUEST":  # special case
                pos_arg_values.append(REQUEST)
            elif arg_name in req_args:
                pos_arg_values.append(_int_if_possible(req_args[arg_name]))
            else:
                # client error, not a server failure
                raise BadRequest("missing argument: %s" % arg_name)

        # --- Keyword (optional) arguments
        for arg_name in optional_names:
            if arg_name == "REQUEST":  # special case
                kwargs[arg_name] = REQUEST
            elif arg_name in req_args:
                kwargs[arg_name] = _int_if_possible(req_args[arg_name])

        value = func(*pos_arg_values, **kwargs)

        if isinstance(value, werkzeug.wrappers.response.Response):
            return value  # redirected

        # Build response, adding collected http headers:
        zrequest = g.zrequest
        headers = zrequest.RESPONSE.headers if zrequest else {}
        if not headers:
            # no customized header, speedup:
            return value
        kw = {"response": value, "status": 200}
        if "content-type" in headers:
            kw["mimetype"] = headers["content-type"]
        r = flask.Response(**kw)
        for h in headers:
            r.headers[h] = headers[h]
        return r

    return scodoc7func_decorator
|