# -*- mode: python -*- # -*- coding: utf-8 -*- ############################################################################## # # Gestion scolarite IUT # # Copyright (c) 1999 - 2023 Emmanuel Viennet. All rights reserved. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Emmanuel Viennet emmanuel.viennet@viennet.net # ############################################################################## """ScoDoc : gestion des archives des PV et bulletins, et des dossiers etudiants (admission) Archives are plain files, stored in <SCODOC_VAR_DIR>/archives/<dept_id> (where <SCODOC_VAR_DIR> is usually /opt/scodoc-data, and <dept_id> a departement id (int)) Les PV de jurys et documents associés sont stockées dans un sous-repertoire de la forme <archivedir>/<dept>/<formsemestre_id>/<YYYY-MM-DD-HH-MM-SS> (formsemestre_id est ici FormSemestre.id) Les documents liés à l'étudiant sont dans <archivedir>/docetuds/<dept_id>/<etudid>/<YYYY-MM-DD-HH-MM-SS> (etudid est ici Identite.id) Les maquettes Apogée pour l'export des notes sont dans <archivedir>/apo_csv/<dept_id>/<annee_scolaire>-<sem_id>/<YYYY-MM-DD-HH-MM-SS>/<code_etape>.csv Un répertoire d'archive contient des fichiers quelconques, et un fichier texte nommé _description.txt qui est une description (humaine, format libre) de l'archive. """ import datetime import glob import json import mimetypes import os import re import shutil import time import chardet import flask from flask import flash, g, request, url_for import app.scodoc.sco_utils as scu from config import Config from app import log, ScoDocJSONEncoder from app.but import jury_but_pv from app.comp import res_sem from app.comp.res_compat import NotesTableCompat from app.models import FormSemestre from app.scodoc.TrivialFormulator import TrivialFormulator from app.scodoc.sco_exceptions import ScoException, ScoPermissionDenied from app.scodoc import html_sco_header from app.scodoc import sco_bulletins_pdf from app.scodoc import sco_groups from app.scodoc import sco_groups_view from app.scodoc import sco_pv_forms from app.scodoc import sco_pv_lettres_inviduelles from app.scodoc import sco_pv_pdf from app.scodoc.sco_exceptions import ScoValueError class BaseArchiver: def __init__(self, archive_type=""): self.archive_type = archive_type self.initialized = False self.root = None self.dept_id = None def set_dept_id(self, dept_id: int): "set dept" self.dept_id = dept_id def initialize(self, dept_id: int = None): """Fixe le département et initialise les répertoires au besoin.""" # Set departement (à chaque fois car peut changer d'une utilisation à l'autre) self.dept_id = getattr(g, "scodoc_dept_id") if dept_id is None else dept_id if self.initialized: return dirs = [Config.SCODOC_VAR_DIR, "archives"] if self.archive_type: dirs.append(self.archive_type) self.root = os.path.join(*dirs) # /opt/scodoc-data/archives/<type> log("initialized archiver, path=" + self.root) path = dirs[0] for directory in dirs[1:]: path = os.path.join(path, directory) try: scu.GSL.acquire() if not os.path.isdir(path): log(f"creating directory {path}") os.mkdir(path) finally: scu.GSL.release() self.initialized = True def get_obj_dir(self, oid: int, dept_id: int = None): """ :return: path to directory of archives for this object (eg formsemestre_id or etudid). If directory does not yet exist, create it. """ self.initialize(dept_id) dept_dir = os.path.join(self.root, str(self.dept_id)) try: scu.GSL.acquire() if not os.path.isdir(dept_dir): log(f"creating directory {dept_dir}") os.mkdir(dept_dir) obj_dir = os.path.join(dept_dir, str(oid)) if not os.path.isdir(obj_dir): log(f"creating directory {obj_dir}") os.mkdir(obj_dir) except FileExistsError as exc: raise ScoException( f"""BaseArchiver error: obj_dir={obj_dir} exists={ os.path.exists(obj_dir) } isdir={os.path.isdir(obj_dir)}""" ) from exc finally: scu.GSL.release() return obj_dir def list_oids(self, dept_id: int = None): """ :return: list of archive oids """ self.initialize(dept_id) base = os.path.join(self.root, str(self.dept_id)) + os.path.sep dirs = glob.glob(base + "*") return [os.path.split(x)[1] for x in dirs] def list_obj_archives(self, oid: int, dept_id: int = None): """Returns :return: list of archive identifiers for this object (paths to non empty dirs) """ self.initialize(dept_id) base = self.get_obj_dir(oid, dept_id=dept_id) + os.path.sep dirs = glob.glob( base + "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]" ) dirs = [os.path.join(base, d) for d in dirs] dirs = [d for d in dirs if os.path.isdir(d) and os.listdir(d)] # non empty dirs dirs.sort() return dirs def delete_archive(self, archive_id: str, dept_id: int = None): """Delete (forever) this archive""" self.initialize(dept_id) try: scu.GSL.acquire() shutil.rmtree(archive_id, ignore_errors=True) finally: scu.GSL.release() def get_archive_date(self, archive_id: str): """Returns date (as a DateTime object) of an archive""" return datetime.datetime( *[int(x) for x in os.path.split(archive_id)[1].split("-")] ) def list_archive(self, archive_id: str, dept_id: int = None) -> str: """Return list of filenames (without path) in archive""" self.initialize(dept_id) try: scu.GSL.acquire() files = os.listdir(archive_id) finally: scu.GSL.release() files.sort() return [f for f in files if f and f[0] != "_"] def get_archive_name(self, archive_id: str): """name identifying archive, to be used in web URLs""" return os.path.split(archive_id)[1] def is_valid_archive_name(self, archive_name: str): """check if name is valid.""" return re.match( "^[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}$", archive_name ) def get_id_from_name(self, oid, archive_name: str, dept_id: int = None): """returns archive id (check that name is valid)""" self.initialize(dept_id) if not self.is_valid_archive_name(archive_name): raise ScoValueError(f"Archive {archive_name} introuvable") archive_id = os.path.join(self.get_obj_dir(oid, dept_id=dept_id), archive_name) if not os.path.isdir(archive_id): log( f"invalid archive name: {archive_name}, oid={oid}, archive_id={archive_id}" ) raise ScoValueError(f"Archive {archive_name} introuvable") return archive_id def get_archive_description(self, archive_id: str, dept_id: int = None) -> str: """Return description of archive""" self.initialize(dept_id) filename = os.path.join(archive_id, "_description.txt") try: with open(filename, encoding=scu.SCO_ENCODING) as f: descr = f.read() except UnicodeDecodeError: # some (old) files may have saved under exotic encodings with open(filename, "rb") as f: data = f.read() descr = data.decode(chardet.detect(data)["encoding"]) return descr def create_obj_archive(self, oid: int, description: str, dept_id: int = None): """Creates a new archive for this object and returns its id.""" # id suffixé par YYYY-MM-DD-hh-mm-ss archive_id = ( self.get_obj_dir(oid, dept_id=dept_id) + os.path.sep + "-".join([f"{x:02d}" for x in time.localtime()[:6]]) ) log(f"creating archive: {archive_id}") try: scu.GSL.acquire() os.mkdir(archive_id) except FileExistsError: # directory already exists ! pass finally: scu.GSL.release() self.store(archive_id, "_description.txt", description) return archive_id def store( self, archive_id: str, filename: str, data: str | bytes, dept_id: int = None, ): """Store data in archive, under given filename. Filename may be modified (sanitized): return used filename The file is created or replaced. data may be str or bytes """ if isinstance(data, str): data = data.encode(scu.SCO_ENCODING) self.initialize(dept_id) filename = scu.sanitize_filename(filename) log(f"storing {filename} ({len(data)} bytes) in {archive_id}") try: scu.GSL.acquire() fname = os.path.join(archive_id, filename) with open(fname, "wb") as f: f.write(data) finally: scu.GSL.release() return filename def get(self, archive_id: str, filename: str, dept_id: int = None): """Retreive data""" self.initialize(dept_id) if not scu.is_valid_filename(filename): log(f"""Archiver.get: invalid filename '{filename}'""") raise ScoValueError("archive introuvable (déjà supprimée ?)") fname = os.path.join(archive_id, filename) log(f"reading archive file {fname}") with open(fname, "rb") as f: data = f.read() return data def get_archived_file(self, oid, archive_name, filename, dept_id: int = None): """Recupère les donnees du fichier indiqué et envoie au client. Returns: Response """ archive_id = self.get_id_from_name(oid, archive_name, dept_id=dept_id) data = self.get(archive_id, filename) mime = mimetypes.guess_type(filename)[0] if mime is None: mime = "application/octet-stream" return scu.send_file(data, filename, mime=mime) class SemsArchiver(BaseArchiver): def __init__(self): BaseArchiver.__init__(self, archive_type="") PV_ARCHIVER = SemsArchiver() # ---------------------------------------------------------------------------- def do_formsemestre_archive( formsemestre_id, group_ids: list[int] = None, # si indiqué, ne prend que ces groupes description="", date_jury="", signature=None, # pour lettres indiv date_commission=None, numero_arrete=None, code_vdi=None, show_title=False, pv_title=None, with_paragraph_nom=False, anonymous=False, bul_version="long", ): """Make and store new archive for this formsemestre. Store: - tableau recap (xls), pv jury (xls et pdf), bulletins (xml et pdf), lettres individuelles (pdf) """ from app.scodoc.sco_recapcomplet import ( gen_formsemestre_recapcomplet_excel, gen_formsemestre_recapcomplet_html_table, gen_formsemestre_recapcomplet_json, ) formsemestre = FormSemestre.get_formsemestre(formsemestre_id) res: NotesTableCompat = res_sem.load_formsemestre_results(formsemestre) sem_archive_id = formsemestre_id archive_id = PV_ARCHIVER.create_obj_archive( sem_archive_id, description, formsemestre.dept_id ) date = PV_ARCHIVER.get_archive_date(archive_id).strftime("%d/%m/%Y à %H:%M") if not group_ids: # tous les inscrits du semestre group_ids = [sco_groups.get_default_group(formsemestre_id)] groups_infos = sco_groups_view.DisplayedGroupsInfos( group_ids, formsemestre_id=formsemestre_id ) groups_filename = "-" + groups_infos.groups_filename etudids = [m["etudid"] for m in groups_infos.members] # Tableau recap notes en XLS (pour tous les etudiants, n'utilise pas les groupes) data, _ = gen_formsemestre_recapcomplet_excel(res, include_evaluations=True) if data: PV_ARCHIVER.store( archive_id, "Tableau_moyennes" + scu.XLSX_SUFFIX, data, dept_id=formsemestre.dept_id, ) # Tableau recap notes en HTML (pour tous les etudiants, n'utilise pas les groupes) table_html, _, _ = gen_formsemestre_recapcomplet_html_table( formsemestre, res, include_evaluations=True ) if table_html: flash(f"Moyennes archivées le {date}", category="info") data = "\n".join( [ html_sco_header.sco_header( page_title=f"Moyennes archivées le {date}", no_side_bar=True, ), f'<h2 class="fontorange">Valeurs archivées le {date}</h2>', """<style type="text/css">table.notes_recapcomplet tr { color: rgb(185,70,0); } </style>""", table_html, html_sco_header.sco_footer(), ] ) PV_ARCHIVER.store( archive_id, "Tableau_moyennes.html", data, dept_id=formsemestre.dept_id ) # Bulletins en JSON data = gen_formsemestre_recapcomplet_json(formsemestre_id, xml_with_decisions=True) data_js = json.dumps(data, indent=1, cls=ScoDocJSONEncoder) if data: PV_ARCHIVER.store( archive_id, "Bulletins.json", data_js, dept_id=formsemestre.dept_id ) # Décisions de jury, en XLS if formsemestre.formation.is_apc(): response = jury_but_pv.pvjury_page_but(formsemestre_id, fmt="xls") data = response.get_data() else: # formations classiques data = sco_pv_forms.formsemestre_pvjury( formsemestre_id, format="xls", publish=False ) if data: PV_ARCHIVER.store( archive_id, "Decisions_Jury" + scu.XLSX_SUFFIX, data, dept_id=formsemestre.dept_id, ) # Classeur bulletins (PDF) data, _ = sco_bulletins_pdf.get_formsemestre_bulletins_pdf( formsemestre_id, version=bul_version ) if data: PV_ARCHIVER.store( archive_id, "Bulletins.pdf", data, dept_id=formsemestre.dept_id, ) # Lettres individuelles (PDF): data = sco_pv_lettres_inviduelles.pdf_lettres_individuelles( formsemestre_id, etudids=etudids, date_jury=date_jury, date_commission=date_commission, signature=signature, ) if data: PV_ARCHIVER.store( archive_id, f"CourriersDecisions{groups_filename}.pdf", data, dept_id=formsemestre.dept_id, ) # PV de jury (PDF): data = sco_pv_pdf.pvjury_pdf( formsemestre, etudids=etudids, date_commission=date_commission, date_jury=date_jury, numero_arrete=numero_arrete, code_vdi=code_vdi, show_title=show_title, pv_title=pv_title, with_paragraph_nom=with_paragraph_nom, anonymous=anonymous, ) if data: PV_ARCHIVER.store( archive_id, f"PV_Jury{groups_filename}.pdf", data, dept_id=formsemestre.dept_id, ) def formsemestre_archive(formsemestre_id, group_ids: list[int] = None): """Make and store new archive for this formsemestre. (all students or only selected groups) """ formsemestre: FormSemestre = FormSemestre.query.get_or_404(formsemestre_id) if not formsemestre.can_edit_pv(): raise ScoPermissionDenied( dest_url=url_for( "notes.formsemestre_status", scodoc_dept=g.scodoc_dept, formsemestre_id=formsemestre_id, ) ) if not group_ids: # tous les inscrits du semestre group_ids = [sco_groups.get_default_group(formsemestre_id)] groups_infos = sco_groups_view.DisplayedGroupsInfos( group_ids, formsemestre_id=formsemestre_id ) H = [ html_sco_header.html_sem_header( "Archiver les PV et résultats du semestre", javascripts=sco_groups_view.JAVASCRIPTS, cssstyles=sco_groups_view.CSSSTYLES, init_qtip=True, ), """<p class="help">Cette page permet de générer et d'archiver tous les documents résultant de ce semestre: PV de jury, lettres individuelles, tableaux récapitulatifs.</p><p class="help">Les documents archivés sont enregistrés et non modifiables, on peut les retrouver ultérieurement. </p><p class="help">On peut archiver plusieurs versions des documents (avant et après le jury par exemple). </p> """, ] F = [ f"""<p><em>Note: les documents sont aussi affectés par les réglages sur la page "<a class="stdlink" href="{ url_for("scolar.edit_preferences", scodoc_dept=g.scodoc_dept) }">Paramétrage</a>" (accessible à l'administrateur du département).</em> </p>""", html_sco_header.sco_footer(), ] descr = [ ( "description", {"input_type": "textarea", "rows": 4, "cols": 77, "title": "Description"}, ), ("sep", {"input_type": "separator", "title": "Informations sur PV de jury"}), ] descr += sco_pv_forms.descrform_pvjury(formsemestre) descr += [ ( "signature", { "input_type": "file", "size": 30, "explanation": "optionnel: image scannée de la signature pour les lettres individuelles", }, ), ( "bul_version", { "input_type": "menu", "title": "Version des bulletins archivés", "labels": [ "Version courte", "Version intermédiaire", "Version complète", ], "allowed_values": scu.BULLETINS_VERSIONS, "default": "long", }, ), ] menu_choix_groupe = ( """<div class="group_ids_sel_menu">Groupes d'étudiants à lister: """ + sco_groups_view.menu_groups_choice(groups_infos) + """(pour les PV et lettres)</div>""" ) tf = TrivialFormulator( request.base_url, scu.get_request_args(), descr, cancelbutton="Annuler", submitlabel="Générer et archiver les documents", name="tf", formid="group_selector", html_foot_markup=menu_choix_groupe, ) if tf[0] == 0: return "\n".join(H) + "\n" + tf[1] + "\n".join(F) elif tf[0] == -1: msg = "Opération annulée" else: # submit sf = tf[2]["signature"] signature = sf.read() # image of signature if tf[2]["anonymous"]: tf[2]["anonymous"] = True else: tf[2]["anonymous"] = False do_formsemestre_archive( formsemestre_id, group_ids=group_ids, description=tf[2]["description"], date_jury=tf[2]["date_jury"], date_commission=tf[2]["date_commission"], signature=signature, numero_arrete=tf[2]["numero_arrete"], code_vdi=tf[2]["code_vdi"], pv_title=tf[2]["pv_title"], show_title=tf[2]["show_title"], with_paragraph_nom=tf[2]["with_paragraph_nom"], anonymous=tf[2]["anonymous"], bul_version=tf[2]["bul_version"], ) msg = "Nouvelle archive créée" # submitted or cancelled: flash(msg) return flask.redirect( url_for( "notes.formsemestre_list_archives", scodoc_dept=g.scodoc_dept, formsemestre_id=formsemestre_id, ) ) def formsemestre_list_archives(formsemestre_id): """Page listing archives""" formsemestre = FormSemestre.get_formsemestre(formsemestre_id) sem_archive_id = formsemestre_id L = [] for archive_id in PV_ARCHIVER.list_obj_archives( sem_archive_id, dept_id=formsemestre.dept_id ): a = { "archive_id": archive_id, "description": PV_ARCHIVER.get_archive_description( archive_id, dept_id=formsemestre.dept_id ), "date": PV_ARCHIVER.get_archive_date(archive_id), "content": PV_ARCHIVER.list_archive( archive_id, dept_id=formsemestre.dept_id ), } L.append(a) H = [html_sco_header.html_sem_header("Archive des PV et résultats ")] if not L: H.append("<p>aucune archive enregistrée</p>") else: H.append("<ul>") for a in L: archive_name = PV_ARCHIVER.get_archive_name(a["archive_id"]) H.append( '<li>%s : <em>%s</em> (<a href="formsemestre_delete_archive?formsemestre_id=%s&archive_name=%s">supprimer</a>)<ul>' % ( a["date"].strftime("%d/%m/%Y %H:%M"), a["description"], formsemestre_id, archive_name, ) ) for filename in a["content"]: H.append( '<li><a href="formsemestre_get_archived_file?formsemestre_id=%s&archive_name=%s&filename=%s">%s</a></li>' % (formsemestre_id, archive_name, filename, filename) ) if not a["content"]: H.append("<li><em>aucun fichier !</em></li>") H.append("</ul></li>") H.append("</ul>") return "\n".join(H) + html_sco_header.sco_footer() def formsemestre_get_archived_file(formsemestre_id, archive_name, filename): """Send file to client.""" formsemestre: FormSemestre = FormSemestre.query.get_or_404(formsemestre_id) sem_archive_id = formsemestre.id return PV_ARCHIVER.get_archived_file( sem_archive_id, archive_name, filename, dept_id=formsemestre.dept_id ) def formsemestre_delete_archive(formsemestre_id, archive_name, dialog_confirmed=False): """Delete an archive""" formsemestre: FormSemestre = FormSemestre.query.get_or_404(formsemestre_id) if not formsemestre.can_edit_pv(): raise ScoPermissionDenied( dest_url=url_for( "notes.formsemestre_status", scodoc_dept=g.scodoc_dept, formsemestre_id=formsemestre_id, ) ) sem_archive_id = formsemestre_id archive_id = PV_ARCHIVER.get_id_from_name( sem_archive_id, archive_name, dept_id=formsemestre.dept_id ) dest_url = url_for( "notes.formsemestre_list_archives", scodoc_dept=g.scodoc_dept, formsemestre_id=formsemestre_id, ) if not dialog_confirmed: return scu.confirm_dialog( f"""<h2>Confirmer la suppression de l'archive du { PV_ARCHIVER.get_archive_date(archive_id).strftime("%d/%m/%Y %H:%M") } ?</h2> <p>La suppression sera définitive.</p> """, dest_url="", cancel_url=dest_url, parameters={ "formsemestre_id": formsemestre_id, "archive_name": archive_name, }, ) PV_ARCHIVER.delete_archive(archive_id, dept_id=formsemestre.dept_id) flash("Archive supprimée") return flask.redirect(dest_url)