ScoDoc-Lille/app/decorators.py
2021-07-03 16:19:42 +02:00

245 lines
8.9 KiB
Python

# -*- coding: UTF-8 -*
"""Decorators for permissions, roles and ScoDoc7 Zope compatibility
"""
import functools
from functools import wraps
import inspect
import types
import logging
import werkzeug
from werkzeug.exceptions import BadRequest
import flask
from flask import g
from flask import abort, current_app
from flask import request
from flask_login import current_user
from flask_login import login_required
from flask import current_app
import app
class ZUser(object):
"Emulating Zope User"
def __init__(self):
"create, based on `flask_login.current_user`"
self.username = current_user.user_name
def __str__(self):
return self.username
def has_permission(self, perm, dept=None):
"""check if this user as the permission `perm`
in departement given by `g.scodoc_dept`.
"""
raise NotImplementedError()
class ZRequest(object):
"Emulating Zope 2 REQUEST"
def __init__(self):
self.URL = request.base_url.encode(
"utf-8"
) # necessaire pour ScoDoc 8 en Python 2 #sco8
self.URL0 = self.URL
self.BASE0 = request.url_root.encode("utf-8")
self.QUERY_STRING = request.query_string.encode("utf-8")
self.REQUEST_METHOD = request.method.encode("utf-8")
self.AUTHENTICATED_USER = current_user
self.REMOTE_ADDR = request.remote_addr
if request.method == "POST":
# self.form = request.form # xxx encode en utf-8 !
# Encode en utf-8 pour ScoDoc8 #sco8
self.form = {k: v.encode("utf-8") for (k, v) in request.form.items()}
if request.files:
# Add files in form: must copy to get a mutable version
# request.form is a werkzeug.datastructures.ImmutableMultiDict
# self.form = self.form.copy()
self.form.update(request.files)
# self.cf = request.form.copy()
for k in request.form:
if k.endswith(":list"):
self.form[k[:-5]] = request.form.getlist(k)
elif request.method == "GET":
# Encode en utf-8 pour ScoDoc8 #sco8
self.form = {k: v.encode("utf-8") for (k, v) in request.args.items()}
self.RESPONSE = ZResponse()
def __str__(self):
return """REQUEST
URL={r.URL}
QUERY_STRING={r.QUERY_STRING}
REQUEST_METHOD={r.REQUEST_METHOD}
AUTHENTICATED_USER={r.AUTHENTICATED_USER}
form={r.form}
""".format(
r=self
)
class ZResponse(object):
"Emulating Zope 2 RESPONSE"
def __init__(self):
self.headers = {}
def redirect(self, url):
current_app.logger.debug("ZResponse redirect to:" + str(url))
return flask.redirect(url.decode("utf-8")) # http 302 # #sco8 unicode
def setHeader(self, header, value):
self.headers[header.lower()] = value
def permission_required(permission):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if "scodoc_dept" in kwargs:
g.scodoc_dept = kwargs["scodoc_dept"].encode("utf-8") # sco8
del kwargs["scodoc_dept"]
# current_app.logger.info(
# "permission_required: %s in %s" % (permission, g.scodoc_dept)
# )
if not current_user.has_permission(permission, g.scodoc_dept):
abort(403)
return f(*args, **kwargs)
return decorated_function
return decorator
def admin_required(f):
from app.auth.models import Permission
return permission_required(Permission.ScoSuperAdmin)(f)
def scodoc7func(context):
"""Décorateur pour intégrer les fonctions Zope 2 de ScoDoc 7.
Si on a un kwarg `scodoc_dept`(venant de la route), le stocke dans `g.scodoc_dept`.
Ajoute l'argument REQUEST s'il est dans la signature de la fonction.
Les paramètres de la query string deviennent des (keywords) paramètres de la fonction.
"""
def s7_decorator(func):
@wraps(func)
def scodoc7func_decorator(*args, **kwargs):
"""Decorator allowing legacy Zope published methods to be called via Flask
routes without modification.
There are two cases: the function can be called
1. via a Flask route ("top level call")
2. or be called directly from Python.
If called via a route, this decorator setups a REQUEST object (emulating Zope2 REQUEST)
and `g.scodoc_dept` if present in the argument (for routes like `/<scodoc_dept>/Scolarite/sco_exemple`).
"""
# Détermine si on est appelé via une route ("toplevel")
# ou par un appel de fonction python normal.
top_level = not hasattr(g, "zrequest")
if not top_level:
# ne "redécore" pas
return func(*args, **kwargs)
#
if "scodoc_dept" in kwargs:
g.scodoc_dept = kwargs["scodoc_dept"].encode("utf-8") # sco8
del kwargs["scodoc_dept"]
elif not hasattr(g, "scodoc_dept"):
g.scodoc_dept = None
# --- Open DB connection
app.open_dept_db_connection()
# --- Emulate Zope's REQUEST
REQUEST = ZRequest()
g.zrequest = REQUEST
req_args = REQUEST.form # args from query string (get) or form (post)
# --- Add positional arguments
pos_arg_values = []
# PY3 à remplacer par inspect.getfullargspec en py3:
argspec = inspect.getargspec(func)
current_app.logger.info("argspec=%s" % str(argspec))
nb_default_args = len(argspec.defaults) if argspec.defaults else 0
if nb_default_args:
arg_names = argspec.args[:-nb_default_args]
else:
arg_names = argspec.args
for arg_name in arg_names:
if arg_name == "REQUEST": # special case
pos_arg_values.append(REQUEST)
elif arg_name == "context":
pos_arg_values.append(context)
else:
# XXX Convert to regular string for ScoDoc8/Python 2
if type(req_args[arg_name]) == types.UnicodeType:
pos_arg_values.append(req_args[arg_name].encode("utf-8"))
else:
pos_arg_values.append(req_args[arg_name])
current_app.logger.info("pos_arg_values=%s" % pos_arg_values)
# Add keyword arguments
if nb_default_args:
for arg_name in argspec.args[-nb_default_args:]:
if arg_name == "REQUEST": # special case
kwargs[arg_name] = REQUEST
elif arg_name in req_args:
# set argument kw optionnel
# XXX Convert to regular string for ScoDoc8/Python 2
if type(req_args[arg_name]) == types.UnicodeType:
kwargs[arg_name] = req_args[arg_name].encode("utf-8")
else:
kwargs[arg_name] = req_args[arg_name]
current_app.logger.info(
"scodoc7func_decorator: top_level=%s, pos_arg_values=%s, kwargs=%s"
% (top_level, pos_arg_values, kwargs)
)
value = func(*pos_arg_values, **kwargs)
if not top_level:
return value
else:
if isinstance(value, werkzeug.wrappers.response.Response):
return value # redirected
# Build response, adding collected http headers:
headers = []
kw = {"response": value, "status": 200}
if g.zrequest:
headers = g.zrequest.RESPONSE.headers
if not headers:
# no customized header, speedup:
return value
if "content-type" in headers:
kw["mimetype"] = headers["content-type"]
r = flask.Response(**kw)
for h in headers:
r.headers[h] = headers[h]
return r
return scodoc7func_decorator
return s7_decorator
# Le "context" de ScoDoc7
class ScoDoc7Context(object):
"""Context object for legacy Zope methods.
Mainly used to call published methods, as context.function(...)
"""
def __init__(self, name=""):
self.name = name
logging.getLogger(__name__).info("created %s" % self)
def populate(self, globals_dict):
logging.getLogger(__name__).info("populating context %s" % self)
for k in globals_dict:
if (not k.startswith("_")) and (
type(globals_dict[k]) == types.FunctionType
):
setattr(self, k, globals_dict[k].__get__(self))
def __repr__(self):
return "ScoDoc7Context('%s')" % self.name