242 lines
8.4 KiB
Python
242 lines
8.4 KiB
Python
# -*- coding: UTF-8 -*-

"""Decorators for permissions, roles and ScoDoc7 Zope compatibility."""
|
|
import functools
|
|
from functools import wraps
|
|
import inspect
|
|
import types
|
|
import logging
|
|
|
|
import werkzeug
|
|
from werkzeug.exceptions import BadRequest
|
|
import flask
|
|
from flask import g
|
|
from flask import abort, current_app
|
|
from flask import request
|
|
from flask_login import current_user
|
|
from flask_login import login_required
|
|
from flask import current_app
|
|
|
|
import app
|
|
|
|
|
|
class ZUser(object):
    """Minimal stand-in for a Zope user object."""

    def __init__(self):
        """Build the user from `flask_login.current_user`."""
        login_name = current_user.user_name
        self.username = login_name

    def __str__(self):
        """Return the user name."""
        return self.username

    def has_permission(self, perm, dept=None):
        """Check whether this user has the permission `perm`.

        Not implemented in this emulation class: calling it always raises
        `NotImplementedError`, whatever `perm` and `dept` are.
        """
        raise NotImplementedError()
|
|
|
|
|
|
class ZRequest(object):
    """Emulate the Zope 2 ``REQUEST`` object from the current Flask request.

    The constructor reads the Flask `request` and `current_user` proxies, so
    it must be called while a request is being handled.

    Attributes:
        URL, URL0: `request.base_url`.
        BASE0: `request.url_root`.
        QUERY_STRING: the query string, decoded as UTF-8.
        REQUEST_METHOD: the HTTP method.
        AUTHENTICATED_USER: `flask_login.current_user`.
        REMOTE_ADDR: `request.remote_addr`.
        form: the request parameters (see `__init__`).
        RESPONSE: a fresh `ZResponse`.
    """

    # Zope-style marker: a parameter named "foo:list" carries a list of values
    # that must be exposed under the name "foo".
    _LIST_SUFFIX = ":list"

    def __init__(self):
        """Populate the Zope-like attributes.

        For POST, `form` is a mutable copy of `request.form`, extended with the
        uploaded files. For every other method (GET, but also HEAD, which Flask
        serves on GET routes) `form` is a plain dict built from the query
        string. Previously `form` was left undefined for methods other than
        GET and POST, which raised AttributeError below.
        """
        suffix = self._LIST_SUFFIX
        self.URL = request.base_url
        self.URL0 = self.URL
        self.BASE0 = request.url_root
        # query_string is bytes
        self.QUERY_STRING = request.query_string.decode("utf-8")
        self.REQUEST_METHOD = request.method
        self.AUTHENTICATED_USER = current_user
        self.REMOTE_ADDR = request.remote_addr
        if request.method == "POST":
            # request.form is a werkzeug.datastructures.ImmutableMultiDict:
            # copy it to get a mutable version (needed by TrivialFormulator)
            self.form = request.form.copy()
            if request.files:
                # Add files in form:
                self.form.update(request.files)
            for k in request.form:
                if k.endswith(suffix):
                    self.form[k[: -len(suffix)]] = request.form.getlist(k)
        else:
            # GET and any other method: parameters come from the query string
            self.form = {}
            for k in request.args:
                current_app.logger.debug("%s\t%s", k, request.args.getlist(k))
                if k.endswith(suffix):
                    self.form[k[: -len(suffix)]] = request.args.getlist(k)
                else:
                    self.form[k] = request.args[k]
        current_app.logger.info("ZRequest.form=%s", self.form)
        self.RESPONSE = ZResponse()

    def __str__(self):
        """Return a multi-line summary of the request (for debugging)."""
        return (
            "REQUEST\n"
            "URL={r.URL}\n"
            "QUERY_STRING={r.QUERY_STRING}\n"
            "REQUEST_METHOD={r.REQUEST_METHOD}\n"
            "AUTHENTICATED_USER={r.AUTHENTICATED_USER}\n"
            "form={r.form}\n"
        ).format(r=self)
|
|
|
|
|
|
class ZResponse(object):
    """Minimal stand-in for the Zope 2 ``RESPONSE`` object.

    It only collects HTTP headers (in `headers`, keyed by lower-cased header
    name) and builds redirections.
    """

    def __init__(self):
        self.headers = dict()

    def redirect(self, url):
        """Log the target and return a Flask redirect response (HTTP 302)."""
        message = "ZResponse redirect to:" + str(url)
        current_app.logger.debug(message)
        return flask.redirect(url)

    def setHeader(self, header, value):
        """Record `value` for `header`; the header name is stored lower-cased."""
        key = header.lower()
        self.headers[key] = value
|
|
|
|
|
|
def permission_required(permission):
    """Build a view decorator that aborts with HTTP 403 unless the current
    user has `permission`.

    The decorated function is called with the route's `scodoc_dept` keyword
    argument removed: when present, it is stored in `g.scodoc_dept` first.
    The permission is checked with
    `current_user.has_permission(permission, g.scodoc_dept)`.
    """

    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if "scodoc_dept" in kwargs:
                g.scodoc_dept = kwargs.pop("scodoc_dept")
            elif not hasattr(g, "scodoc_dept"):
                # Route without department (and nothing set it earlier in this
                # request): default to None, as scodoc7func does, instead of
                # failing with AttributeError on g.scodoc_dept below.
                g.scodoc_dept = None
            if not current_user.has_permission(permission, g.scodoc_dept):
                abort(403)
            return f(*args, **kwargs)

        return decorated_function

    return decorator
|
|
|
|
|
|
def admin_required(f):
    """Decorate the view `f` so that it requires the `ScoSuperAdmin` permission."""
    # Imported at call time, not at module level
    # (presumably to avoid a circular import with app.auth -- TODO confirm).
    from app.auth.models import Permission

    require_super_admin = permission_required(Permission.ScoSuperAdmin)
    return require_super_admin(f)
|
|
|
|
|
|
def _s7_bind_arguments(func, context, zrequest, kwargs):
    """Map the request parameters onto the signature of the legacy `func`.

    Arguments of `func` without a default value are passed positionally; those
    with a default value are passed by keyword, and only when the request
    provides them. An argument named `REQUEST` receives `zrequest`; a required
    argument named `context` receives `context`.

    Returns:
        (pos_arg_values, kwargs): `kwargs` is the dict received, updated in place.

    Raises:
        werkzeug.exceptions.BadRequestKeyError: a required argument is missing
            from the request. This is both a KeyError and an HTTP 400 error,
            which is what a POST form (werkzeug MultiDict) already raised.
    """
    req_args = zrequest.form  # args from query string (get) or form (post)
    # getfullargspec replaces inspect.getargspec, removed in Python 3.11
    argspec = inspect.getfullargspec(func)
    current_app.logger.info("argspec=%s", argspec)
    nb_default_args = len(argspec.defaults) if argspec.defaults else 0
    nb_required_args = len(argspec.args) - nb_default_args

    # --- Positional arguments (no default value)
    pos_arg_values = []
    for arg_name in argspec.args[:nb_required_args]:
        if arg_name == "REQUEST":  # special case
            pos_arg_values.append(zrequest)
        elif arg_name == "context":
            pos_arg_values.append(context)
        elif arg_name in req_args:
            pos_arg_values.append(req_args[arg_name])
        else:
            raise werkzeug.exceptions.BadRequestKeyError(arg_name)
    current_app.logger.info("pos_arg_values=%s", pos_arg_values)
    current_app.logger.info("req_args=%s", req_args)

    # --- Keyword arguments (with a default value)
    for arg_name in argspec.args[nb_required_args:]:
        if arg_name == "REQUEST":  # special case
            kwargs[arg_name] = zrequest
        elif arg_name in req_args:
            # optional argument given in the request
            kwargs[arg_name] = req_args[arg_name]
    return pos_arg_values, kwargs


def _s7_build_response(value, headers):
    """Turn the value returned by a legacy function into a view result.

    A werkzeug response (e.g. a redirect) and a value without custom headers
    are returned unchanged. Otherwise `value` is wrapped in a `flask.Response`
    (status 200) carrying `headers`.
    """
    if isinstance(value, werkzeug.wrappers.response.Response):
        return value  # redirected
    if not headers:
        # no customized header, speedup:
        return value
    kw = {"response": value, "status": 200}
    if "content-type" in headers:
        kw["mimetype"] = headers["content-type"]
    response = flask.Response(**kw)
    for name in headers:
        response.headers[name] = headers[name]
    return response


def scodoc7func(context):
    """Decorator integrating the Zope 2 functions of ScoDoc 7.

    If there is a `scodoc_dept` kwarg (coming from the route), it is stored in
    `g.scodoc_dept`.
    The REQUEST argument is added if it is in the function signature.
    The query string parameters become (keyword) parameters of the function.
    """

    def s7_decorator(func):
        @wraps(func)
        def scodoc7func_decorator(*args, **kwargs):
            """Decorator allowing legacy Zope published methods to be called via Flask
            routes without modification.

            There are two cases: the function can be called
            1. via a Flask route ("top level call")
            2. or be called directly from Python.

            If called via a route, this decorator sets up a REQUEST object
            (emulating Zope2 REQUEST) and `g.scodoc_dept` if present in the
            arguments (for routes like `/<scodoc_dept>/Scolarite/sco_exemple`).
            """
            # Are we called via a route ("top level") or by a normal Python
            # call? g.zrequest exists as soon as a top level call has started.
            top_level = not hasattr(g, "zrequest")
            if not top_level:
                # do not "re-decorate"
                return func(*args, **kwargs)

            if "scodoc_dept" in kwargs:
                g.scodoc_dept = kwargs.pop("scodoc_dept")
            elif not hasattr(g, "scodoc_dept"):
                g.scodoc_dept = None
            # --- Open DB connection
            app.open_dept_db_connection()
            # --- Emulate Zope's REQUEST
            REQUEST = ZRequest()
            g.zrequest = REQUEST
            # --- Build the call arguments from the request.
            # Note: the positional *args received here are not forwarded.
            pos_arg_values, kwargs = _s7_bind_arguments(
                func, context, REQUEST, kwargs
            )
            current_app.logger.info(
                "scodoc7func_decorator: top_level=%s, pos_arg_values=%s, kwargs=%s",
                top_level,
                pos_arg_values,
                kwargs,
            )
            value = func(*pos_arg_values, **kwargs)

            # Build response, adding collected http headers:
            zrequest = g.zrequest
            headers = zrequest.RESPONSE.headers if zrequest else {}
            return _s7_build_response(value, headers)

        return scodoc7func_decorator

    return s7_decorator
|
|
|
|
|
|
# The ScoDoc7 "context"
|
|
class ScoDoc7Context(object):
    """Context object handed to legacy Zope methods.

    Its main use is to let legacy code call published methods as
    `context.function(...)`.
    """

    def __init__(self, name=""):
        self.name = name
        log = logging.getLogger(__name__)
        log.info("created %s" % self)

    def populate(self, globals_dict):
        """Attach the public functions found in `globals_dict` to this context.

        Every entry whose name does not start with "_" and whose value is a
        plain Python function becomes a method bound to this instance. All
        other entries are ignored.
        """
        log = logging.getLogger(__name__)
        log.info("populating context %s" % self)
        for attr_name, candidate in globals_dict.items():
            if attr_name.startswith("_") or not isinstance(
                candidate, types.FunctionType
            ):
                continue
            setattr(self, attr_name, types.MethodType(candidate, self))

    def __repr__(self):
        return "ScoDoc7Context('%s')" % self.name
|