forked from ScoDoc/ScoDoc
247 lines
8.6 KiB
Python
247 lines
8.6 KiB
Python
"""
|
|
Gestion de l'archivage des justificatifs
|
|
|
|
Ecrit par Matthias HARTMANN
|
|
"""
|
|
import os
|
|
from datetime import datetime
|
|
from shutil import rmtree
|
|
|
|
from app.models import Identite
|
|
from app.scodoc.sco_archives import BaseArchiver
|
|
from app.scodoc.sco_exceptions import ScoValueError
|
|
from app.scodoc.sco_utils import is_iso_formated
|
|
|
|
from app import log
|
|
|
|
|
|
class Trace:
    """Keep track of the justification files stored in one archive directory.

    The trace is persisted in a ``_trace.csv`` file located in the directory
    given to the constructor. Each line describes one file with four
    comma-separated fields::

        <filename>,<entry date (ISO)>,<deletion date (ISO) or "None">,<user id>

    In memory, ``content`` maps a file name to the list
    ``[entry_date, delete_date, user_id]``.

    NOTE(review): fields are split on "," without any quoting, so a file name
    containing a comma cannot be read back correctly.
    NOTE(review): a missing user id is written as the text "None" and is read
    back as the string "None", not as ``None`` - confirm what callers expect.
    """

    _TRACE_FILENAME = "_trace.csv"

    def __init__(self, path: str) -> None:
        """Load the trace of the directory ``path`` (if a trace file exists)."""
        self.path: str = path + "/" + self._TRACE_FILENAME
        self.content: dict[str, list] = {}
        self.import_from_file()

    def _import_from_csv(self, path: str) -> None:
        """Merge into ``self.content`` the entries read from the file ``path``.

        Lines with fewer than four fields, and entries whose file is not
        present in the directory of ``self.path``, are ignored.
        """
        archive_dir = os.path.dirname(self.path)
        # The directory is listed once per import rather than once per line.
        present = set(os.listdir(archive_dir)) if os.path.isdir(archive_dir) else set()
        with open(path, "r", encoding="utf-8") as file:
            for line in file:
                fields = line.split(",")
                if len(fields) < 4:
                    continue
                fname = fields[0]
                if fname not in present:
                    continue
                # presumably is_iso_formated(..., True) converts the text to a
                # datetime (or a falsy value when it is not a date) - TODO confirm
                entry_date = is_iso_formated(fields[1], True)
                delete_date = is_iso_formated(fields[2], True)
                # The last field carries the end-of-line character (and, in
                # files written by older versions, a leading space): strip it,
                # otherwise the user id gains whitespace at every load/save.
                user_id = fields[3].strip()
                self.content[fname] = [entry_date, delete_date, user_id]

    def import_from_file(self):
        """Load the trace from ``self.path``.

        When that file does not exist, fall back on a ``_trace.csv`` located
        in the parent directory and, if one is found, write the imported
        entries to ``self.path``.
        """
        if os.path.isfile(self.path):
            self._import_from_csv(self.path)
            return

        archive_dir = os.path.dirname(self.path)
        parent_trace = os.path.join(
            os.path.dirname(archive_dir), self._TRACE_FILENAME
        )
        if os.path.isfile(parent_trace):
            self._import_from_csv(parent_trace)
            # NOTE(review): the save is assumed to belong to this branch only
            # (the original indentation was lost) - confirm.
            self.save_trace()

    def set_trace(self, *fnames: str, mode: str = "entry", current_user: str = None):
        """Record an event for each of the given file names, then save.

        mode:
            - "entry": (re)create the entry and store the current date as
              entry date;
            - "delete": store the current date as deletion date;
            - "user_id": store ``current_user`` as the user of the entry.

        File names equal to one of the mode names are ignored.

        Raises ValueError if ``mode`` is not one of the three modes.
        """
        modes: list[str] = ["entry", "delete", "user_id"]
        # Validate the mode first, so that an invalid mode cannot leave
        # half-updated entries in memory.
        slot = modes.index(mode)
        for fname in fnames:
            if fname in modes:
                continue
            traced = self.content.get(fname)
            if not traced or mode == "entry":
                traced = [None, None, None]
                self.content[fname] = traced
            traced[slot] = current_user if mode == "user_id" else datetime.now()
        self.save_trace()

    def save_trace(self, new_path: str = None):
        """Write the trace to the trace file.

        If ``new_path`` is given, it replaces ``self.path`` before writing.
        Entries without an entry date are not written.
        """
        if new_path is not None:
            self.path = new_path
        lines: list[str] = []
        for fname, traced in self.content.items():
            if traced[0] is None:
                continue
            delete_date: str = traced[1].isoformat() if traced[1] else "None"
            # No space after the last comma: it used to end up in the user id
            # when the file was read back.
            lines.append(f"{fname},{traced[0].isoformat()},{delete_date},{traced[2]}")
        with open(self.path, "w", encoding="utf-8") as file:
            file.write("\n".join(lines))

    def get_trace(self, fnames: list[str] = None) -> dict[str, list]:
        """Return the trace of the given file names.

        If no name is given, return the whole trace. A requested name that
        has no entry is mapped to None.
        """
        if fnames is None:
            return self.content
        return {fname: self.content.get(fname, None) for fname in fnames}
|
|
|
|
|
|
class JustificatifArchiver(BaseArchiver):
    """Archiver for the justification files of students.

    Vocabulary:
        - oid -> etudid
        - archive_id -> creation date of the archive (one archive per
          document upload)

    justificatif
    └── <dept_id>
        └── <etudid/oid>
            ├── [_trace.csv]
            └── <archive_id>
                ├── [_description.txt]
                └── [<filename.ext>]
    """

    def __init__(self):
        BaseArchiver.__init__(self, archive_type="justificatifs")

    def save_justificatif(
        self,
        etud: Identite,
        filename: str,
        data: bytes or str,
        archive_name: str = None,
        description: str = "",
        user_id: str = None,
    ) -> tuple[str, str]:
        """Add a file to a "justificatif" archive of the given student.

        If ``archive_name`` is None a new archive is created (with
        ``description``), otherwise the file is stored in the named archive.
        The file is then recorded in the trace of the archive, along with
        ``user_id`` when one is given. ``data`` may be bytes or str.

        Return the archive name used and the name of the stored file.
        """
        if archive_name is None:
            archive_id: str = self.create_obj_archive(
                oid=etud.id, description=description, dept_id=etud.dept_id
            )
        else:
            archive_id: str = self.get_id_from_name(
                etud.id, archive_name, dept_id=etud.dept_id
            )

        fname: str = self.store(archive_id, filename, data, dept_id=etud.dept_id)
        # archive_id is used as the path of the directory holding the trace
        trace = Trace(archive_id)
        trace.set_trace(fname, mode="entry")
        if user_id is not None:
            trace.set_trace(fname, mode="user_id", current_user=user_id)

        return self.get_archive_name(archive_id), fname

    def delete_justificatif(
        self,
        etud: Identite,
        archive_name: str,
        filename: str = None,
        has_trace: bool = True,
    ):
        """Delete an archive, or a single file of an archive, of the student.

        If ``filename`` is given only that file is removed, otherwise the
        whole archive is deleted.
        If ``has_trace`` is true, the deletion of the file(s) is first
        recorded in the trace.

        Raises ValueError if the student has no archive, if the archive is
        unknown, or if ``filename`` is not in the archive.
        """
        log(f"debug : {archive_name}{filename} {has_trace}")
        if str(etud.id) not in self.list_oids(etud.dept_id):
            raise ValueError(f"Aucune archive pour etudid[{etud.id}]")
        try:
            archive_id = self.get_id_from_name(
                etud.id, archive_name, dept_id=etud.dept_id
            )
        except ScoValueError as exc:
            raise ValueError(f"Archive Inconnue [{archive_name}]") from exc

        if filename is not None:
            if filename not in self.list_archive(archive_id, dept_id=etud.dept_id):
                raise ValueError(
                    f"filename {filename} inconnu dans l'archive archive_id[{archive_id}]"
                    f" -> etudid[{etud.id}]"
                )

            path: str = os.path.join(
                self.get_obj_dir(etud.id, dept_id=etud.dept_id), archive_id, filename
            )

            if os.path.isfile(path):
                if has_trace:
                    # the trace is updated while the file still exists
                    trace = Trace(archive_id)
                    trace.set_trace(filename, mode="delete")
                os.remove(path)
                log(f"delete_justificatif: removed {path}")
            return

        # No file name: delete the whole archive
        if has_trace:
            trace = Trace(archive_id)
            trace.set_trace(
                *self.list_archive(archive_id, dept_id=etud.dept_id), mode="delete"
            )

        self.delete_archive(
            os.path.join(
                self.get_obj_dir(etud.id, dept_id=etud.dept_id),
                archive_id,
            )
        )
        log(f"delete_justificatif: deleted archive {archive_id}")

    def list_justificatifs(
        self, archive_name: str, etud: Identite
    ) -> list[tuple[str, int]]:
        """Return the files of the given archive as (filename, user id) pairs,
        the user id being the one recorded in the trace for the file.

        Files without any entry in the trace are not listed: the trace
        returns None for them, which has no user id to report.
        """
        archive_id = self.get_id_from_name(etud.id, archive_name, dept_id=etud.dept_id)

        filenames = self.list_archive(archive_id, dept_id=etud.dept_id)
        traced = Trace(archive_id).get_trace(filenames)

        return [
            (fname, entry[2]) for fname, entry in traced.items() if entry is not None
        ]

    def get_justificatif_file(self, archive_name: str, etud: Identite, filename: str):
        """Return what ``get_archived_file`` gives for ``filename`` (presumably
        a file download response - TODO confirm) if the file is in the archive.

        Raises ScoValueError if the file is not in the archive.
        """
        archive_id: str = self.get_id_from_name(
            etud.id, archive_name, dept_id=etud.dept_id
        )
        if filename not in self.list_archive(archive_id, dept_id=etud.dept_id):
            raise ScoValueError(
                f"Fichier {filename} introuvable dans l'archive {archive_name}"
            )
        return self.get_archived_file(
            etud.id, archive_name, filename, dept_id=etud.dept_id
        )

    def remove_dept_archive(self, dept_id: int = None):
        """Delete all the archives of a department, or of every department
        when ``dept_id`` is None.

        WARNING: the trace files are deleted as well.
        Errors met while removing the directories are ignored.
        """
        # only needed to get .root: the dept_id value does not matter here
        self.initialize(dept_id=1)
        if dept_id is None:
            target = self.root
        else:
            target = os.path.join(self.root, str(dept_id))
        rmtree(target, ignore_errors=True)
|