# -*- mode: python -*- # -*- coding: utf-8 -*- ############################################################################## # # Gestion scolarite IUT # # Copyright (c) 1999 - 2021 Emmanuel Viennet. All rights reserved. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Emmanuel Viennet emmanuel.viennet@viennet.net # ############################################################################## """ScoDoc : gestion des archives des PV et bulletins, et des dossiers etudiants (admission) Archives are plain files, stored in <INSTANCE_HOME>/var/scodoc/archives/<deptid> (where <INSTANCE_HOME> is usually /opt/scodoc/instance, and <deptid> a departement id) Les PV de jurys et documents associés sont stockées dans un sous-repertoire de la forme <archivedir>/<dept>/<formsemestre_id>/<YYYY-MM-DD-HH-MM-SS> Les documents liés à l'étudiant sont dans <archivedir>/docetuds/<dept>/<etudid>/<YYYY-MM-DD-HH-MM-SS> Les maquettes Apogée pour l'export des notes sont dans <archivedir>/apo_csv/<dept>/<annee_scolaire>-<sem_id>/<YYYY-MM-DD-HH-MM-SS>/<code_etape>.csv Un répertoire d'archive contient des fichiers quelconques, et un fichier texte nommé _description.txt qui est une description (humaine, format libre) de l'archive. """ import os import time import datetime import re import shutil import glob import sco_utils as scu from config import Config import notesdb as ndb from notes_log import log import sco_formsemestre import sco_pvjury import sco_excel import sco_pvpdf import sco_groups import sco_groups_view from sco_recapcomplet import make_formsemestre_recapcomplet import sco_bulletins_pdf from TrivialFormulator import TrivialFormulator from sco_exceptions import ( AccessDenied, ) class BaseArchiver: def __init__(self, archive_type=""): dirs = [Config.INSTANCE_HOME, "var", "scodoc", "archives"] if archive_type: dirs.append(archive_type) self.root = os.path.join(*dirs) log("initialized archiver, path=" + self.root) path = dirs[0] for dir in dirs[1:]: path = os.path.join(path, dir) try: scu.GSL.acquire() if not os.path.isdir(path): log("creating directory %s" % path) os.mkdir(path) finally: scu.GSL.release() def get_obj_dir(self, context, oid): """ :return: path to directory of archives for this object (eg formsemestre_id or etudid). If directory does not yet exist, create it. """ dept_dir = os.path.join(self.root, sco_core.get_dept_id()) try: scu.GSL.acquire() if not os.path.isdir(dept_dir): log("creating directory %s" % dept_dir) os.mkdir(dept_dir) obj_dir = os.path.join(dept_dir, oid) if not os.path.isdir(obj_dir): log("creating directory %s" % obj_dir) os.mkdir(obj_dir) finally: scu.GSL.release() return obj_dir def list_oids(self, context): """ :return: list of archive oids """ base = os.path.join(self.root, sco_core.get_dept_id()) + os.path.sep dirs = glob.glob(base + "*") return [os.path.split(x)[1] for x in dirs] def list_obj_archives(self, context, oid): """Returns :return: list of archive identifiers for this object (paths to non empty dirs) """ base = self.get_obj_dir(context, oid) + os.path.sep dirs = glob.glob( base + "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]" ) dirs = [os.path.join(base, d) for d in dirs] dirs = [d for d in dirs if os.path.isdir(d) and os.listdir(d)] # non empty dirs dirs.sort() return dirs def delete_archive(self, archive_id): """Delete (forever) this archive""" try: scu.GSL.acquire() shutil.rmtree(archive_id, ignore_errors=True) finally: scu.GSL.release() def get_archive_date(self, archive_id): """Returns date (as a DateTime object) of an archive""" dt = [int(x) for x in os.path.split(archive_id)[1].split("-")] return datetime.datetime(*dt) def list_archive(self, archive_id): """Return list of filenames (without path) in archive""" try: scu.GSL.acquire() files = os.listdir(archive_id) finally: scu.GSL.release() files.sort() return [f for f in files if f and f[0] != "_"] def get_archive_name(self, archive_id): """name identifying archive, to be used in web URLs""" return os.path.split(archive_id)[1] def is_valid_archive_name(self, archive_name): """check if name is valid.""" return re.match( "^[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}$", archive_name ) def get_id_from_name(self, context, oid, archive_name): """returns archive id (check that name is valid)""" if not self.is_valid_archive_name(archive_name): raise ValueError("invalid archive name") archive_id = os.path.join(self.get_obj_dir(context, oid), archive_name) if not os.path.isdir(archive_id): log( "invalid archive name: %s, oid=%s, archive_id=%s" % (archive_name, oid, archive_id) ) raise ValueError("invalid archive name") return archive_id def get_archive_description(self, archive_id): """Return description of archive""" return open(os.path.join(archive_id, "_description.txt")).read() def create_obj_archive(self, context, oid, description): """Creates a new archive for this object and returns its id.""" archive_id = ( self.get_obj_dir(context, oid) + os.path.sep + "-".join(["%02d" % x for x in time.localtime()[:6]]) ) log("creating archive: %s" % archive_id) try: scu.GSL.acquire() os.mkdir(archive_id) # if exists, raises an OSError finally: scu.GSL.release() self.store(archive_id, "_description.txt", description) return archive_id def store(self, archive_id, filename, data): """Store data in archive, under given filename. Filename may be modified (sanitized): return used filename The file is created or replaced. """ filename = scu.sanitize_filename(filename) log("storing %s (%d bytes) in %s" % (filename, len(data), archive_id)) try: scu.GSL.acquire() fname = os.path.join(archive_id, filename) f = open(fname, "w") f.write(data) f.close() finally: scu.GSL.release() return filename def get(self, archive_id, filename): """Retreive data""" if not scu.is_valid_filename(filename): log('Archiver.get: invalid filename "%s"' % filename) raise ValueError("invalid filename") fname = os.path.join(archive_id, filename) log("reading archive file %s" % fname) return open(fname).read() def get_archived_file(self, context, REQUEST, oid, archive_name, filename): """Recupere donnees du fichier indiqué et envoie au client""" # XXX très incomplet: devrait inférer et assigner un type MIME archive_id = self.get_id_from_name(context, oid, archive_name) data = self.get(archive_id, filename) ext = os.path.splitext(scu.strlower(filename))[1] if ext == ".html" or ext == ".htm": return data elif ext == ".xml": REQUEST.RESPONSE.setHeader("content-type", scu.XML_MIMETYPE) return data elif ext == ".xls": return sco_excel.sendExcelFile(REQUEST, data, filename) elif ext == ".csv": return scu.sendCSVFile(REQUEST, data, filename) elif ext == ".pdf": return scu.sendPDFFile(REQUEST, data, filename) return data # should set mimetype... class SemsArchiver(BaseArchiver): def __init__(self): BaseArchiver.__init__(self, archive_type="") PVArchive = SemsArchiver() # ---------------------------------------------------------------------------- def do_formsemestre_archive( context, REQUEST, formsemestre_id, group_ids=[], # si indiqué, ne prend que ces groupes description="", date_jury="", signature=None, # pour lettres indiv date_commission=None, numeroArrete=None, VDICode=None, showTitle=False, pv_title=None, with_paragraph_nom=False, anonymous=False, bulVersion="long", ): """Make and store new archive for this formsemestre. Store: - tableau recap (xls), pv jury (xls et pdf), bulletins (xml et pdf), lettres individuelles (pdf) """ archive_id = PVArchive.create_obj_archive(context, formsemestre_id, description) date = PVArchive.get_archive_date(archive_id).strftime("%d/%m/%Y à %H:%M") if not group_ids: # tous les inscrits du semestre group_ids = [sco_groups.get_default_group(context, formsemestre_id)] groups_infos = sco_groups_view.DisplayedGroupsInfos( context, group_ids, formsemestre_id=formsemestre_id, REQUEST=REQUEST ) groups_filename = "-" + groups_infos.groups_filename etudids = [m["etudid"] for m in groups_infos.members] # Tableau recap notes en XLS (pour tous les etudiants, n'utilise pas les groupes) data, _, _ = make_formsemestre_recapcomplet( context, REQUEST, formsemestre_id, format="xls" ) if data: PVArchive.store(archive_id, "Tableau_moyennes.xls", data) # Tableau recap notes en HTML (pour tous les etudiants, n'utilise pas les groupes) data, _, _ = make_formsemestre_recapcomplet( context, REQUEST, formsemestre_id, format="html", disable_etudlink=True ) if data: data = "\n".join( [ html_sco_header.sco_header( context, REQUEST, page_title="Moyennes archivées le %s" % date, head_message="Moyennes archivées le %s" % date, no_side_bar=True, ), '<h2 class="fontorange">Valeurs archivées le %s</h2>' % date, '<style type="text/css">table.notes_recapcomplet tr { color: rgb(185,70,0); }</style>', data, html_sco_header.sco_footer(context, REQUEST), ] ) PVArchive.store(archive_id, "Tableau_moyennes.html", data) # Bulletins en XML (pour tous les etudiants, n'utilise pas les groupes) data, _, _ = make_formsemestre_recapcomplet( context, REQUEST, formsemestre_id, format="xml", xml_with_decisions=True ) if data: PVArchive.store(archive_id, "Bulletins.xml", data) # Decisions de jury, en XLS data = sco_pvjury.formsemestre_pvjury( context, formsemestre_id, format="xls", REQUEST=REQUEST, publish=False ) if data: PVArchive.store(archive_id, "Decisions_Jury.xls", data) # Classeur bulletins (PDF) data, _ = sco_bulletins_pdf.get_formsemestre_bulletins_pdf( context, formsemestre_id, REQUEST, version=bulVersion ) if data: PVArchive.store(archive_id, "Bulletins.pdf", data) # Lettres individuelles (PDF): data = sco_pvpdf.pdf_lettres_individuelles( context, formsemestre_id, etudids=etudids, date_jury=date_jury, date_commission=date_commission, signature=signature, ) if data: PVArchive.store(archive_id, "CourriersDecisions%s.pdf" % groups_filename, data) # PV de jury (PDF): dpv = sco_pvjury.dict_pvjury( context, formsemestre_id, etudids=etudids, with_prev=True ) data = sco_pvpdf.pvjury_pdf( context, dpv, REQUEST, date_commission=date_commission, date_jury=date_jury, numeroArrete=numeroArrete, VDICode=VDICode, showTitle=showTitle, pv_title=pv_title, with_paragraph_nom=with_paragraph_nom, anonymous=anonymous, ) if data: PVArchive.store(archive_id, "PV_Jury%s.pdf" % groups_filename, data) def formsemestre_archive(context, REQUEST, formsemestre_id, group_ids=[]): """Make and store new archive for this formsemestre. (all students or only selected groups) """ if not context._can_edit_pv(REQUEST, formsemestre_id): raise AccessDenied( "opération non autorisée pour %s" % str(REQUEST.AUTHENTICATED_USER) ) sem = sco_formsemestre.get_formsemestre(context, formsemestre_id) if not group_ids: # tous les inscrits du semestre group_ids = [sco_groups.get_default_group(context, formsemestre_id)] groups_infos = sco_groups_view.DisplayedGroupsInfos( context, group_ids, formsemestre_id=formsemestre_id, REQUEST=REQUEST ) H = [ html_sco_header.html_sem_header( context, REQUEST, "Archiver les PV et résultats du semestre", sem=sem, javascripts=sco_groups_view.JAVASCRIPTS, cssstyles=sco_groups_view.CSSSTYLES, init_qtip=True, ), """<p class="help">Cette page permet de générer et d'archiver tous les documents résultant de ce semestre: PV de jury, lettres individuelles, tableaux récapitulatifs.</p><p class="help">Les documents archivés sont enregistrés et non modifiables, on peut les retrouver ultérieurement. </p><p class="help">On peut archiver plusieurs versions des documents (avant et après le jury par exemple). </p> """, ] F = [ """<p><em>Note: les documents sont aussi affectés par les réglages sur la page "<a href="edit_preferences">Paramétrage</a>" (accessible à l'administrateur du département).</em> </p>""", html_sco_header.sco_footer(context, REQUEST), ] descr = [ ( "description", {"input_type": "textarea", "rows": 4, "cols": 77, "title": "Description"}, ), ("sep", {"input_type": "separator", "title": "Informations sur PV de jury"}), ] descr += sco_pvjury.descrform_pvjury(context, sem) descr += [ ( "signature", { "input_type": "file", "size": 30, "explanation": "optionnel: image scannée de la signature pour les lettres individuelles", }, ), ( "bulVersion", { "input_type": "menu", "title": "Version des bulletins archivés", "labels": [ "Version courte", "Version intermédiaire", "Version complète", ], "allowed_values": ["short", "selectedevals", "long"], "default": "long", }, ), ] menu_choix_groupe = ( """<div class="group_ids_sel_menu">Groupes d'étudiants à lister: """ + sco_groups_view.menu_groups_choice(context, groups_infos) + """(pour les PV et lettres)</div>""" ) tf = TrivialFormulator( REQUEST.URL0, REQUEST.form, descr, cancelbutton="Annuler", method="POST", submitlabel="Générer et archiver les documents", name="tf", formid="group_selector", html_foot_markup=menu_choix_groupe, ) if tf[0] == 0: return "\n".join(H) + "\n" + tf[1] + "\n".join(F) elif tf[0] == -1: msg = "Opération%20annulée" else: # submit sf = tf[2]["signature"] signature = sf.read() # image of signature if tf[2]["anonymous"]: tf[2]["anonymous"] = True else: tf[2]["anonymous"] = False do_formsemestre_archive( context, REQUEST, formsemestre_id, group_ids=group_ids, description=tf[2]["description"], date_jury=tf[2]["date_jury"], date_commission=tf[2]["date_commission"], signature=signature, numeroArrete=tf[2]["numeroArrete"], VDICode=tf[2]["VDICode"], pv_title=tf[2]["pv_title"], showTitle=tf[2]["showTitle"], with_paragraph_nom=tf[2]["with_paragraph_nom"], anonymous=tf[2]["anonymous"], bulVersion=tf[2]["bulVersion"], ) msg = "Nouvelle%20archive%20créée" # submitted or cancelled: return REQUEST.RESPONSE.redirect( "formsemestre_list_archives?formsemestre_id=%s&head_message=%s" % (formsemestre_id, msg) ) def formsemestre_list_archives(context, REQUEST, formsemestre_id): """Page listing archives""" L = [] for archive_id in PVArchive.list_obj_archives(context, formsemestre_id): a = { "archive_id": archive_id, "description": PVArchive.get_archive_description(archive_id), "date": PVArchive.get_archive_date(archive_id), "content": PVArchive.list_archive(archive_id), } L.append(a) sem = sco_formsemestre.get_formsemestre(context, formsemestre_id) H = [ html_sco_header.html_sem_header( context, REQUEST, "Archive des PV et résultats ", sem ) ] if not L: H.append("<p>aucune archive enregistrée</p>") else: H.append("<ul>") for a in L: archive_name = PVArchive.get_archive_name(a["archive_id"]) H.append( '<li>%s : <em>%s</em> (<a href="formsemestre_delete_archive?formsemestre_id=%s&archive_name=%s">supprimer</a>)<ul>' % ( a["date"].strftime("%d/%m/%Y %H:%M"), a["description"], formsemestre_id, archive_name, ) ) for filename in a["content"]: H.append( '<li><a href="formsemestre_get_archived_file?formsemestre_id=%s&archive_name=%s&filename=%s">%s</a></li>' % (formsemestre_id, archive_name, filename, filename) ) if not a["content"]: H.append("<li><em>aucun fichier !</em></li>") H.append("</ul></li>") H.append("</ul>") return "\n".join(H) + html_sco_header.sco_footer(context, REQUEST) def formsemestre_get_archived_file( context, REQUEST, formsemestre_id, archive_name, filename ): """Send file to client.""" return PVArchive.get_archived_file( context, REQUEST, formsemestre_id, archive_name, filename ) def formsemestre_delete_archive( context, REQUEST, formsemestre_id, archive_name, dialog_confirmed=False ): """Delete an archive""" if not context._can_edit_pv(REQUEST, formsemestre_id): raise AccessDenied( "opération non autorisée pour %s" % str(REQUEST.AUTHENTICATED_USER) ) _ = sco_formsemestre.get_formsemestre( context, formsemestre_id ) # check formsemestre_id archive_id = PVArchive.get_id_from_name(context, formsemestre_id, archive_name) dest_url = "formsemestre_list_archives?formsemestre_id=%s" % (formsemestre_id) if not dialog_confirmed: return scu.confirm_dialog( context, """<h2>Confirmer la suppression de l'archive du %s ?</h2> <p>La suppression sera définitive.</p>""" % PVArchive.get_archive_date(archive_id).strftime("%d/%m/%Y %H:%M"), dest_url="", REQUEST=REQUEST, cancel_url=dest_url, parameters={ "formsemestre_id": formsemestre_id, "archive_name": archive_name, }, ) PVArchive.delete_archive(archive_id) return REQUEST.RESPONSE.redirect(dest_url + "&head_message=Archive%20supprimée")