1
0
forked from ScoDoc/ScoDoc

add some type annotations

This commit is contained in:
Emmanuel Viennet 2022-12-26 20:13:34 -03:00 committed by iziram
parent cf900d2027
commit 7adc7d824b
2 changed files with 18 additions and 16 deletions

View File

@ -105,13 +105,13 @@ class BaseArchiver(object):
try: try:
scu.GSL.acquire() scu.GSL.acquire()
if not os.path.isdir(path): if not os.path.isdir(path):
log("creating directory %s" % path) log(f"creating directory {path}")
os.mkdir(path) os.mkdir(path)
finally: finally:
scu.GSL.release() scu.GSL.release()
self.initialized = True self.initialized = True
def get_obj_dir(self, oid): def get_obj_dir(self, oid: int):
""" """
:return: path to directory of archives for this object (eg formsemestre_id or etudid). :return: path to directory of archives for this object (eg formsemestre_id or etudid).
If directory does not yet exist, create it. If directory does not yet exist, create it.
@ -142,7 +142,7 @@ class BaseArchiver(object):
dirs = glob.glob(base + "*") dirs = glob.glob(base + "*")
return [os.path.split(x)[1] for x in dirs] return [os.path.split(x)[1] for x in dirs]
def list_obj_archives(self, oid): def list_obj_archives(self, oid: int):
"""Returns """Returns
:return: list of archive identifiers for this object (paths to non empty dirs) :return: list of archive identifiers for this object (paths to non empty dirs)
""" """
@ -157,7 +157,7 @@ class BaseArchiver(object):
dirs.sort() dirs.sort()
return dirs return dirs
def delete_archive(self, archive_id): def delete_archive(self, archive_id: str):
"""Delete (forever) this archive""" """Delete (forever) this archive"""
self.initialize() self.initialize()
try: try:
@ -166,7 +166,7 @@ class BaseArchiver(object):
finally: finally:
scu.GSL.release() scu.GSL.release()
def get_archive_date(self, archive_id): def get_archive_date(self, archive_id: str):
"""Returns date (as a DateTime object) of an archive""" """Returns date (as a DateTime object) of an archive"""
return datetime.datetime( return datetime.datetime(
*[int(x) for x in os.path.split(archive_id)[1].split("-")] *[int(x) for x in os.path.split(archive_id)[1].split("-")]
@ -183,17 +183,17 @@ class BaseArchiver(object):
files.sort() files.sort()
return [f for f in files if f and f[0] != "_"] return [f for f in files if f and f[0] != "_"]
def get_archive_name(self, archive_id): def get_archive_name(self, archive_id: str):
"""name identifying archive, to be used in web URLs""" """name identifying archive, to be used in web URLs"""
return os.path.split(archive_id)[1] return os.path.split(archive_id)[1]
def is_valid_archive_name(self, archive_name): def is_valid_archive_name(self, archive_name: str):
"""check if name is valid.""" """check if name is valid."""
return re.match( return re.match(
"^[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}$", archive_name "^[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}$", archive_name
) )
def get_id_from_name(self, oid, archive_name): def get_id_from_name(self, oid, archive_name: str):
"""returns archive id (check that name is valid)""" """returns archive id (check that name is valid)"""
self.initialize() self.initialize()
if not self.is_valid_archive_name(archive_name): if not self.is_valid_archive_name(archive_name):
@ -206,7 +206,7 @@ class BaseArchiver(object):
raise ScoValueError(f"Archive {archive_name} introuvable") raise ScoValueError(f"Archive {archive_name} introuvable")
return archive_id return archive_id
def get_archive_description(self, archive_id): def get_archive_description(self, archive_id: str) -> str:
"""Return description of archive""" """Return description of archive"""
self.initialize() self.initialize()
filename = os.path.join(archive_id, "_description.txt") filename = os.path.join(archive_id, "_description.txt")
@ -247,7 +247,7 @@ class BaseArchiver(object):
data = data.encode(scu.SCO_ENCODING) data = data.encode(scu.SCO_ENCODING)
self.initialize() self.initialize()
filename = scu.sanitize_filename(filename) filename = scu.sanitize_filename(filename)
log("storing %s (%d bytes) in %s" % (filename, len(data), archive_id)) log(f"storing {filename} ({len(data)} bytes) in {archive_id}")
try: try:
scu.GSL.acquire() scu.GSL.acquire()
fname = os.path.join(archive_id, filename) fname = os.path.join(archive_id, filename)
@ -261,16 +261,18 @@ class BaseArchiver(object):
"""Retreive data""" """Retreive data"""
self.initialize() self.initialize()
if not scu.is_valid_filename(filename): if not scu.is_valid_filename(filename):
log('Archiver.get: invalid filename "%s"' % filename) log(f"""Archiver.get: invalid filename '{filename}'""")
raise ScoValueError("archive introuvable (déjà supprimée ?)") raise ScoValueError("archive introuvable (déjà supprimée ?)")
fname = os.path.join(archive_id, filename) fname = os.path.join(archive_id, filename)
log("reading archive file %s" % fname) log(f"reading archive file {fname}")
with open(fname, "rb") as f: with open(fname, "rb") as f:
data = f.read() data = f.read()
return data return data
def get_archived_file(self, oid, archive_name, filename): def get_archived_file(self, oid, archive_name, filename):
"""Recupere donnees du fichier indiqué et envoie au client""" """Recupère les donnees du fichier indiqué et envoie au client.
Returns: Response
"""
archive_id = self.get_id_from_name(oid, archive_name) archive_id = self.get_id_from_name(oid, archive_name)
data = self.get(archive_id, filename) data = self.get(archive_id, filename)
mime = mimetypes.guess_type(filename)[0] mime = mimetypes.guess_type(filename)[0]

View File

@ -1,7 +1,7 @@
# -*- mode: python -*- # -*- mode: python -*-
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
SCOVERSION = "9.4.20" SCOVERSION = "9.4.21"
SCONAME = "ScoDoc" SCONAME = "ScoDoc"