# -*- mode: python -*- # -*- coding: utf-8 -*- ############################################################################## # # Gestion scolarite IUT # # Copyright (c) 1999 - 2024 Emmanuel Viennet. All rights reserved. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Emmanuel Viennet emmanuel.viennet@viennet.net # ############################################################################## """ScoDoc : gestion des archives des PV et bulletins, et des dossiers etudiants (admission) Archives are plain files, stored in <SCODOC_VAR_DIR>/archives/<dept_id> (where <SCODOC_VAR_DIR> is usually /opt/scodoc-data, and <dept_id> a departement id (int)) Les PV de jurys et documents associés sont stockées dans un sous-repertoire de la forme <archivedir>/<dept>/<formsemestre_id>/<YYYY-MM-DD-HH-MM-SS> (formsemestre_id est ici FormSemestre.id) Les documents liés à l'étudiant sont dans <archivedir>/docetuds/<dept_id>/<etudid>/<YYYY-MM-DD-HH-MM-SS> (etudid est ici Identite.id) Les maquettes Apogée pour l'export des notes sont dans <archivedir>/apo_csv/<dept_id>/<annee_scolaire>-<sem_id>/<YYYY-MM-DD-HH-MM-SS>/<code_etape>.csv Un répertoire d'archive contient des fichiers quelconques, et un fichier texte nommé _description.txt qui est une description (humaine, format libre) de l'archive. """ import datetime import glob import mimetypes import os import re import shutil import time import chardet from flask import g import app.scodoc.sco_utils as scu from config import Config from app import log from app.scodoc.sco_exceptions import ScoValueError class BaseArchiver: """Classe de base pour tous les archivers""" def __init__(self, archive_type=""): self.archive_type = archive_type self.initialized = False self.root = None self.dept_id = None def set_dept_id(self, dept_id: int): "set dept" self.dept_id = dept_id def initialize(self, dept_id: int = None): """Fixe le département et initialise les répertoires au besoin.""" # Set departement (à chaque fois car peut changer d'une utilisation à l'autre) self.dept_id = getattr(g, "scodoc_dept_id") if dept_id is None else dept_id if self.initialized: return dirs = [Config.SCODOC_VAR_DIR, "archives"] if self.archive_type: dirs.append(self.archive_type) self.root = os.path.join(*dirs) # /opt/scodoc-data/archives/<type> log("initialized archiver, path=" + self.root) path = dirs[0] for directory in dirs[1:]: path = os.path.join(path, directory) try: scu.GSL.acquire() if not os.path.isdir(path): log(f"creating directory {path}") os.mkdir(path) finally: scu.GSL.release() self.initialized = True def get_obj_dir(self, oid: int, dept_id: int = None): """ :return: path to directory of archives for this object (eg formsemestre_id or etudid). If directory does not yet exist, create it. """ self.initialize(dept_id) dept_dir = os.path.join(self.root, str(self.dept_id)) try: scu.GSL.acquire() if not os.path.isdir(dept_dir): log(f"creating directory {dept_dir}") os.mkdir(dept_dir) obj_dir = os.path.join(dept_dir, str(oid)) if not os.path.isdir(obj_dir): log(f"creating directory {obj_dir}") os.mkdir(obj_dir) except FileExistsError as exc: raise ScoException( f"""BaseArchiver error: obj_dir={obj_dir} exists={ os.path.exists(obj_dir) } isdir={os.path.isdir(obj_dir)}""" ) from exc finally: scu.GSL.release() return obj_dir def list_oids(self, dept_id: int = None): """ :return: list of archive oids """ self.initialize(dept_id) base = os.path.join(self.root, str(self.dept_id)) + os.path.sep dirs = glob.glob(base + "*") return [os.path.split(x)[1] for x in dirs] def list_obj_archives(self, oid: int, dept_id: int = None): """Returns :return: list of archive identifiers for this object (paths to non empty dirs) """ self.initialize(dept_id) base = self.get_obj_dir(oid, dept_id=dept_id) + os.path.sep dirs = glob.glob( base + "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]" ) dirs = [os.path.join(base, d) for d in dirs] dirs = [d for d in dirs if os.path.isdir(d) and os.listdir(d)] # non empty dirs dirs.sort() return dirs def delete_archive(self, archive_id: str, dept_id: int = None): """Delete (forever) this archive""" self.initialize(dept_id) try: scu.GSL.acquire() shutil.rmtree(archive_id, ignore_errors=True) finally: scu.GSL.release() def get_archive_date(self, archive_id: str): """Returns date (as a DateTime object) of an archive""" return datetime.datetime( *[int(x) for x in os.path.split(archive_id)[1].split("-")] ) def list_archive(self, archive_id: str, dept_id: int = None) -> str: """Return list of filenames (without path) in archive""" self.initialize(dept_id) try: scu.GSL.acquire() files = os.listdir(archive_id) finally: scu.GSL.release() files.sort() return [f for f in files if f and f[0] != "_"] def get_archive_name(self, archive_id: str): """name identifying archive, to be used in web URLs""" return os.path.split(archive_id)[1] def is_valid_archive_name(self, archive_name: str): """check if name is valid.""" return re.match( "^[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}$", archive_name ) def get_id_from_name(self, oid, archive_name: str, dept_id: int = None): """returns archive id (check that name is valid)""" self.initialize(dept_id) if not self.is_valid_archive_name(archive_name): raise ScoValueError(f"Archive {archive_name} introuvable") archive_id = os.path.join(self.get_obj_dir(oid, dept_id=dept_id), archive_name) if not os.path.isdir(archive_id): log( f"invalid archive name: {archive_name}, oid={oid}, archive_id={archive_id}" ) raise ScoValueError(f"Archive {archive_name} introuvable") return archive_id def get_archive_description(self, archive_id: str, dept_id: int = None) -> str: """Return description of archive""" self.initialize(dept_id) filename = os.path.join(archive_id, "_description.txt") try: with open(filename, encoding=scu.SCO_ENCODING) as f: descr = f.read() except UnicodeDecodeError: # some (old) files may have saved under exotic encodings with open(filename, "rb") as f: data = f.read() descr = data.decode(chardet.detect(data)["encoding"]) return descr def create_obj_archive(self, oid: int, description: str, dept_id: int = None): """Creates a new archive for this object and returns its id.""" # id suffixé par YYYY-MM-DD-hh-mm-ss archive_id = ( self.get_obj_dir(oid, dept_id=dept_id) + os.path.sep + "-".join([f"{x:02d}" for x in time.localtime()[:6]]) ) log(f"creating archive: {archive_id}") try: scu.GSL.acquire() os.mkdir(archive_id) except FileExistsError: # directory already exists ! pass finally: scu.GSL.release() self.store(archive_id, "_description.txt", description) return archive_id def store( self, archive_id: str, filename: str, data: str | bytes, dept_id: int = None, ): """Store data in archive, under given filename. Filename may be modified (sanitized): return used filename The file is created or replaced. data may be str or bytes """ if isinstance(data, str): data = data.encode(scu.SCO_ENCODING) self.initialize(dept_id) filename = scu.sanitize_filename(filename) log(f"storing {filename} ({len(data)} bytes) in {archive_id}") try: scu.GSL.acquire() fname = os.path.join(archive_id, filename) with open(fname, "wb") as f: f.write(data) except FileNotFoundError as exc: raise ScoValueError( f"Erreur stockage archive (dossier inexistant, chemin {fname})" ) from exc finally: scu.GSL.release() return filename def get(self, archive_id: str, filename: str, dept_id: int = None): """Retreive data""" self.initialize(dept_id) if not scu.is_valid_filename(filename): log(f"""Archiver.get: invalid filename '{filename}'""") raise ScoValueError("archive introuvable (déjà supprimée ?)") fname = os.path.join(archive_id, filename) log(f"reading archive file {fname}") try: with open(fname, "rb") as f: data = f.read() except FileNotFoundError as exc: raise ScoValueError( f"Erreur lecture archive (inexistant, chemin {fname})" ) from exc return data def get_archived_file(self, oid, archive_name, filename, dept_id: int = None): """Recupère les donnees du fichier indiqué et envoie au client. Returns: Response """ archive_id = self.get_id_from_name(oid, archive_name, dept_id=dept_id) data = self.get(archive_id, filename) mime = mimetypes.guess_type(filename)[0] if mime is None: mime = "application/octet-stream" return scu.send_file(data, filename, mime=mime)