forked from ScoDoc/ScoDoc
317 lines
12 KiB
Python
317 lines
12 KiB
Python
# -*- mode: python -*-
|
|
# -*- coding: utf-8 -*-
|
|
|
|
##############################################################################
|
|
#
|
|
# Gestion scolarite IUT
|
|
#
|
|
# Copyright (c) 1999 - 2024 Emmanuel Viennet. All rights reserved.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
#
|
|
# Emmanuel Viennet emmanuel.viennet@viennet.net
|
|
#
|
|
##############################################################################
|
|
|
|
"""ScoDoc : gestion des archives des PV et bulletins, et des dossiers etudiants (admission)

Archives are plain files, stored in
    <SCODOC_VAR_DIR>/archives/<dept_id>
(where <SCODOC_VAR_DIR> is usually /opt/scodoc-data, and <dept_id> is a department id (int))

Jury reports (PV) and their associated documents are stored in a sub-directory of the form
    <archivedir>/<dept>/<formsemestre_id>/<YYYY-MM-DD-HH-MM-SS>
(formsemestre_id is here FormSemestre.id)

Documents attached to a student are stored in
    <archivedir>/docetuds/<dept_id>/<etudid>/<YYYY-MM-DD-HH-MM-SS>
(etudid is here Identite.id)

Apogée templates ("maquettes") used to export grades are stored in
    <archivedir>/apo_csv/<dept_id>/<annee_scolaire>-<sem_id>/<YYYY-MM-DD-HH-MM-SS>/<code_etape>.csv

An archive directory contains arbitrary files, plus a text file named
_description.txt which holds a free-form, human-readable description of the archive.

"""
|
|
import datetime
|
|
import glob
|
|
import gzip
|
|
import mimetypes
|
|
import os
|
|
import re
|
|
import shutil
|
|
import time
|
|
import zlib
|
|
|
|
import chardet
|
|
|
|
from flask import g
|
|
|
|
import app.scodoc.sco_utils as scu
|
|
from config import Config
|
|
from app import log
|
|
from app.scodoc.sco_exceptions import ScoException, ScoValueError
|
|
|
|
|
|
class BaseArchiver:
|
|
"""Classe de base pour tous les archivers"""
|
|
|
|
def __init__(self, archive_type=""):
|
|
self.archive_type = archive_type
|
|
self.initialized = False
|
|
self.root = None
|
|
self.dept_id = None
|
|
|
|
def set_dept_id(self, dept_id: int):
|
|
"set dept"
|
|
self.dept_id = dept_id
|
|
|
|
def initialize(self, dept_id: int = None):
|
|
"""Fixe le département et initialise les répertoires au besoin."""
|
|
# Set departement (à chaque fois car peut changer d'une utilisation à l'autre)
|
|
self.dept_id = getattr(g, "scodoc_dept_id") if dept_id is None else dept_id
|
|
if self.initialized:
|
|
return
|
|
dirs = [Config.SCODOC_VAR_DIR, "archives"]
|
|
if self.archive_type:
|
|
dirs.append(self.archive_type)
|
|
|
|
self.root = os.path.join(*dirs) # /opt/scodoc-data/archives/<type>
|
|
log("initialized archiver, path=" + self.root)
|
|
path = dirs[0]
|
|
for directory in dirs[1:]:
|
|
path = os.path.join(path, directory)
|
|
try:
|
|
scu.GSL.acquire()
|
|
if not os.path.isdir(path):
|
|
log(f"creating directory {path}")
|
|
os.mkdir(path)
|
|
finally:
|
|
scu.GSL.release()
|
|
self.initialized = True
|
|
|
|
def get_obj_dir(self, oid: int, dept_id: int = None):
|
|
"""
|
|
:return: path to directory of archives for this object (eg formsemestre_id or etudid).
|
|
If directory does not yet exist, create it.
|
|
"""
|
|
self.initialize(dept_id)
|
|
dept_dir = os.path.join(self.root, str(self.dept_id))
|
|
try:
|
|
scu.GSL.acquire()
|
|
if not os.path.isdir(dept_dir):
|
|
log(f"creating directory {dept_dir}")
|
|
os.mkdir(dept_dir)
|
|
obj_dir = os.path.join(dept_dir, str(oid))
|
|
if not os.path.isdir(obj_dir):
|
|
log(f"creating directory {obj_dir}")
|
|
os.mkdir(obj_dir)
|
|
except FileExistsError as exc:
|
|
raise ScoException(
|
|
f"""BaseArchiver error: obj_dir={obj_dir} exists={
|
|
os.path.exists(obj_dir)
|
|
} isdir={os.path.isdir(obj_dir)}"""
|
|
) from exc
|
|
finally:
|
|
scu.GSL.release()
|
|
return obj_dir
|
|
|
|
def list_oids(self, dept_id: int = None):
|
|
"""
|
|
:return: list of archive oids
|
|
"""
|
|
self.initialize(dept_id)
|
|
base = os.path.join(self.root, str(self.dept_id)) + os.path.sep
|
|
dirs = glob.glob(base + "*")
|
|
return [os.path.split(x)[1] for x in dirs]
|
|
|
|
def list_obj_archives(self, oid: int, dept_id: int = None) -> list[str]:
|
|
"""Returns
|
|
:return: list of archive identifiers for this object (paths to non empty dirs)
|
|
"""
|
|
self.initialize(dept_id)
|
|
base = self.get_obj_dir(oid, dept_id=dept_id) + os.path.sep
|
|
dirs = glob.glob(
|
|
base
|
|
+ "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]"
|
|
)
|
|
dirs = [os.path.join(base, d) for d in dirs]
|
|
dirs = [d for d in dirs if os.path.isdir(d) and os.listdir(d)] # non empty dirs
|
|
dirs.sort()
|
|
return dirs
|
|
|
|
def delete_archive(self, archive_id: str, dept_id: int = None):
|
|
"""Delete (forever) this archive"""
|
|
self.initialize(dept_id)
|
|
try:
|
|
scu.GSL.acquire()
|
|
shutil.rmtree(archive_id, ignore_errors=True)
|
|
finally:
|
|
scu.GSL.release()
|
|
|
|
def get_archive_date(self, archive_id: str):
|
|
"""Returns date (as a DateTime object) of an archive"""
|
|
return datetime.datetime(
|
|
*[int(x) for x in os.path.split(archive_id)[1].split("-")]
|
|
)
|
|
|
|
def list_archive(self, archive_id: str, dept_id: int = None) -> str:
|
|
"""Return list of filenames (without path) in archive"""
|
|
self.initialize(dept_id)
|
|
try:
|
|
scu.GSL.acquire()
|
|
files = os.listdir(archive_id)
|
|
finally:
|
|
scu.GSL.release()
|
|
files.sort()
|
|
return [f for f in files if f and f[0] != "_"]
|
|
|
|
def get_archive_name(self, archive_id: str):
|
|
"""name identifying archive, to be used in web URLs"""
|
|
return os.path.split(archive_id)[1]
|
|
|
|
def is_valid_archive_name(self, archive_name: str):
|
|
"""check if name is valid."""
|
|
return re.match(
|
|
"^[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}$", archive_name
|
|
)
|
|
|
|
def get_id_from_name(self, oid, archive_name: str, dept_id: int = None):
|
|
"""returns archive id (check that name is valid)"""
|
|
self.initialize(dept_id)
|
|
if not self.is_valid_archive_name(archive_name):
|
|
raise ScoValueError(f"Archive {archive_name} introuvable")
|
|
archive_id = os.path.join(self.get_obj_dir(oid, dept_id=dept_id), archive_name)
|
|
if not os.path.isdir(archive_id):
|
|
log(
|
|
f"invalid archive name: {archive_name}, oid={oid}, archive_id={archive_id}"
|
|
)
|
|
raise ScoValueError(f"Archive {archive_name} introuvable")
|
|
return archive_id
|
|
|
|
def get_archive_description(self, archive_id: str, dept_id: int = None) -> str:
|
|
"""Return description of archive"""
|
|
self.initialize(dept_id)
|
|
filename = os.path.join(archive_id, "_description.txt")
|
|
try:
|
|
with open(filename, encoding=scu.SCO_ENCODING) as f:
|
|
descr = f.read()
|
|
except UnicodeDecodeError:
|
|
# some (old) files may have saved under exotic encodings
|
|
with open(filename, "rb") as f:
|
|
data = f.read()
|
|
descr = data.decode(chardet.detect(data)["encoding"])
|
|
|
|
return descr
|
|
|
|
def create_obj_archive(self, oid: int, description: str, dept_id: int = None):
|
|
"""Creates a new archive for this object and returns its id."""
|
|
# id suffixé par YYYY-MM-DD-hh-mm-ss
|
|
archive_id = (
|
|
self.get_obj_dir(oid, dept_id=dept_id)
|
|
+ os.path.sep
|
|
+ "-".join([f"{x:02d}" for x in time.localtime()[:6]])
|
|
)
|
|
log(f"creating archive: {archive_id}")
|
|
try:
|
|
scu.GSL.acquire()
|
|
os.mkdir(archive_id)
|
|
except FileExistsError: # directory already exists !
|
|
pass
|
|
finally:
|
|
scu.GSL.release()
|
|
self.store(archive_id, "_description.txt", description)
|
|
return archive_id
|
|
|
|
def store(
|
|
self,
|
|
archive_id: str,
|
|
filename: str,
|
|
data: str | bytes,
|
|
dept_id: int = None,
|
|
compress=False,
|
|
):
|
|
"""Store data in archive, under given filename.
|
|
Filename may be modified (sanitized): return used filename
|
|
The file is created or replaced.
|
|
data may be str or bytes
|
|
If compress, data is gziped and filename suffix ".gz" added.
|
|
"""
|
|
if isinstance(data, str):
|
|
data = data.encode(scu.SCO_ENCODING)
|
|
self.initialize(dept_id)
|
|
filename = scu.sanitize_filename(filename)
|
|
log(f"storing {filename} ({len(data)} bytes) in {archive_id}")
|
|
try:
|
|
scu.GSL.acquire()
|
|
fname = os.path.join(archive_id, filename)
|
|
if compress:
|
|
if not fname.endswith(".gz"):
|
|
fname += ".gz"
|
|
with gzip.open(fname, "wb") as f:
|
|
f.write(data)
|
|
else:
|
|
with open(fname, "wb") as f:
|
|
f.write(data)
|
|
except FileNotFoundError as exc:
|
|
raise ScoValueError(
|
|
f"Erreur stockage archive (dossier inexistant, chemin {fname})"
|
|
) from exc
|
|
finally:
|
|
scu.GSL.release()
|
|
return filename
|
|
|
|
def get(self, archive_id: str, filename: str, dept_id: int = None):
|
|
"""Retreive data"""
|
|
self.initialize(dept_id)
|
|
if not scu.is_valid_filename(filename):
|
|
log(f"""Archiver.get: invalid filename '{filename}'""")
|
|
raise ScoValueError("archive introuvable (déjà supprimée ?)")
|
|
fname = os.path.join(archive_id, filename)
|
|
log(f"reading archive file {fname}")
|
|
try:
|
|
if fname.endswith(".gz"):
|
|
try:
|
|
with gzip.open(fname) as f:
|
|
data = f.read()
|
|
except (OSError, EOFError, zlib.error) as exc:
|
|
raise ScoValueError(
|
|
f"Erreur lecture archive ({fname} invalide)"
|
|
) from exc
|
|
else:
|
|
with open(fname, "rb") as f:
|
|
data = f.read()
|
|
except FileNotFoundError as exc:
|
|
raise ScoValueError(
|
|
f"Erreur lecture archive (inexistant, chemin {fname})"
|
|
) from exc
|
|
return data
|
|
|
|
def get_archived_file(self, oid, archive_name, filename, dept_id: int = None):
|
|
"""Recupère les donnees du fichier indiqué et envoie au client.
|
|
Returns: Response
|
|
"""
|
|
archive_id = self.get_id_from_name(oid, archive_name, dept_id=dept_id)
|
|
data = self.get(archive_id, filename)
|
|
if filename.endswith(".gz"):
|
|
filename = filename[:-3]
|
|
mime = mimetypes.guess_type(filename)[0]
|
|
if mime is None:
|
|
mime = "application/octet-stream"
|
|
|
|
return scu.send_file(data, filename, mime=mime)
|