Fix: gestion des archives (confusion de départements)

This commit is contained in:
Emmanuel Viennet 2023-09-07 23:09:39 +02:00 committed by iziram
parent 7fad9e0a71
commit 0f2579dc0f
9 changed files with 193 additions and 142 deletions

View File

@ -8,8 +8,9 @@
from datetime import datetime from datetime import datetime
from flask_json import as_json from flask_json import as_json
from flask import g, jsonify, request from flask import g, request
from flask_login import login_required, current_user from flask_login import login_required, current_user
from flask_sqlalchemy.query import Query
import app.scodoc.sco_assiduites as scass import app.scodoc.sco_assiduites as scass
import app.scodoc.sco_utils as scu import app.scodoc.sco_utils as scu
@ -26,7 +27,6 @@ from app.scodoc.sco_archives_justificatifs import JustificatifArchiver
from app.scodoc.sco_exceptions import ScoValueError from app.scodoc.sco_exceptions import ScoValueError
from app.scodoc.sco_permissions import Permission from app.scodoc.sco_permissions import Permission
from app.scodoc.sco_utils import json_error from app.scodoc.sco_utils import json_error
from flask_sqlalchemy.query import Query
# Partie Modèle # Partie Modèle
@ -436,7 +436,7 @@ def _delete_singular(justif_id: int, database):
if archive_name is not None: if archive_name is not None:
archiver: JustificatifArchiver = JustificatifArchiver() archiver: JustificatifArchiver = JustificatifArchiver()
try: try:
archiver.delete_justificatif(justificatif_unique.etudid, archive_name) archiver.delete_justificatif(justificatif_unique.etudiant, archive_name)
except ValueError: except ValueError:
pass pass
@ -481,7 +481,7 @@ def justif_import(justif_id: int = None):
try: try:
fname: str fname: str
archive_name, fname = archiver.save_justificatif( archive_name, fname = archiver.save_justificatif(
etudid=justificatif_unique.etudid, justificatif_unique.etudiant,
filename=file.filename, filename=file.filename,
data=file.stream.read(), data=file.stream.read(),
archive_name=archive_name, archive_name=archive_name,
@ -512,7 +512,7 @@ def justif_export(justif_id: int = None, filename: str = None):
if g.scodoc_dept: if g.scodoc_dept:
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id) query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
justificatif_unique: Justificaitf = query.first_or_404() justificatif_unique: Justificatif = query.first_or_404()
archive_name: str = justificatif_unique.fichier archive_name: str = justificatif_unique.fichier
if archive_name is None: if archive_name is None:
@ -522,7 +522,7 @@ def justif_export(justif_id: int = None, filename: str = None):
try: try:
return archiver.get_justificatif_file( return archiver.get_justificatif_file(
archive_name, justificatif_unique.etudid, filename archive_name, justificatif_unique.etudiant, filename
) )
except ScoValueError as err: except ScoValueError as err:
return json_error(404, err.args[0]) return json_error(404, err.args[0])
@ -564,10 +564,10 @@ def justif_remove(justif_id: int = None):
if remove is None or remove not in ("all", "list"): if remove is None or remove not in ("all", "list"):
return json_error(404, "param 'remove': Valeur invalide") return json_error(404, "param 'remove': Valeur invalide")
archiver: JustificatifArchiver = JustificatifArchiver() archiver: JustificatifArchiver = JustificatifArchiver()
etudid: int = justificatif_unique.etudid etud = justificatif_unique.etudiant
try: try:
if remove == "all": if remove == "all":
archiver.delete_justificatif(etudid=etudid, archive_name=archive_name) archiver.delete_justificatif(etud, archive_name=archive_name)
justificatif_unique.fichier = None justificatif_unique.fichier = None
db.session.add(justificatif_unique) db.session.add(justificatif_unique)
db.session.commit() db.session.commit()
@ -575,13 +575,13 @@ def justif_remove(justif_id: int = None):
else: else:
for fname in data.get("filenames", []): for fname in data.get("filenames", []):
archiver.delete_justificatif( archiver.delete_justificatif(
etudid=etudid, etud,
archive_name=archive_name, archive_name=archive_name,
filename=fname, filename=fname,
) )
if len(archiver.list_justificatifs(archive_name, etudid)) == 0: if len(archiver.list_justificatifs(archive_name, etud)) == 0:
archiver.delete_justificatif(etudid, archive_name) archiver.delete_justificatif(etud, archive_name)
justificatif_unique.fichier = None justificatif_unique.fichier = None
db.session.add(justificatif_unique) db.session.add(justificatif_unique)
db.session.commit() db.session.commit()
@ -616,16 +616,16 @@ def justif_list(justif_id: int = None):
archiver: JustificatifArchiver = JustificatifArchiver() archiver: JustificatifArchiver = JustificatifArchiver()
if archive_name is not None: if archive_name is not None:
filenames = archiver.list_justificatifs( filenames = archiver.list_justificatifs(
archive_name, justificatif_unique.etudid archive_name, justificatif_unique.etudiant
) )
retour = {"total": len(filenames), "filenames": []} retour = {"total": len(filenames), "filenames": []}
for fi in filenames: for filename in filenames:
if int(fi[1]) == current_user.id or current_user.has_permission( if int(filename[1]) == current_user.id or current_user.has_permission(
Permission.ScoJustifView Permission.ScoJustifView
): ):
retour["filenames"].append(fi[0]) retour["filenames"].append(filename[0])
return retour return retour

View File

@ -81,7 +81,7 @@ from app.scodoc import sco_pv_pdf
from app.scodoc.sco_exceptions import ScoValueError from app.scodoc.sco_exceptions import ScoValueError
class BaseArchiver(object): class BaseArchiver:
def __init__(self, archive_type=""): def __init__(self, archive_type=""):
self.archive_type = archive_type self.archive_type = archive_type
self.initialized = False self.initialized = False
@ -92,14 +92,17 @@ class BaseArchiver(object):
"set dept" "set dept"
self.dept_id = dept_id self.dept_id = dept_id
def initialize(self): def initialize(self, dept_id: int = None):
"""Fixe le département et initialise les répertoires au besoin."""
# Set departement (à chaque fois car peut changer d'une utilisation à l'autre)
self.dept_id = getattr(g, "scodoc_dept_id") if dept_id is None else dept_id
if self.initialized: if self.initialized:
return return
dirs = [Config.SCODOC_VAR_DIR, "archives"] dirs = [Config.SCODOC_VAR_DIR, "archives"]
if self.archive_type: if self.archive_type:
dirs.append(self.archive_type) dirs.append(self.archive_type)
self.root = os.path.join(*dirs) self.root = os.path.join(*dirs) # /opt/scodoc-data/archives/<type>
log("initialized archiver, path=" + self.root) log("initialized archiver, path=" + self.root)
path = dirs[0] path = dirs[0]
for directory in dirs[1:]: for directory in dirs[1:]:
@ -112,15 +115,13 @@ class BaseArchiver(object):
finally: finally:
scu.GSL.release() scu.GSL.release()
self.initialized = True self.initialized = True
if self.dept_id is None:
self.dept_id = getattr(g, "scodoc_dept_id")
def get_obj_dir(self, oid: int): def get_obj_dir(self, oid: int, dept_id: int = None):
""" """
:return: path to directory of archives for this object (eg formsemestre_id or etudid). :return: path to directory of archives for this object (eg formsemestre_id or etudid).
If directory does not yet exist, create it. If directory does not yet exist, create it.
""" """
self.initialize() self.initialize(dept_id)
dept_dir = os.path.join(self.root, str(self.dept_id)) dept_dir = os.path.join(self.root, str(self.dept_id))
try: try:
scu.GSL.acquire() scu.GSL.acquire()
@ -141,21 +142,21 @@ class BaseArchiver(object):
scu.GSL.release() scu.GSL.release()
return obj_dir return obj_dir
def list_oids(self): def list_oids(self, dept_id: int = None):
""" """
:return: list of archive oids :return: list of archive oids
""" """
self.initialize() self.initialize(dept_id)
base = os.path.join(self.root, str(self.dept_id)) + os.path.sep base = os.path.join(self.root, str(self.dept_id)) + os.path.sep
dirs = glob.glob(base + "*") dirs = glob.glob(base + "*")
return [os.path.split(x)[1] for x in dirs] return [os.path.split(x)[1] for x in dirs]
def list_obj_archives(self, oid: int): def list_obj_archives(self, oid: int, dept_id: int = None):
"""Returns """Returns
:return: list of archive identifiers for this object (paths to non empty dirs) :return: list of archive identifiers for this object (paths to non empty dirs)
""" """
self.initialize() self.initialize(dept_id)
base = self.get_obj_dir(oid) + os.path.sep base = self.get_obj_dir(oid, dept_id=dept_id) + os.path.sep
dirs = glob.glob( dirs = glob.glob(
base base
+ "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]" + "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]"
@ -165,9 +166,9 @@ class BaseArchiver(object):
dirs.sort() dirs.sort()
return dirs return dirs
def delete_archive(self, archive_id: str): def delete_archive(self, archive_id: str, dept_id: int = None):
"""Delete (forever) this archive""" """Delete (forever) this archive"""
self.initialize() self.initialize(dept_id)
try: try:
scu.GSL.acquire() scu.GSL.acquire()
shutil.rmtree(archive_id, ignore_errors=True) shutil.rmtree(archive_id, ignore_errors=True)
@ -180,9 +181,9 @@ class BaseArchiver(object):
*[int(x) for x in os.path.split(archive_id)[1].split("-")] *[int(x) for x in os.path.split(archive_id)[1].split("-")]
) )
def list_archive(self, archive_id: str) -> str: def list_archive(self, archive_id: str, dept_id: int = None) -> str:
"""Return list of filenames (without path) in archive""" """Return list of filenames (without path) in archive"""
self.initialize() self.initialize(dept_id)
try: try:
scu.GSL.acquire() scu.GSL.acquire()
files = os.listdir(archive_id) files = os.listdir(archive_id)
@ -201,12 +202,12 @@ class BaseArchiver(object):
"^[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}$", archive_name "^[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}$", archive_name
) )
def get_id_from_name(self, oid, archive_name: str): def get_id_from_name(self, oid, archive_name: str, dept_id: int = None):
"""returns archive id (check that name is valid)""" """returns archive id (check that name is valid)"""
self.initialize() self.initialize(dept_id)
if not self.is_valid_archive_name(archive_name): if not self.is_valid_archive_name(archive_name):
raise ScoValueError(f"Archive {archive_name} introuvable") raise ScoValueError(f"Archive {archive_name} introuvable")
archive_id = os.path.join(self.get_obj_dir(oid), archive_name) archive_id = os.path.join(self.get_obj_dir(oid, dept_id=dept_id), archive_name)
if not os.path.isdir(archive_id): if not os.path.isdir(archive_id):
log( log(
f"invalid archive name: {archive_name}, oid={oid}, archive_id={archive_id}" f"invalid archive name: {archive_name}, oid={oid}, archive_id={archive_id}"
@ -214,9 +215,9 @@ class BaseArchiver(object):
raise ScoValueError(f"Archive {archive_name} introuvable") raise ScoValueError(f"Archive {archive_name} introuvable")
return archive_id return archive_id
def get_archive_description(self, archive_id: str) -> str: def get_archive_description(self, archive_id: str, dept_id: int = None) -> str:
"""Return description of archive""" """Return description of archive"""
self.initialize() self.initialize(dept_id)
filename = os.path.join(archive_id, "_description.txt") filename = os.path.join(archive_id, "_description.txt")
try: try:
with open(filename, encoding=scu.SCO_ENCODING) as f: with open(filename, encoding=scu.SCO_ENCODING) as f:
@ -229,11 +230,11 @@ class BaseArchiver(object):
return descr return descr
def create_obj_archive(self, oid: int, description: str): def create_obj_archive(self, oid: int, description: str, dept_id: int = None):
"""Creates a new archive for this object and returns its id.""" """Creates a new archive for this object and returns its id."""
# id suffixé par YYYY-MM-DD-hh-mm-ss # id suffixé par YYYY-MM-DD-hh-mm-ss
archive_id = ( archive_id = (
self.get_obj_dir(oid) self.get_obj_dir(oid, dept_id=dept_id)
+ os.path.sep + os.path.sep
+ "-".join([f"{x:02d}" for x in time.localtime()[:6]]) + "-".join([f"{x:02d}" for x in time.localtime()[:6]])
) )
@ -248,7 +249,13 @@ class BaseArchiver(object):
self.store(archive_id, "_description.txt", description) self.store(archive_id, "_description.txt", description)
return archive_id return archive_id
def store(self, archive_id: str, filename: str, data: Union[str, bytes]): def store(
self,
archive_id: str,
filename: str,
data: Union[str, bytes],
dept_id: int = None,
):
"""Store data in archive, under given filename. """Store data in archive, under given filename.
Filename may be modified (sanitized): return used filename Filename may be modified (sanitized): return used filename
The file is created or replaced. The file is created or replaced.
@ -256,7 +263,7 @@ class BaseArchiver(object):
""" """
if isinstance(data, str): if isinstance(data, str):
data = data.encode(scu.SCO_ENCODING) data = data.encode(scu.SCO_ENCODING)
self.initialize() self.initialize(dept_id)
filename = scu.sanitize_filename(filename) filename = scu.sanitize_filename(filename)
log(f"storing {filename} ({len(data)} bytes) in {archive_id}") log(f"storing {filename} ({len(data)} bytes) in {archive_id}")
try: try:
@ -268,9 +275,9 @@ class BaseArchiver(object):
scu.GSL.release() scu.GSL.release()
return filename return filename
def get(self, archive_id: str, filename: str): def get(self, archive_id: str, filename: str, dept_id: int = None):
"""Retreive data""" """Retreive data"""
self.initialize() self.initialize(dept_id)
if not scu.is_valid_filename(filename): if not scu.is_valid_filename(filename):
log(f"""Archiver.get: invalid filename '{filename}'""") log(f"""Archiver.get: invalid filename '{filename}'""")
raise ScoValueError("archive introuvable (déjà supprimée ?)") raise ScoValueError("archive introuvable (déjà supprimée ?)")
@ -280,11 +287,11 @@ class BaseArchiver(object):
data = f.read() data = f.read()
return data return data
def get_archived_file(self, oid, archive_name, filename): def get_archived_file(self, oid, archive_name, filename, dept_id: int = None):
"""Recupère les donnees du fichier indiqué et envoie au client. """Recupère les donnees du fichier indiqué et envoie au client.
Returns: Response Returns: Response
""" """
archive_id = self.get_id_from_name(oid, archive_name) archive_id = self.get_id_from_name(oid, archive_name, dept_id=dept_id)
data = self.get(archive_id, filename) data = self.get(archive_id, filename)
mime = mimetypes.guess_type(filename)[0] mime = mimetypes.guess_type(filename)[0]
if mime is None: if mime is None:
@ -298,7 +305,7 @@ class SemsArchiver(BaseArchiver):
BaseArchiver.__init__(self, archive_type="") BaseArchiver.__init__(self, archive_type="")
PVArchive = SemsArchiver() PV_ARCHIVER = SemsArchiver()
# ---------------------------------------------------------------------------- # ----------------------------------------------------------------------------
@ -332,8 +339,10 @@ def do_formsemestre_archive(
formsemestre = FormSemestre.get_formsemestre(formsemestre_id) formsemestre = FormSemestre.get_formsemestre(formsemestre_id)
res: NotesTableCompat = res_sem.load_formsemestre_results(formsemestre) res: NotesTableCompat = res_sem.load_formsemestre_results(formsemestre)
sem_archive_id = formsemestre_id sem_archive_id = formsemestre_id
archive_id = PVArchive.create_obj_archive(sem_archive_id, description) archive_id = PV_ARCHIVER.create_obj_archive(
date = PVArchive.get_archive_date(archive_id).strftime("%d/%m/%Y à %H:%M") sem_archive_id, description, formsemestre.dept_id
)
date = PV_ARCHIVER.get_archive_date(archive_id).strftime("%d/%m/%Y à %H:%M")
if not group_ids: if not group_ids:
# tous les inscrits du semestre # tous les inscrits du semestre
@ -347,7 +356,12 @@ def do_formsemestre_archive(
# Tableau recap notes en XLS (pour tous les etudiants, n'utilise pas les groupes) # Tableau recap notes en XLS (pour tous les etudiants, n'utilise pas les groupes)
data, _ = gen_formsemestre_recapcomplet_excel(res, include_evaluations=True) data, _ = gen_formsemestre_recapcomplet_excel(res, include_evaluations=True)
if data: if data:
PVArchive.store(archive_id, "Tableau_moyennes" + scu.XLSX_SUFFIX, data) PV_ARCHIVER.store(
archive_id,
"Tableau_moyennes" + scu.XLSX_SUFFIX,
data,
dept_id=formsemestre.dept_id,
)
# Tableau recap notes en HTML (pour tous les etudiants, n'utilise pas les groupes) # Tableau recap notes en HTML (pour tous les etudiants, n'utilise pas les groupes)
table_html, _, _ = gen_formsemestre_recapcomplet_html_table( table_html, _, _ = gen_formsemestre_recapcomplet_html_table(
formsemestre, res, include_evaluations=True formsemestre, res, include_evaluations=True
@ -367,13 +381,17 @@ def do_formsemestre_archive(
html_sco_header.sco_footer(), html_sco_header.sco_footer(),
] ]
) )
PVArchive.store(archive_id, "Tableau_moyennes.html", data) PV_ARCHIVER.store(
archive_id, "Tableau_moyennes.html", data, dept_id=formsemestre.dept_id
)
# Bulletins en JSON # Bulletins en JSON
data = gen_formsemestre_recapcomplet_json(formsemestre_id, xml_with_decisions=True) data = gen_formsemestre_recapcomplet_json(formsemestre_id, xml_with_decisions=True)
data_js = json.dumps(data, indent=1, cls=ScoDocJSONEncoder) data_js = json.dumps(data, indent=1, cls=ScoDocJSONEncoder)
if data: if data:
PVArchive.store(archive_id, "Bulletins.json", data_js) PV_ARCHIVER.store(
archive_id, "Bulletins.json", data_js, dept_id=formsemestre.dept_id
)
# Décisions de jury, en XLS # Décisions de jury, en XLS
if formsemestre.formation.is_apc(): if formsemestre.formation.is_apc():
response = jury_but_pv.pvjury_page_but(formsemestre_id, fmt="xls") response = jury_but_pv.pvjury_page_but(formsemestre_id, fmt="xls")
@ -383,17 +401,23 @@ def do_formsemestre_archive(
formsemestre_id, format="xls", publish=False formsemestre_id, format="xls", publish=False
) )
if data: if data:
PVArchive.store( PV_ARCHIVER.store(
archive_id, archive_id,
"Decisions_Jury" + scu.XLSX_SUFFIX, "Decisions_Jury" + scu.XLSX_SUFFIX,
data, data,
dept_id=formsemestre.dept_id,
) )
# Classeur bulletins (PDF) # Classeur bulletins (PDF)
data, _ = sco_bulletins_pdf.get_formsemestre_bulletins_pdf( data, _ = sco_bulletins_pdf.get_formsemestre_bulletins_pdf(
formsemestre_id, version=bul_version formsemestre_id, version=bul_version
) )
if data: if data:
PVArchive.store(archive_id, "Bulletins.pdf", data) PV_ARCHIVER.store(
archive_id,
"Bulletins.pdf",
data,
dept_id=formsemestre.dept_id,
)
# Lettres individuelles (PDF): # Lettres individuelles (PDF):
data = sco_pv_lettres_inviduelles.pdf_lettres_individuelles( data = sco_pv_lettres_inviduelles.pdf_lettres_individuelles(
formsemestre_id, formsemestre_id,
@ -403,7 +427,12 @@ def do_formsemestre_archive(
signature=signature, signature=signature,
) )
if data: if data:
PVArchive.store(archive_id, f"CourriersDecisions{groups_filename}.pdf", data) PV_ARCHIVER.store(
archive_id,
f"CourriersDecisions{groups_filename}.pdf",
data,
dept_id=formsemestre.dept_id,
)
# PV de jury (PDF): # PV de jury (PDF):
data = sco_pv_pdf.pvjury_pdf( data = sco_pv_pdf.pvjury_pdf(
@ -419,7 +448,12 @@ def do_formsemestre_archive(
anonymous=anonymous, anonymous=anonymous,
) )
if data: if data:
PVArchive.store(archive_id, f"PV_Jury{groups_filename}.pdf", data) PV_ARCHIVER.store(
archive_id,
f"PV_Jury{groups_filename}.pdf",
data,
dept_id=formsemestre.dept_id,
)
def formsemestre_archive(formsemestre_id, group_ids: list[int] = None): def formsemestre_archive(formsemestre_id, group_ids: list[int] = None):
@ -558,14 +592,21 @@ enregistrés et non modifiables, on peut les retrouver ultérieurement.
def formsemestre_list_archives(formsemestre_id): def formsemestre_list_archives(formsemestre_id):
"""Page listing archives""" """Page listing archives"""
formsemestre = FormSemestre.get_formsemestre(formsemestre_id)
sem_archive_id = formsemestre_id sem_archive_id = formsemestre_id
L = [] L = []
for archive_id in PVArchive.list_obj_archives(sem_archive_id): for archive_id in PV_ARCHIVER.list_obj_archives(
sem_archive_id, dept_id=formsemestre.dept_id
):
a = { a = {
"archive_id": archive_id, "archive_id": archive_id,
"description": PVArchive.get_archive_description(archive_id), "description": PV_ARCHIVER.get_archive_description(
"date": PVArchive.get_archive_date(archive_id), archive_id, dept_id=formsemestre.dept_id
"content": PVArchive.list_archive(archive_id), ),
"date": PV_ARCHIVER.get_archive_date(archive_id),
"content": PV_ARCHIVER.list_archive(
archive_id, dept_id=formsemestre.dept_id
),
} }
L.append(a) L.append(a)
@ -575,7 +616,7 @@ def formsemestre_list_archives(formsemestre_id):
else: else:
H.append("<ul>") H.append("<ul>")
for a in L: for a in L:
archive_name = PVArchive.get_archive_name(a["archive_id"]) archive_name = PV_ARCHIVER.get_archive_name(a["archive_id"])
H.append( H.append(
'<li>%s : <em>%s</em> (<a href="formsemestre_delete_archive?formsemestre_id=%s&archive_name=%s">supprimer</a>)<ul>' '<li>%s : <em>%s</em> (<a href="formsemestre_delete_archive?formsemestre_id=%s&archive_name=%s">supprimer</a>)<ul>'
% ( % (
@ -602,7 +643,9 @@ def formsemestre_get_archived_file(formsemestre_id, archive_name, filename):
"""Send file to client.""" """Send file to client."""
formsemestre: FormSemestre = FormSemestre.query.get_or_404(formsemestre_id) formsemestre: FormSemestre = FormSemestre.query.get_or_404(formsemestre_id)
sem_archive_id = formsemestre.id sem_archive_id = formsemestre.id
return PVArchive.get_archived_file(sem_archive_id, archive_name, filename) return PV_ARCHIVER.get_archived_file(
sem_archive_id, archive_name, filename, dept_id=formsemestre.dept_id
)
def formsemestre_delete_archive(formsemestre_id, archive_name, dialog_confirmed=False): def formsemestre_delete_archive(formsemestre_id, archive_name, dialog_confirmed=False):
@ -617,7 +660,9 @@ def formsemestre_delete_archive(formsemestre_id, archive_name, dialog_confirmed=
) )
) )
sem_archive_id = formsemestre_id sem_archive_id = formsemestre_id
archive_id = PVArchive.get_id_from_name(sem_archive_id, archive_name) archive_id = PV_ARCHIVER.get_id_from_name(
sem_archive_id, archive_name, dept_id=formsemestre.dept_id
)
dest_url = url_for( dest_url = url_for(
"notes.formsemestre_list_archives", "notes.formsemestre_list_archives",
@ -628,7 +673,7 @@ def formsemestre_delete_archive(formsemestre_id, archive_name, dialog_confirmed=
if not dialog_confirmed: if not dialog_confirmed:
return scu.confirm_dialog( return scu.confirm_dialog(
f"""<h2>Confirmer la suppression de l'archive du { f"""<h2>Confirmer la suppression de l'archive du {
PVArchive.get_archive_date(archive_id).strftime("%d/%m/%Y %H:%M") PV_ARCHIVER.get_archive_date(archive_id).strftime("%d/%m/%Y %H:%M")
} ?</h2> } ?</h2>
<p>La suppression sera définitive.</p> <p>La suppression sera définitive.</p>
""", """,
@ -640,6 +685,6 @@ def formsemestre_delete_archive(formsemestre_id, archive_name, dialog_confirmed=
}, },
) )
PVArchive.delete_archive(archive_id) PV_ARCHIVER.delete_archive(archive_id, dept_id=formsemestre.dept_id)
flash("Archive supprimée") flash("Archive supprimée")
return flask.redirect(dest_url) return flask.redirect(dest_url)

View File

@ -52,7 +52,8 @@ class EtudsArchiver(sco_archives.BaseArchiver):
sco_archives.BaseArchiver.__init__(self, archive_type="docetuds") sco_archives.BaseArchiver.__init__(self, archive_type="docetuds")
EtudsArchive = EtudsArchiver() # Global au processus, attention !
ETUDS_ARCHIVER = EtudsArchiver()
def can_edit_etud_archive(authuser): def can_edit_etud_archive(authuser):
@ -60,21 +61,21 @@ def can_edit_etud_archive(authuser):
return authuser.has_permission(Permission.ScoEtudAddAnnotations) return authuser.has_permission(Permission.ScoEtudAddAnnotations)
def etud_list_archives_html(etudid): def etud_list_archives_html(etud: Identite):
"""HTML snippet listing archives""" """HTML snippet listing archives"""
can_edit = can_edit_etud_archive(current_user) can_edit = can_edit_etud_archive(current_user)
etuds = sco_etud.get_etud_info(etudid=etudid) etud_archive_id = etud.id
if not etuds:
raise ScoValueError("étudiant inexistant")
etud = etuds[0]
etud_archive_id = etudid
L = [] L = []
for archive_id in EtudsArchive.list_obj_archives(etud_archive_id): for archive_id in ETUDS_ARCHIVER.list_obj_archives(
etud_archive_id, dept_id=etud.dept_id
):
a = { a = {
"archive_id": archive_id, "archive_id": archive_id,
"description": EtudsArchive.get_archive_description(archive_id), "description": ETUDS_ARCHIVER.get_archive_description(
"date": EtudsArchive.get_archive_date(archive_id), archive_id, dept_id=etud.dept_id
"content": EtudsArchive.list_archive(archive_id), ),
"date": ETUDS_ARCHIVER.get_archive_date(archive_id),
"content": ETUDS_ARCHIVER.list_archive(archive_id, dept_id=etud.dept_id),
} }
L.append(a) L.append(a)
delete_icon = scu.icontag( delete_icon = scu.icontag(
@ -85,7 +86,7 @@ def etud_list_archives_html(etudid):
) )
H = ['<div class="etudarchive"><ul>'] H = ['<div class="etudarchive"><ul>']
for a in L: for a in L:
archive_name = EtudsArchive.get_archive_name(a["archive_id"]) archive_name = ETUDS_ARCHIVER.get_archive_name(a["archive_id"])
H.append( H.append(
"""<li><span class ="etudarchive_descr" title="%s">%s</span>""" """<li><span class ="etudarchive_descr" title="%s">%s</span>"""
% (a["date"].strftime("%d/%m/%Y %H:%M"), a["description"]) % (a["date"].strftime("%d/%m/%Y %H:%M"), a["description"])
@ -93,14 +94,14 @@ def etud_list_archives_html(etudid):
for filename in a["content"]: for filename in a["content"]:
H.append( H.append(
"""<a class="stdlink etudarchive_link" href="etud_get_archived_file?etudid=%s&archive_name=%s&filename=%s">%s</a>""" """<a class="stdlink etudarchive_link" href="etud_get_archived_file?etudid=%s&archive_name=%s&filename=%s">%s</a>"""
% (etudid, archive_name, filename, filename) % (etud.id, archive_name, filename, filename)
) )
if not a["content"]: if not a["content"]:
H.append("<em>aucun fichier !</em>") H.append("<em>aucun fichier !</em>")
if can_edit: if can_edit:
H.append( H.append(
'<span class="deletudarchive"><a class="smallbutton" href="etud_delete_archive?etudid=%s&archive_name=%s">%s</a></span>' '<span class="deletudarchive"><a class="smallbutton" href="etud_delete_archive?etudid=%s&archive_name=%s">%s</a></span>'
% (etudid, archive_name, delete_icon) % (etud.id, archive_name, delete_icon)
) )
else: else:
H.append('<span class="deletudarchive">' + delete_disabled_icon + "</span>") H.append('<span class="deletudarchive">' + delete_disabled_icon + "</span>")
@ -108,7 +109,7 @@ def etud_list_archives_html(etudid):
if can_edit: if can_edit:
H.append( H.append(
'<li class="addetudarchive"><a class="stdlink" href="etud_upload_file_form?etudid=%s">ajouter un fichier</a></li>' '<li class="addetudarchive"><a class="stdlink" href="etud_upload_file_form?etudid=%s">ajouter un fichier</a></li>'
% etudid % etud.id
) )
H.append("</ul></div>") H.append("</ul></div>")
return "".join(H) return "".join(H)
@ -121,12 +122,13 @@ def add_archives_info_to_etud_list(etuds):
for etud in etuds: for etud in etuds:
l = [] l = []
etud_archive_id = etud["etudid"] etud_archive_id = etud["etudid"]
for archive_id in EtudsArchive.list_obj_archives(etud_archive_id): # Here, ETUDS_ARCHIVER will use g.dept_id
for archive_id in ETUDS_ARCHIVER.list_obj_archives(etud_archive_id):
l.append( l.append(
"%s (%s)" "%s (%s)"
% ( % (
EtudsArchive.get_archive_description(archive_id), ETUDS_ARCHIVER.get_archive_description(archive_id),
EtudsArchive.list_archive(archive_id)[0], ETUDS_ARCHIVER.list_archive(archive_id)[0],
) )
) )
etud["etudarchive"] = ", ".join(l) etud["etudarchive"] = ", ".join(l)
@ -197,8 +199,8 @@ def _store_etud_file_to_new_archive(
filesize = len(data) filesize = len(data)
if filesize < 10 or filesize > scu.CONFIG.ETUD_MAX_FILE_SIZE: if filesize < 10 or filesize > scu.CONFIG.ETUD_MAX_FILE_SIZE:
return False, f"Fichier archive '{filename}' de taille invalide ! ({filesize})" return False, f"Fichier archive '{filename}' de taille invalide ! ({filesize})"
archive_id = EtudsArchive.create_obj_archive(etud_archive_id, description) archive_id = ETUDS_ARCHIVER.create_obj_archive(etud_archive_id, description)
EtudsArchive.store(archive_id, filename, data) ETUDS_ARCHIVER.store(archive_id, filename, data)
return True, "ok" return True, "ok"
@ -212,14 +214,16 @@ def etud_delete_archive(etudid, archive_name, dialog_confirmed=False):
raise ScoValueError("étudiant inexistant") raise ScoValueError("étudiant inexistant")
etud = etuds[0] etud = etuds[0]
etud_archive_id = etud["etudid"] etud_archive_id = etud["etudid"]
archive_id = EtudsArchive.get_id_from_name(etud_archive_id, archive_name) archive_id = ETUDS_ARCHIVER.get_id_from_name(
etud_archive_id, archive_name, dept_id=etud["dept_id"]
)
if not dialog_confirmed: if not dialog_confirmed:
return scu.confirm_dialog( return scu.confirm_dialog(
"""<h2>Confirmer la suppression des fichiers ?</h2> """<h2>Confirmer la suppression des fichiers ?</h2>
<p>Fichier associé le %s à l'étudiant %s</p> <p>Fichier associé le %s à l'étudiant %s</p>
<p>La suppression sera définitive.</p>""" <p>La suppression sera définitive.</p>"""
% ( % (
EtudsArchive.get_archive_date(archive_id).strftime("%d/%m/%Y %H:%M"), ETUDS_ARCHIVER.get_archive_date(archive_id).strftime("%d/%m/%Y %H:%M"),
etud["nomprenom"], etud["nomprenom"],
), ),
dest_url="", dest_url="",
@ -232,7 +236,7 @@ def etud_delete_archive(etudid, archive_name, dialog_confirmed=False):
parameters={"etudid": etudid, "archive_name": archive_name}, parameters={"etudid": etudid, "archive_name": archive_name},
) )
EtudsArchive.delete_archive(archive_id) ETUDS_ARCHIVER.delete_archive(archive_id, dept_id=etud["dept_id"])
flash("Archive supprimée") flash("Archive supprimée")
return flask.redirect( return flask.redirect(
url_for("scolar.ficheEtud", scodoc_dept=g.scodoc_dept, etudid=etudid) url_for("scolar.ficheEtud", scodoc_dept=g.scodoc_dept, etudid=etudid)
@ -246,7 +250,9 @@ def etud_get_archived_file(etudid, archive_name, filename):
raise ScoValueError("étudiant inexistant") raise ScoValueError("étudiant inexistant")
etud = etuds[0] etud = etuds[0]
etud_archive_id = etud["etudid"] etud_archive_id = etud["etudid"]
return EtudsArchive.get_archived_file(etud_archive_id, archive_name, filename) return ETUDS_ARCHIVER.get_archived_file(
etud_archive_id, archive_name, filename, dept_id=etud["dept_id"]
)
# --- Upload d'un ensemble de fichiers (pour un groupe d'étudiants) # --- Upload d'un ensemble de fichiers (pour un groupe d'étudiants)

View File

@ -102,7 +102,7 @@ class JustificatifArchiver(BaseArchiver):
def save_justificatif( def save_justificatif(
self, self,
etudid: int, etud: Identite,
filename: str, filename: str,
data: bytes or str, data: bytes or str,
archive_name: str = None, archive_name: str = None,
@ -113,17 +113,18 @@ class JustificatifArchiver(BaseArchiver):
Ajoute un fichier dans une archive "justificatif" pour l'etudid donné Ajoute un fichier dans une archive "justificatif" pour l'etudid donné
Retourne l'archive_name utilisé Retourne l'archive_name utilisé
""" """
self._set_dept(etudid)
if archive_name is None: if archive_name is None:
archive_id: str = self.create_obj_archive( archive_id: str = self.create_obj_archive(
oid=etudid, description=description oid=etud.id, description=description, dept_id=etud.dept_id
) )
else: else:
archive_id: str = self.get_id_from_name(etudid, archive_name) archive_id: str = self.get_id_from_name(
etud.id, archive_name, dept_id=etud.dept_id
)
fname: str = self.store(archive_id, filename, data) fname: str = self.store(archive_id, filename, data, dept_id=etud.dept_id)
trace = Trace(self.get_obj_dir(etudid)) trace = Trace(self.get_obj_dir(etud.id, dept_id=etud.dept_id))
trace.set_trace(fname, mode="entry") trace.set_trace(fname, mode="entry")
if user_id is not None: if user_id is not None:
trace.set_trace(fname, mode="user_id", current_user=user_id) trace.set_trace(fname, mode="user_id", current_user=user_id)
@ -132,7 +133,7 @@ class JustificatifArchiver(BaseArchiver):
def delete_justificatif( def delete_justificatif(
self, self,
etudid: int, etud: Identite,
archive_name: str, archive_name: str,
filename: str = None, filename: str = None,
has_trace: bool = True, has_trace: bool = True,
@ -140,92 +141,91 @@ class JustificatifArchiver(BaseArchiver):
""" """
Supprime une archive ou un fichier particulier de l'archivage de l'étudiant donné Supprime une archive ou un fichier particulier de l'archivage de l'étudiant donné
Si trace == True : sauvegarde le nom du/des fichier(s) supprimé(s) dans la trace de l'étudiant Si trace == True : sauvegarde le nom du/des fichier(s) supprimé(s)
dans la trace de l'étudiant
""" """
self._set_dept(etudid) if str(etud.id) not in self.list_oids():
if str(etudid) not in self.list_oids(): raise ValueError(f"Aucune archive pour etudid[{etud.id}]")
raise ValueError(f"Aucune archive pour etudid[{etudid}]")
archive_id = self.get_id_from_name(etudid, archive_name) archive_id = self.get_id_from_name(etud.id, archive_name, dept_id=etud.dept_id)
if filename is not None: if filename is not None:
if filename not in self.list_archive(archive_id): if filename not in self.list_archive(archive_id, dept_id=etud.dept_id):
raise ValueError( raise ValueError(
f"filename {filename} inconnu dans l'archive archive_id[{archive_id}] -> etudid[{etudid}]" f"""filename {filename} inconnu dans l'archive archive_id[{
archive_id}] -> etudid[{etud.id}]"""
) )
path: str = os.path.join(self.get_obj_dir(etudid), archive_id, filename) path: str = os.path.join(
self.get_obj_dir(etud.id, dept_id=etud.dept_id), archive_id, filename
)
if os.path.isfile(path): if os.path.isfile(path):
if has_trace: if has_trace:
trace = Trace(self.get_obj_dir(etudid)) trace = Trace(self.get_obj_dir(etud.id, dept_id=etud.dept_id))
trace.set_trace(filename, mode="delete") trace.set_trace(filename, mode="delete")
os.remove(path) os.remove(path)
else: else:
if has_trace: if has_trace:
trace = Trace(self.get_obj_dir(etudid)) trace = Trace(self.get_obj_dir(etud.id, dept_id=etud.dept_id))
trace.set_trace(*self.list_archive(archive_id), mode="delete") trace.set_trace(
*self.list_archive(archive_id, dept_id=etud.dept_id), mode="delete"
)
self.delete_archive( self.delete_archive(
os.path.join( os.path.join(
self.get_obj_dir(etudid), self.get_obj_dir(etud.id, dept_id=etud.dept_id),
archive_id, archive_id,
) )
) )
def list_justificatifs( def list_justificatifs(
self, archive_name: str, etudid: int self, archive_name: str, etud: Identite
) -> list[tuple[str, int]]: ) -> list[tuple[str, int]]:
""" """
Retourne la liste des noms de fichiers dans l'archive donnée Retourne la liste des noms de fichiers dans l'archive donnée
""" """
self._set_dept(etudid)
filenames: list[str] = [] filenames: list[str] = []
archive_id = self.get_id_from_name(etudid, archive_name) archive_id = self.get_id_from_name(etud.id, archive_name, dept_id=etud.dept_id)
filenames = self.list_archive(archive_id) filenames = self.list_archive(archive_id, dept_id=etud.dept_id)
trace: Trace = Trace(self.get_obj_dir(etudid)) trace: Trace = Trace(self.get_obj_dir(etud.id, dept_id=etud.dept_id))
traced = trace.get_trace(filenames) traced = trace.get_trace(filenames)
retour = [(key, value[2]) for key, value in traced.items()] retour = [(key, value[2]) for key, value in traced.items()]
return retour return retour
def get_justificatif_file(self, archive_name: str, etudid: int, filename: str): def get_justificatif_file(self, archive_name: str, etud: Identite, filename: str):
""" """
Retourne une réponse de téléchargement de fichier si le fichier existe Retourne une réponse de téléchargement de fichier si le fichier existe
""" """
self._set_dept(etudid) archive_id: str = self.get_id_from_name(
archive_id: str = self.get_id_from_name(etudid, archive_name) etud.id, archive_name, dept_id=etud.dept_id
if filename in self.list_archive(archive_id): )
return self.get_archived_file(etudid, archive_name, filename) if filename in self.list_archive(archive_id, dept_id=etud.dept_id):
return self.get_archived_file(
etud.id, archive_name, filename, dept_id=etud.dept_id
)
raise ScoValueError( raise ScoValueError(
f"Fichier {filename} introuvable dans l'archive {archive_name}" f"Fichier {filename} introuvable dans l'archive {archive_name}"
) )
def _set_dept(self, etudid: int):
"""
Mets à jour le dept_id de l'archiver en fonction du département de l'étudiant
"""
etud: Identite = Identite.query.filter_by(id=etudid).first()
self.set_dept_id(etud.dept_id)
def remove_dept_archive(self, dept_id: int = None): def remove_dept_archive(self, dept_id: int = None):
""" """
Supprime toutes les archives d'un département (ou de tous les départements) Supprime toutes les archives d'un département (ou de tous les départements)
Supprime aussi les fichiers de trace Supprime aussi les fichiers de trace
""" """
self.set_dept_id(1) # juste pour récupérer .root, dept_id n'a pas d'importance
self.initialize() self.initialize(dept_id=1)
if dept_id is None: if dept_id is None:
rmtree(self.root, ignore_errors=True) rmtree(self.root, ignore_errors=True)
else: else:
rmtree(os.path.join(self.root, str(dept_id)), ignore_errors=True) rmtree(os.path.join(self.root, str(dept_id)), ignore_errors=True)
def get_trace( def get_trace( # XXX inutilisée ?
self, etudid: int, *fnames: str self, etud: Identite, *fnames: str
) -> dict[str, list[datetime, datetime]]: ) -> dict[str, list[datetime, datetime]]:
"""Récupère la trace des justificatifs de l'étudiant""" """Récupère la trace des justificatifs de l'étudiant"""
trace = Trace(self.get_obj_dir(etudid)) trace = Trace(self.get_obj_dir(etud.id, etud.dept_id))
return trace.get_trace(fnames) return trace.get_trace(fnames)

View File

@ -85,7 +85,7 @@ class ApoCSVArchiver(sco_archives.BaseArchiver):
sco_archives.BaseArchiver.__init__(self, archive_type="apo_csv") sco_archives.BaseArchiver.__init__(self, archive_type="apo_csv")
ApoCSVArchive = ApoCSVArchiver() APO_CSV_ARCHIVER = ApoCSVArchiver()
# def get_sem_apo_archive(formsemestre_id): # def get_sem_apo_archive(formsemestre_id):
@ -126,9 +126,9 @@ def apo_csv_store(csv_data: str, annee_scolaire, sem_id):
oid = f"{annee_scolaire}-{sem_id}" oid = f"{annee_scolaire}-{sem_id}"
description = f"""{str(apo_data.etape)};{annee_scolaire};{sem_id}""" description = f"""{str(apo_data.etape)};{annee_scolaire};{sem_id}"""
archive_id = ApoCSVArchive.create_obj_archive(oid, description) archive_id = APO_CSV_ARCHIVER.create_obj_archive(oid, description)
csv_data_bytes = csv_data.encode(sco_apogee_reader.APO_OUTPUT_ENCODING) csv_data_bytes = csv_data.encode(sco_apogee_reader.APO_OUTPUT_ENCODING)
ApoCSVArchive.store(archive_id, filename, csv_data_bytes) APO_CSV_ARCHIVER.store(archive_id, filename, csv_data_bytes)
return apo_data.etape return apo_data.etape
@ -138,7 +138,7 @@ def apo_csv_list_stored_archives(annee_scolaire=None, sem_id=None, etapes=None):
:return: list of informations about stored CSV :return: list of informations about stored CSV
[ { } ] [ { } ]
""" """
oids = ApoCSVArchive.list_oids() # [ '2016-1', ... ] oids = APO_CSV_ARCHIVER.list_oids() # [ '2016-1', ... ]
# filter # filter
if annee_scolaire: if annee_scolaire:
e = re.compile(str(annee_scolaire) + "-.+") e = re.compile(str(annee_scolaire) + "-.+")
@ -149,9 +149,9 @@ def apo_csv_list_stored_archives(annee_scolaire=None, sem_id=None, etapes=None):
infos = [] # liste d'infos infos = [] # liste d'infos
for oid in oids: for oid in oids:
archive_ids = ApoCSVArchive.list_obj_archives(oid) archive_ids = APO_CSV_ARCHIVER.list_obj_archives(oid)
for archive_id in archive_ids: for archive_id in archive_ids:
description = ApoCSVArchive.get_archive_description(archive_id) description = APO_CSV_ARCHIVER.get_archive_description(archive_id)
fs = tuple(description.split(";")) fs = tuple(description.split(";"))
if len(fs) == 3: if len(fs) == 3:
arch_etape_apo, arch_annee_scolaire, arch_sem_id = fs arch_etape_apo, arch_annee_scolaire, arch_sem_id = fs
@ -165,7 +165,7 @@ def apo_csv_list_stored_archives(annee_scolaire=None, sem_id=None, etapes=None):
"annee_scolaire": int(arch_annee_scolaire), "annee_scolaire": int(arch_annee_scolaire),
"sem_id": int(arch_sem_id), "sem_id": int(arch_sem_id),
"etape_apo": arch_etape_apo, # qui contient éventuellement le VDI "etape_apo": arch_etape_apo, # qui contient éventuellement le VDI
"date": ApoCSVArchive.get_archive_date(archive_id), "date": APO_CSV_ARCHIVER.get_archive_date(archive_id),
} }
) )
infos.sort(key=lambda x: x["etape_apo"]) infos.sort(key=lambda x: x["etape_apo"])
@ -185,7 +185,7 @@ def apo_csv_list_stored_etapes(annee_scolaire, sem_id=None, etapes=None):
def apo_csv_delete(archive_id): def apo_csv_delete(archive_id):
"""Delete archived CSV""" """Delete archived CSV"""
ApoCSVArchive.delete_archive(archive_id) APO_CSV_ARCHIVER.delete_archive(archive_id)
def apo_csv_get_archive(etape_apo, annee_scolaire="", sem_id=""): def apo_csv_get_archive(etape_apo, annee_scolaire="", sem_id=""):
@ -209,7 +209,7 @@ def apo_csv_get(etape_apo="", annee_scolaire="", sem_id="") -> str:
"Etape %s non enregistree (%s, %s)" % (etape_apo, annee_scolaire, sem_id) "Etape %s non enregistree (%s, %s)" % (etape_apo, annee_scolaire, sem_id)
) )
archive_id = info["archive_id"] archive_id = info["archive_id"]
data = ApoCSVArchive.get(archive_id, etape_apo + ".csv") data = APO_CSV_ARCHIVER.get(archive_id, etape_apo + ".csv")
# ce fichier a été archivé donc généré par ScoDoc # ce fichier a été archivé donc généré par ScoDoc
# son encodage est donc APO_OUTPUT_ENCODING # son encodage est donc APO_OUTPUT_ENCODING
return data.decode(sco_apogee_reader.APO_OUTPUT_ENCODING) return data.decode(sco_apogee_reader.APO_OUTPUT_ENCODING)

View File

@ -448,7 +448,7 @@ def formsemestre_status_menubar(formsemestre: FormSemestre) -> str:
"title": "Documents archivés", "title": "Documents archivés",
"endpoint": "notes.formsemestre_list_archives", "endpoint": "notes.formsemestre_list_archives",
"args": {"formsemestre_id": formsemestre_id}, "args": {"formsemestre_id": formsemestre_id},
"enabled": sco_archives.PVArchive.list_obj_archives(formsemestre_id), "enabled": sco_archives.PV_ARCHIVER.list_obj_archives(formsemestre_id),
}, },
] ]

View File

@ -441,7 +441,7 @@ def ficheEtud(etudid=None):
# Fichiers archivés: # Fichiers archivés:
info["fichiers_archive_htm"] = ( info["fichiers_archive_htm"] = (
'<div class="fichetitre">Fichiers associés</div>' '<div class="fichetitre">Fichiers associés</div>'
+ sco_archives_etud.etud_list_archives_html(etudid) + sco_archives_etud.etud_list_archives_html(etud)
) )
# Devenir de l'étudiant: # Devenir de l'étudiant:

View File

@ -44,7 +44,7 @@ from app import models
from app.auth.models import User from app.auth.models import User
from app.but import ( from app.but import (
apc_edit_ue, apc_edit_ue,
bulletin_but_court, bulletin_but_court, # ne pas enlever: ajoute des routes !
cursus_but, cursus_but,
jury_edit_manual, jury_edit_manual,
jury_but, jury_but,

View File

@ -1,7 +1,7 @@
# -*- mode: python -*- # -*- mode: python -*-
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
SCOVERSION = "9.6.24" SCOVERSION = "9.6.25"
SCONAME = "ScoDoc" SCONAME = "ScoDoc"