ScoDoc-PE/app/scodoc/sco_archives_justificatifs.py

264 lines
9.2 KiB
Python
Raw Permalink Normal View History

"""
Gestion de l'archivage des justificatifs
Ecrit par Matthias HARTMANN
"""
import os
from datetime import datetime
from shutil import rmtree
from app.models import Identite
from app.scodoc.sco_archives import BaseArchiver
from app.scodoc.sco_exceptions import ScoValueError
from app.scodoc.sco_utils import is_iso_formated
2023-09-21 18:44:38 +02:00
from app import log
class Trace:
"""gestionnaire de la trace des fichiers justificatifs
2024-01-18 17:05:43 +01:00
Role des fichiers traces :
- Sauvegarder la date de dépôt du fichier
2024-02-27 15:59:48 +01:00
- Sauvegarder la date de suppression du fichier
(dans le cas de plusieurs fichiers pour un même justif)
- Sauvegarder l'user_id de l'utilisateur ayant déposé le fichier
(=> permet de montrer les fichiers qu'aux personnes
qui l'on déposé / qui ont le rôle AssiJustifView)
2024-01-18 17:05:43 +01:00
_trace.csv :
nom_fichier_srv,datetime_depot,datetime_suppr,user_id
"""
def __init__(self, path: str) -> None:
self.path: str = path + "/_trace.csv"
self.content: dict[str, list[datetime, datetime, str]] = {}
self.import_from_file()
def import_from_file(self):
"""import trace from file"""
2023-09-25 15:12:18 +02:00
def import_from_csv(path):
with open(path, "r", encoding="utf-8") as file:
for line in file.readlines():
csv = line.split(",")
if len(csv) < 4:
continue
fname: str = csv[0]
2023-09-25 15:12:18 +02:00
if fname not in os.listdir(self.path.replace("/_trace.csv", "")):
continue
entry_date: datetime = is_iso_formated(csv[1], True)
delete_date: datetime = is_iso_formated(csv[2], True)
2024-01-18 17:05:43 +01:00
user_id = csv[3].strip()
self.content[fname] = [entry_date, delete_date, user_id]
2023-09-25 15:12:18 +02:00
if os.path.isfile(self.path):
import_from_csv(self.path)
else:
parent_dir: str = self.path[: self.path.rfind("/", 0, self.path.rfind("/"))]
if os.path.isfile(parent_dir + "/_trace.csv"):
import_from_csv(parent_dir + "/_trace.csv")
self.save_trace()
def set_trace(self, *fnames: str, mode: str = "entry", current_user: str = None):
"""Ajoute une trace du fichier donné
mode : entry / delete
"""
modes: list[str] = ["entry", "delete", "user_id"]
for fname in fnames:
if fname in modes:
continue
traced: list[datetime, datetime, str] = self.content.get(fname, False)
2023-09-21 18:44:38 +02:00
if not traced or mode == "entry":
self.content[fname] = [None, None, None]
traced = self.content[fname]
traced[modes.index(mode)] = (
datetime.now() if mode != "user_id" else current_user
)
self.save_trace()
2023-09-25 15:12:18 +02:00
def save_trace(self, new_path: str = None):
"""Enregistre la trace dans le fichier _trace.csv"""
lines: list[str] = []
2023-09-25 15:12:18 +02:00
if new_path is not None:
self.path = new_path
for fname, traced in self.content.items():
date_fin: datetime or None = traced[1].isoformat() if traced[1] else "None"
2023-06-23 16:12:36 +02:00
if traced[0] is not None:
lines.append(f"{fname},{traced[0].isoformat()},{date_fin}, {traced[2]}")
with open(self.path, "w", encoding="utf-8") as file:
file.write("\n".join(lines))
def get_trace(
self, fnames: list[str] = None
) -> dict[str, list[datetime, datetime, str]]:
"""Récupère la trace pour les noms de fichiers.
2024-01-18 17:05:43 +01:00
si aucun nom n'est donné, récupère tous les fichiers
retour :
{
"nom_fichier_srv": [datetime_depot, datetime_suppr/None, user_id],
...
}
"""
if fnames is None:
return self.content
traced: dict = {}
for fname in fnames:
traced[fname] = self.content.get(fname, None)
return traced
class JustificatifArchiver(BaseArchiver):
"""
TOTALK:
- oid -> etudid
- archive_id -> date de création de l'archive (une archive par dépôt de document)
justificatif
<dept_id>
<etudid/oid>
[_trace.csv]
<archive_id>
[_description.txt]
[<filename.ext>]
"""
def __init__(self):
BaseArchiver.__init__(self, archive_type="justificatifs")
def save_justificatif(
self,
etud: Identite,
filename: str,
data: bytes or str,
archive_name: str = None,
description: str = "",
user_id: str = None,
2023-12-12 03:05:31 +01:00
) -> tuple[str, str]:
"""
Ajoute un fichier dans une archive "justificatif" pour l'etudid donné
2023-12-12 03:05:31 +01:00
Retourne l'archive_name utilisé et le filename
"""
if archive_name is None:
archive_id: str = self.create_obj_archive(
oid=etud.id, description=description, dept_id=etud.dept_id
)
else:
archive_id: str = self.get_id_from_name(
etud.id, archive_name, dept_id=etud.dept_id
)
fname: str = self.store(archive_id, filename, data, dept_id=etud.dept_id)
2023-09-21 18:44:38 +02:00
trace = Trace(archive_id)
trace.set_trace(fname, mode="entry")
if user_id is not None:
trace.set_trace(fname, mode="user_id", current_user=user_id)
return self.get_archive_name(archive_id), fname
def delete_justificatif(
self,
etud: Identite,
archive_name: str,
filename: str = None,
has_trace: bool = True,
):
"""
Supprime une archive ou un fichier particulier de l'archivage de l'étudiant donné
Si trace == True : sauvegarde le nom du/des fichier(s) supprimé(s)
dans la trace de l'étudiant
"""
2023-12-22 15:24:53 +01:00
log(f"debug : {archive_name}{filename} {has_trace}")
2023-09-11 07:11:52 +02:00
if str(etud.id) not in self.list_oids(etud.dept_id):
raise ValueError(f"Aucune archive pour etudid[{etud.id}]")
try:
archive_id = self.get_id_from_name(
etud.id, archive_name, dept_id=etud.dept_id
)
2023-12-22 15:24:53 +01:00
except ScoValueError as exc:
raise ValueError(f"Archive Inconnue [{archive_name}]") from exc
if filename is not None:
if filename not in self.list_archive(archive_id, dept_id=etud.dept_id):
raise ValueError(
f"""filename {filename} inconnu dans l'archive archive_id[{
archive_id}] -> etudid[{etud.id}]"""
)
path: str = os.path.join(
self.get_obj_dir(etud.id, dept_id=etud.dept_id), archive_id, filename
)
if os.path.isfile(path):
if has_trace:
2023-09-21 18:44:38 +02:00
trace = Trace(archive_id)
trace.set_trace(filename, mode="delete")
os.remove(path)
2023-12-22 15:24:53 +01:00
log(f"delete_justificatif: removed {path}")
else:
if has_trace:
2023-09-21 18:44:38 +02:00
trace = Trace(archive_id)
trace.set_trace(
*self.list_archive(archive_id, dept_id=etud.dept_id), mode="delete"
)
self.delete_archive(
os.path.join(
self.get_obj_dir(etud.id, dept_id=etud.dept_id),
archive_id,
)
)
2023-12-22 15:24:53 +01:00
log(f"delete_justificatif: deleted archive {archive_id}")
def list_justificatifs(
self, archive_name: str, etud: Identite
) -> list[tuple[str, int]]:
"""
Retourne la liste des noms de fichiers dans l'archive donnée
avec l'uid de l'utilisateur ayant saisi le fichier.
"""
filenames: list[str] = []
archive_id = self.get_id_from_name(etud.id, archive_name, dept_id=etud.dept_id)
filenames = self.list_archive(archive_id, dept_id=etud.dept_id)
2023-09-21 18:44:38 +02:00
trace: Trace = Trace(archive_id)
traced = trace.get_trace(filenames)
2024-01-18 17:05:43 +01:00
return [(key, value[2]) for key, value in traced.items() if value is not None]
def get_justificatif_file(self, archive_name: str, etud: Identite, filename: str):
"""
Retourne une réponse de téléchargement de fichier si le fichier existe
"""
archive_id: str = self.get_id_from_name(
etud.id, archive_name, dept_id=etud.dept_id
)
if filename in self.list_archive(archive_id, dept_id=etud.dept_id):
return self.get_archived_file(
etud.id, archive_name, filename, dept_id=etud.dept_id
)
raise ScoValueError(
f"Fichier {filename} introuvable dans l'archive {archive_name}"
)
def remove_dept_archive(self, dept_id: int = None):
"""
Supprime toutes les archives d'un département (ou de tous les départements)
Supprime aussi les fichiers de trace
"""
# juste pour récupérer .root, dept_id n'a pas d'importance
self.initialize(dept_id=1)
if dept_id is None:
rmtree(self.root, ignore_errors=True)
else:
rmtree(os.path.join(self.root, str(dept_id)), ignore_errors=True)