1
0
forked from ScoDoc/ScoDoc
ScoDoc/app/decorators.py

240 lines
8.7 KiB
Python
Raw Normal View History

2021-05-29 18:22:51 +02:00
# -*- coding: UTF-8 -*
"""Decorators for permissions, roles and ScoDoc7 Zope compatibility
"""
import functools
from functools import wraps
import inspect
2021-06-16 12:02:43 +02:00
import types
import logging
2021-05-29 18:22:51 +02:00
import werkzeug
from werkzeug.exceptions import BadRequest
2021-05-29 18:22:51 +02:00
import flask
from flask import g
from flask import abort, current_app
from flask import request
from flask_login import current_user
from flask_login import login_required
from flask import current_app
import app
2021-05-29 18:22:51 +02:00
from app.auth.models import Permission
class ZUser(object):
    """Minimal stand-in for a Zope user object."""

    def __init__(self):
        """Build the user from `flask_login.current_user` (only the username is copied)."""
        self.username = current_user.username

    def __str__(self):
        """Return the username."""
        return self.username

    def has_permission(self, perm, dept=None):
        """Check whether this user has the permission `perm`.

        Placeholder kept for Zope API compatibility: it is not implemented
        here and always raises.

        Raises:
            NotImplementedError: on every call.
        """
        raise NotImplementedError()
class ZRequest(object):
    """Emulate the Zope 2 REQUEST object on top of the current Flask request.

    Attributes set by the constructor:
        URL, URL0, BASE0, QUERY_STRING, REQUEST_METHOD: values taken from
            `flask.request` and encoded to UTF-8 (needed by ScoDoc 8 running
            on Python 2, #sco8).
        AUTHENTICATED_USER: the `flask_login.current_user` object.
        REMOTE_ADDR: `request.remote_addr`.
        form: plain dict of the request parameters (form fields and uploaded
            files for POST, query-string arguments for GET/HEAD, empty for
            any other method).
        RESPONSE: a fresh `ZResponse` collecting headers set by legacy code.
    """

    def __init__(self):
        # Encoded to UTF-8: required by ScoDoc 8 on Python 2 (#sco8)
        self.URL = request.base_url.encode("utf-8")
        self.URL0 = self.URL
        self.BASE0 = request.url_root.encode("utf-8")
        self.QUERY_STRING = request.query_string.encode("utf-8")
        self.REQUEST_METHOD = request.method.encode("utf-8")
        self.AUTHENTICATED_USER = current_user
        self.REMOTE_ADDR = request.remote_addr
        # Always define `form`, so that methods other than GET/POST do not
        # leave the attribute missing (it is read by __str__ and by callers).
        self.form = {}
        if request.method == "POST":
            # Build a regular (mutable) dict: request.form is an immutable
            # werkzeug multi-dict. Values are encoded to UTF-8 (#sco8).
            self.form = {k: v.encode("utf-8") for (k, v) in request.form.items()}
            if request.files:
                # Uploaded files are exposed alongside the form fields
                self.form.update(request.files)
        elif request.method in ("GET", "HEAD"):
            # HEAD requests are dispatched to the same views as GET and carry
            # the same query string, so they get the same parameters.
            self.form = {k: v.encode("utf-8") for (k, v) in request.args.items()}
        self.RESPONSE = ZResponse()

    def __str__(self):
        """Return a multi-line, human-readable dump of the main request fields."""
        template = (
            "REQUEST\n"
            "URL={r.URL}\n"
            "QUERY_STRING={r.QUERY_STRING}\n"
            "REQUEST_METHOD={r.REQUEST_METHOD}\n"
            "AUTHENTICATED_USER={r.AUTHENTICATED_USER}\n"
            "form={r.form}\n"
        )
        return template.format(r=self)
class ZResponse(object):
    """Emulate the Zope 2 RESPONSE object.

    Collects the HTTP headers set by legacy code in `self.headers`
    (a dict keyed by lower-cased header name).
    """

    def __init__(self):
        self.headers = {}

    def redirect(self, url):
        """Return a Flask redirect response (HTTP 302) to `url`.

        `url` may be a UTF-8 encoded byte string (as produced by the ScoDoc 8 /
        Python 2 code, #sco8) or an already decoded text string.
        """
        # Lazy %-formatting: avoids calling str() on a unicode URL, which
        # fails on Python 2 when the URL holds non-ASCII characters.
        current_app.logger.debug("ZResponse redirect to:%s", url)
        if isinstance(url, bytes):
            # Only byte strings need decoding; text strings are passed as is.
            url = url.decode("utf-8")
        return flask.redirect(url)  # http 302

    def setHeader(self, header, value):
        """Record the HTTP header `header` (stored lower-cased) with `value`."""
        self.headers[header.lower()] = value
2021-05-29 18:22:51 +02:00
def permission_required(permission):
    """Decorator factory: restrict a view to users holding `permission`.

    If the decorated view is called with a `scodoc_dept` keyword argument
    (coming from the route), it is stored in `g.scodoc_dept` and removed from
    the keyword arguments before the view is called.
    The permission is checked with `current_user.has_permission` for the
    department found in `g.scodoc_dept` (None when no department is set);
    the request is aborted with HTTP 403 when the check fails.
    """

    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if "scodoc_dept" in kwargs:
                g.scodoc_dept = kwargs.pop("scodoc_dept")
            # The route may have no department and `g.scodoc_dept` may never
            # have been set: fall back to None instead of failing on `g`.
            scodoc_dept = getattr(g, "scodoc_dept", None)
            if not current_user.has_permission(permission, scodoc_dept):
                abort(403)
            return f(*args, **kwargs)

        return decorated_function

    return decorator
def admin_required(f):
    """Decorate `f` so that it requires the `Permission.ScoSuperAdmin` permission."""
    super_admin_only = permission_required(Permission.ScoSuperAdmin)
    return super_admin_only(f)
def _sco7_native_str(value):
    """Return unicode `value` encoded to a UTF-8 str (ScoDoc8/Python 2), other values unchanged."""
    if isinstance(value, types.UnicodeType):
        return value.encode("utf-8")
    return value


def scodoc7func(context):
    """Decorator integrating the Zope 2 functions of ScoDoc 7.

    If a `scodoc_dept` keyword argument is present (coming from the route),
    it is stored in `g.scodoc_dept`.
    The REQUEST argument is added if it appears in the function signature.
    The `context` argument, if it appears in the signature, receives the
    `context` object given to this decorator.
    The request parameters (query string or form) become positional or
    keyword arguments of the function.
    """

    def s7_decorator(func):
        @wraps(func)
        def scodoc7func_decorator(*args, **kwargs):
            """Decorator allowing legacy Zope published methods to be called via Flask
            routes without modification.

            There are two cases: the function can be called
            1. via a Flask route ("top level call")
            2. or be called directly from Python.

            If called via a route, this decorator sets up a REQUEST object
            (emulating the Zope 2 REQUEST) and `g.scodoc_dept` if present in the
            arguments (for routes like `/<scodoc_dept>/Scolarite/sco_exemple`).

            Raises:
                BadRequest: on a top level call, when a parameter without
                    default value is missing from the request.
            """
            # Are we called through a route ("top level") or by a regular
            # Python function call? `g.zrequest` is set by the top level call.
            top_level = not hasattr(g, "zrequest")
            if not top_level:
                # Already inside a decorated call: do not "re-decorate"
                return func(*args, **kwargs)

            if "scodoc_dept" in kwargs:
                g.scodoc_dept = kwargs.pop("scodoc_dept")
            elif not hasattr(g, "scodoc_dept"):
                g.scodoc_dept = None
            # --- Open DB connection
            app.open_dept_db_connection()
            # --- Emulate Zope's REQUEST
            REQUEST = ZRequest()
            g.zrequest = REQUEST
            req_args = REQUEST.form  # args from query string (get) or form (post)

            # --- Split the signature into required and optional parameters
            # PY3: to be replaced by inspect.getfullargspec
            argspec = inspect.getargspec(func)
            current_app.logger.info("argspec=%s" % str(argspec))
            nb_default_args = len(argspec.defaults) if argspec.defaults else 0
            if nb_default_args:
                arg_names = argspec.args[:-nb_default_args]
                optional_arg_names = argspec.args[-nb_default_args:]
            else:
                arg_names = argspec.args
                optional_arg_names = []

            # --- Positional arguments (parameters without default value)
            pos_arg_values = []
            for arg_name in arg_names:
                if arg_name == "REQUEST":  # special case
                    pos_arg_values.append(REQUEST)
                elif arg_name == "context":
                    pos_arg_values.append(context)
                elif arg_name in req_args:
                    pos_arg_values.append(_sco7_native_str(req_args[arg_name]))
                else:
                    # A required parameter is absent from the request: this is
                    # a client error (HTTP 400), not an internal KeyError.
                    raise BadRequest("missing required parameter: %s" % arg_name)
            current_app.logger.info("pos_arg_values=%s" % pos_arg_values)

            # --- Keyword arguments (parameters with a default value)
            for arg_name in optional_arg_names:
                if arg_name == "REQUEST":  # special case
                    kwargs[arg_name] = REQUEST
                elif arg_name in req_args:
                    # optional keyword argument given in the request
                    kwargs[arg_name] = _sco7_native_str(req_args[arg_name])
            current_app.logger.info(
                "scodoc7func_decorator: top_level=%s, pos_arg_values=%s, kwargs=%s"
                % (top_level, pos_arg_values, kwargs)
            )
            # The original positional `args` are not forwarded on a top level
            # call: positional values are rebuilt from the request.
            value = func(*pos_arg_values, **kwargs)

            if isinstance(value, werkzeug.wrappers.response.Response):
                return value  # redirected
            # Build response, adding collected http headers:
            headers = g.zrequest.RESPONSE.headers if g.zrequest else {}
            if not headers:
                # no customized header, speedup:
                return value
            kw = {"response": value, "status": 200}
            if "content-type" in headers:
                kw["mimetype"] = headers["content-type"]
            r = flask.Response(**kw)
            for h in headers:
                r.headers[h] = headers[h]
            return r

        return scodoc7func_decorator

    return s7_decorator
2021-05-29 18:22:51 +02:00
# Le "context" de ScoDoc7
class ScoDoc7Context(object):
    """Context object handed to legacy Zope methods.

    Its main purpose is to let legacy code call published methods
    as `context.function(...)`.
    """

    def __init__(self, name=""):
        """Create a context called `name` and log its creation."""
        self.name = name
        log = logging.getLogger(__name__)
        log.info("created %s" % self)

    def populate(self, globals_dict):
        """Bind the public plain functions of `globals_dict` to this context.

        Every entry whose name does not start with "_" and whose value is a
        plain Python function becomes an attribute of this object, bound so
        that the context is passed as the function's first argument.
        """
        log = logging.getLogger(__name__)
        log.info("populating context %s" % self)
        for attr_name, candidate in globals_dict.items():
            if attr_name.startswith("_"):
                continue  # private name: not published
            if not isinstance(candidate, types.FunctionType):
                continue  # only plain functions are bound
            setattr(self, attr_name, candidate.__get__(self))

    def __repr__(self):
        return "ScoDoc7Context('%s')" % self.name