ScoDoc-Lille/app/scodoc/sco_archives.py

296 lines
11 KiB
Python

# -*- mode: python -*-
# -*- coding: utf-8 -*-
##############################################################################
#
# Gestion scolarite IUT
#
# Copyright (c) 1999 - 2023 Emmanuel Viennet. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Emmanuel Viennet emmanuel.viennet@viennet.net
#
##############################################################################
"""ScoDoc : gestion des archives des PV et bulletins, et des dossiers etudiants (admission)
Archives are plain files, stored in
<SCODOC_VAR_DIR>/archives/<dept_id>
(where <SCODOC_VAR_DIR> is usually /opt/scodoc-data, and <dept_id> a departement id (int))
Les PV de jurys et documents associés sont stockées dans un sous-repertoire de la forme
<archivedir>/<dept>/<formsemestre_id>/<YYYY-MM-DD-HH-MM-SS>
(formsemestre_id est ici FormSemestre.id)
Les documents liés à l'étudiant sont dans
<archivedir>/docetuds/<dept_id>/<etudid>/<YYYY-MM-DD-HH-MM-SS>
(etudid est ici Identite.id)
Les maquettes Apogée pour l'export des notes sont dans
<archivedir>/apo_csv/<dept_id>/<annee_scolaire>-<sem_id>/<YYYY-MM-DD-HH-MM-SS>/<code_etape>.csv
Un répertoire d'archive contient des fichiers quelconques, et un fichier texte
nommé _description.txt qui est une description (humaine, format libre) de l'archive.
"""
import datetime
import glob
import mimetypes
import os
import re
import shutil
import time
import chardet
from flask import g
import app.scodoc.sco_utils as scu
from config import Config
from app import log
from app.scodoc.sco_exceptions import ScoValueError
class BaseArchiver:
"""Classe de base pour tous les archivers"""
def __init__(self, archive_type=""):
self.archive_type = archive_type
self.initialized = False
self.root = None
self.dept_id = None
def set_dept_id(self, dept_id: int):
"set dept"
self.dept_id = dept_id
def initialize(self, dept_id: int = None):
"""Fixe le département et initialise les répertoires au besoin."""
# Set departement (à chaque fois car peut changer d'une utilisation à l'autre)
self.dept_id = getattr(g, "scodoc_dept_id") if dept_id is None else dept_id
if self.initialized:
return
dirs = [Config.SCODOC_VAR_DIR, "archives"]
if self.archive_type:
dirs.append(self.archive_type)
self.root = os.path.join(*dirs) # /opt/scodoc-data/archives/<type>
log("initialized archiver, path=" + self.root)
path = dirs[0]
for directory in dirs[1:]:
path = os.path.join(path, directory)
try:
scu.GSL.acquire()
if not os.path.isdir(path):
log(f"creating directory {path}")
os.mkdir(path)
finally:
scu.GSL.release()
self.initialized = True
def get_obj_dir(self, oid: int, dept_id: int = None):
"""
:return: path to directory of archives for this object (eg formsemestre_id or etudid).
If directory does not yet exist, create it.
"""
self.initialize(dept_id)
dept_dir = os.path.join(self.root, str(self.dept_id))
try:
scu.GSL.acquire()
if not os.path.isdir(dept_dir):
log(f"creating directory {dept_dir}")
os.mkdir(dept_dir)
obj_dir = os.path.join(dept_dir, str(oid))
if not os.path.isdir(obj_dir):
log(f"creating directory {obj_dir}")
os.mkdir(obj_dir)
except FileExistsError as exc:
raise ScoException(
f"""BaseArchiver error: obj_dir={obj_dir} exists={
os.path.exists(obj_dir)
} isdir={os.path.isdir(obj_dir)}"""
) from exc
finally:
scu.GSL.release()
return obj_dir
def list_oids(self, dept_id: int = None):
"""
:return: list of archive oids
"""
self.initialize(dept_id)
base = os.path.join(self.root, str(self.dept_id)) + os.path.sep
dirs = glob.glob(base + "*")
return [os.path.split(x)[1] for x in dirs]
def list_obj_archives(self, oid: int, dept_id: int = None):
"""Returns
:return: list of archive identifiers for this object (paths to non empty dirs)
"""
self.initialize(dept_id)
base = self.get_obj_dir(oid, dept_id=dept_id) + os.path.sep
dirs = glob.glob(
base
+ "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]"
)
dirs = [os.path.join(base, d) for d in dirs]
dirs = [d for d in dirs if os.path.isdir(d) and os.listdir(d)] # non empty dirs
dirs.sort()
return dirs
def delete_archive(self, archive_id: str, dept_id: int = None):
"""Delete (forever) this archive"""
self.initialize(dept_id)
try:
scu.GSL.acquire()
shutil.rmtree(archive_id, ignore_errors=True)
finally:
scu.GSL.release()
def get_archive_date(self, archive_id: str):
"""Returns date (as a DateTime object) of an archive"""
return datetime.datetime(
*[int(x) for x in os.path.split(archive_id)[1].split("-")]
)
def list_archive(self, archive_id: str, dept_id: int = None) -> str:
"""Return list of filenames (without path) in archive"""
self.initialize(dept_id)
try:
scu.GSL.acquire()
files = os.listdir(archive_id)
finally:
scu.GSL.release()
files.sort()
return [f for f in files if f and f[0] != "_"]
def get_archive_name(self, archive_id: str):
"""name identifying archive, to be used in web URLs"""
return os.path.split(archive_id)[1]
def is_valid_archive_name(self, archive_name: str):
"""check if name is valid."""
return re.match(
"^[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}$", archive_name
)
def get_id_from_name(self, oid, archive_name: str, dept_id: int = None):
"""returns archive id (check that name is valid)"""
self.initialize(dept_id)
if not self.is_valid_archive_name(archive_name):
raise ScoValueError(f"Archive {archive_name} introuvable")
archive_id = os.path.join(self.get_obj_dir(oid, dept_id=dept_id), archive_name)
if not os.path.isdir(archive_id):
log(
f"invalid archive name: {archive_name}, oid={oid}, archive_id={archive_id}"
)
raise ScoValueError(f"Archive {archive_name} introuvable")
return archive_id
def get_archive_description(self, archive_id: str, dept_id: int = None) -> str:
"""Return description of archive"""
self.initialize(dept_id)
filename = os.path.join(archive_id, "_description.txt")
try:
with open(filename, encoding=scu.SCO_ENCODING) as f:
descr = f.read()
except UnicodeDecodeError:
# some (old) files may have saved under exotic encodings
with open(filename, "rb") as f:
data = f.read()
descr = data.decode(chardet.detect(data)["encoding"])
return descr
def create_obj_archive(self, oid: int, description: str, dept_id: int = None):
"""Creates a new archive for this object and returns its id."""
# id suffixé par YYYY-MM-DD-hh-mm-ss
archive_id = (
self.get_obj_dir(oid, dept_id=dept_id)
+ os.path.sep
+ "-".join([f"{x:02d}" for x in time.localtime()[:6]])
)
log(f"creating archive: {archive_id}")
try:
scu.GSL.acquire()
os.mkdir(archive_id)
except FileExistsError: # directory already exists !
pass
finally:
scu.GSL.release()
self.store(archive_id, "_description.txt", description)
return archive_id
def store(
self,
archive_id: str,
filename: str,
data: str | bytes,
dept_id: int = None,
):
"""Store data in archive, under given filename.
Filename may be modified (sanitized): return used filename
The file is created or replaced.
data may be str or bytes
"""
if isinstance(data, str):
data = data.encode(scu.SCO_ENCODING)
self.initialize(dept_id)
filename = scu.sanitize_filename(filename)
log(f"storing {filename} ({len(data)} bytes) in {archive_id}")
try:
scu.GSL.acquire()
fname = os.path.join(archive_id, filename)
with open(fname, "wb") as f:
f.write(data)
except FileNotFoundError as exc:
raise ScoValueError(
f"Erreur stockage archive (dossier inexistant, chemin {fname})"
) from exc
finally:
scu.GSL.release()
return filename
def get(self, archive_id: str, filename: str, dept_id: int = None):
"""Retreive data"""
self.initialize(dept_id)
if not scu.is_valid_filename(filename):
log(f"""Archiver.get: invalid filename '{filename}'""")
raise ScoValueError("archive introuvable (déjà supprimée ?)")
fname = os.path.join(archive_id, filename)
log(f"reading archive file {fname}")
try:
with open(fname, "rb") as f:
data = f.read()
except FileNotFoundError as exc:
raise ScoValueError(
f"Erreur lecture archive (inexistant, chemin {fname})"
) from exc
return data
def get_archived_file(self, oid, archive_name, filename, dept_id: int = None):
"""Recupère les donnees du fichier indiqué et envoie au client.
Returns: Response
"""
archive_id = self.get_id_from_name(oid, archive_name, dept_id=dept_id)
data = self.get(archive_id, filename)
mime = mimetypes.guess_type(filename)[0]
if mime is None:
mime = "application/octet-stream"
return scu.send_file(data, filename, mime=mime)