""" Gestion de l'archivage des justificatifs Ecrit par Matthias HARTMANN """ import os from datetime import datetime from shutil import rmtree from app.models import Identite from app.scodoc.sco_archives import BaseArchiver from app.scodoc.sco_exceptions import ScoValueError from app.scodoc.sco_utils import is_iso_formated from app import log class Trace: """gestionnaire de la trace des fichiers justificatifs Role des fichiers traces : - Sauvegarder la date de dépôt du fichier - Sauvegarder la date de suppression du fichier (dans le cas de plusieurs fichiers pour un même justif) - Sauvegarder l'user_id de l'utilisateur ayant déposé le fichier (=> permet de montrer les fichiers qu'aux personnes qui l'on déposé / qui ont le rôle AssiJustifView) _trace.csv : nom_fichier_srv,datetime_depot,datetime_suppr,user_id """ def __init__(self, path: str) -> None: self.path: str = path + "/_trace.csv" self.content: dict[str, list[datetime, datetime, str]] = {} self.import_from_file() def import_from_file(self): """import trace from file""" def import_from_csv(path): with open(path, "r", encoding="utf-8") as file: for line in file.readlines(): csv = line.split(",") if len(csv) < 4: continue fname: str = csv[0] if fname not in os.listdir(self.path.replace("/_trace.csv", "")): continue entry_date: datetime = is_iso_formated(csv[1], True) delete_date: datetime = is_iso_formated(csv[2], True) user_id = csv[3].strip() self.content[fname] = [entry_date, delete_date, user_id] if os.path.isfile(self.path): import_from_csv(self.path) else: parent_dir: str = self.path[: self.path.rfind("/", 0, self.path.rfind("/"))] if os.path.isfile(parent_dir + "/_trace.csv"): import_from_csv(parent_dir + "/_trace.csv") self.save_trace() def set_trace(self, *fnames: str, mode: str = "entry", current_user: str = None): """Ajoute une trace du fichier donné mode : entry / delete """ modes: list[str] = ["entry", "delete", "user_id"] for fname in fnames: if fname in modes: continue traced: list[datetime, datetime, str] = self.content.get(fname, False) if not traced or mode == "entry": self.content[fname] = [None, None, None] traced = self.content[fname] traced[modes.index(mode)] = ( datetime.now() if mode != "user_id" else current_user ) self.save_trace() def save_trace(self, new_path: str = None): """Enregistre la trace dans le fichier _trace.csv""" lines: list[str] = [] if new_path is not None: self.path = new_path for fname, traced in self.content.items(): date_fin: datetime or None = traced[1].isoformat() if traced[1] else "None" if traced[0] is not None: lines.append(f"{fname},{traced[0].isoformat()},{date_fin}, {traced[2]}") with open(self.path, "w", encoding="utf-8") as file: file.write("\n".join(lines)) def get_trace( self, fnames: list[str] = None ) -> dict[str, list[datetime, datetime, str]]: """Récupère la trace pour les noms de fichiers. si aucun nom n'est donné, récupère tous les fichiers retour : { "nom_fichier_srv": [datetime_depot, datetime_suppr/None, user_id], ... } """ if fnames is None: return self.content traced: dict = {} for fname in fnames: traced[fname] = self.content.get(fname, None) return traced class JustificatifArchiver(BaseArchiver): """ TOTALK: - oid -> etudid - archive_id -> date de création de l'archive (une archive par dépôt de document) justificatif └── <dept_id> └── <etudid/oid> ├── [_trace.csv] └── <archive_id> ├── [_description.txt] └── [<filename.ext>] """ def __init__(self): BaseArchiver.__init__(self, archive_type="justificatifs") def save_justificatif( self, etud: Identite, filename: str, data: bytes or str, archive_name: str = None, description: str = "", user_id: str = None, ) -> tuple[str, str]: """ Ajoute un fichier dans une archive "justificatif" pour l'etudid donné Retourne l'archive_name utilisé et le filename """ if archive_name is None: archive_id: str = self.create_obj_archive( oid=etud.id, description=description, dept_id=etud.dept_id ) else: archive_id: str = self.get_id_from_name( etud.id, archive_name, dept_id=etud.dept_id ) fname: str = self.store(archive_id, filename, data, dept_id=etud.dept_id) trace = Trace(archive_id) trace.set_trace(fname, mode="entry") if user_id is not None: trace.set_trace(fname, mode="user_id", current_user=user_id) return self.get_archive_name(archive_id), fname def delete_justificatif( self, etud: Identite, archive_name: str, filename: str = None, has_trace: bool = True, ): """ Supprime une archive ou un fichier particulier de l'archivage de l'étudiant donné Si trace == True : sauvegarde le nom du/des fichier(s) supprimé(s) dans la trace de l'étudiant """ log(f"debug : {archive_name}{filename} {has_trace}") if str(etud.id) not in self.list_oids(etud.dept_id): raise ValueError(f"Aucune archive pour etudid[{etud.id}]") try: archive_id = self.get_id_from_name( etud.id, archive_name, dept_id=etud.dept_id ) except ScoValueError as exc: raise ValueError(f"Archive Inconnue [{archive_name}]") from exc if filename is not None: if filename not in self.list_archive(archive_id, dept_id=etud.dept_id): raise ValueError( f"""filename {filename} inconnu dans l'archive archive_id[{ archive_id}] -> etudid[{etud.id}]""" ) path: str = os.path.join( self.get_obj_dir(etud.id, dept_id=etud.dept_id), archive_id, filename ) if os.path.isfile(path): if has_trace: trace = Trace(archive_id) trace.set_trace(filename, mode="delete") os.remove(path) log(f"delete_justificatif: removed {path}") else: if has_trace: trace = Trace(archive_id) trace.set_trace( *self.list_archive(archive_id, dept_id=etud.dept_id), mode="delete" ) self.delete_archive( os.path.join( self.get_obj_dir(etud.id, dept_id=etud.dept_id), archive_id, ) ) log(f"delete_justificatif: deleted archive {archive_id}") def list_justificatifs( self, archive_name: str, etud: Identite ) -> list[tuple[str, int]]: """ Retourne la liste des noms de fichiers dans l'archive donnée avec l'uid de l'utilisateur ayant saisi le fichier. """ filenames: list[str] = [] archive_id = self.get_id_from_name(etud.id, archive_name, dept_id=etud.dept_id) filenames = self.list_archive(archive_id, dept_id=etud.dept_id) trace: Trace = Trace(archive_id) traced = trace.get_trace(filenames) return [(key, value[2]) for key, value in traced.items() if value is not None] def get_justificatif_file(self, archive_name: str, etud: Identite, filename: str): """ Retourne une réponse de téléchargement de fichier si le fichier existe """ archive_id: str = self.get_id_from_name( etud.id, archive_name, dept_id=etud.dept_id ) if filename in self.list_archive(archive_id, dept_id=etud.dept_id): return self.get_archived_file( etud.id, archive_name, filename, dept_id=etud.dept_id ) raise ScoValueError( f"Fichier {filename} introuvable dans l'archive {archive_name}" ) def remove_dept_archive(self, dept_id: int = None): """ Supprime toutes les archives d'un département (ou de tous les départements) ⚠ Supprime aussi les fichiers de trace ⚠ """ # juste pour récupérer .root, dept_id n'a pas d'importance self.initialize(dept_id=1) if dept_id is None: rmtree(self.root, ignore_errors=True) else: rmtree(os.path.join(self.root, str(dept_id)), ignore_errors=True)