#!/usr/bin/env python3

# Standard libs
import argparse
import csv
import json
import os
import pdb  # used for debugging
import inspect  # used for debugging
import random
import sys
import re

# Third-party libs
import requests
from requests.auth import HTTPBasicAuth
import drawsvg

try:
    from dotenv import load_dotenv
except ModuleNotFoundError:
    print("\nError: dotenv not installed !", file=sys.stderr)
    print("You may install it using:\npip install python-dotenv\n", file=sys.stderr)

def die(msg: str, status=3):
    print(msg, file=sys.stderr)
    sys.exit(status)


def debug(*args):
    if not hasattr(debug, "counter"):
        debug.counter = 1  # Initialize the counter if it doesn't exist
    else:
        debug.counter += 1
    caller_frame = inspect.currentframe().f_back
    where = str(caller_frame.f_lineno) + "@" + caller_frame.f_code.co_name
    if len(args) > 1:
        print(f"[DEBUG {debug.counter}:{where}] " + str(args[0]), args[1:])
    elif len(args) > 0:
        print(f"[DEBUG {debug.counter}:{where}] " + str(args[0]))
    else:
        print(f"[DEBUG {debug.counter}:{where}] (no reason given)")


def warning(*args):
    if len(args) > 0:
        print("[WARNING] " + str(args[0]), args[1:])
    else:
        print("[WARNING] (no reason given)")


def info(*args):
    if len(args) > 0:
        print("[INFO] " + str(args[0]), args[1:])
    else:
        print("[INFO] (no reason given)")

load_dotenv(".env")

SCODOC_SERVER = os.environ.get("SCODOC_SERVER") or "http://localhost:5000"
SCODOC_USER = os.environ.get("SCODOC_USER") or die(
    "SCODOC_USER must be set in .env or the environment"
)
SCODOC_PASSWORD = os.environ.get("SCODOC_PASSWORD") or die(
    "SCODOC_PASSWORD must be set in .env or the environment"
)

API_URL = f"{SCODOC_SERVER}/ScoDoc/api"

# TODO : refactor globals
DEBUG = True  # Not used
BLOCKING = True  # Die if csv is incorrect


def blockordie(reason: str = "", status: int = 2):
    if reason:
        print(reason)
    else:
        print("Blocking, no reason given")
    if BLOCKING:
        sys.exit(status)

class Filter:
    # Filter on students to be considered
    # 1 consider only technological baccalaureates, statistics are always asked
    # 2 consider only women, because gender statistics are frequently asked
    # 4 consider only incoming students (primo-entrants) in first year of the cohort
    # 8 consider only people having a first year, not parallel entries
    TECHNO = 1
    WOMAN = 2
    PRIMO = 4
    MAIN = 8
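
# Usage sketch (not from the original flow, added for illustration): these values are
# bit flags, so a combined filter is built with bitwise OR and tested with bitwise AND:
#     f = Filter.TECHNO | Filter.WOMAN   # value 3
#     if f & Filter.TECHNO: ...          # True: the techno-only restriction is active
# OptionSet.filter() builds such a mask and bags_from_students() tests it this way.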


class OptionSet:
    def __init__(self, values={}):
        # Initialise an internal dictionary to store the options
        if type(values) == type({}):
            self._options = values
        else:
            self._options = {}
        self._orderkey = None
        self._main_filter = None
        self._secondary_filter = None
        self._depts = []

    def __getitem__(self, key):
        # Return the value associated with the key
        return self._options[key]

    def __setitem__(self, key, value):
        # Assign the value to the given key
        self._options[key] = value

    def __delitem__(self, key):
        # Remove the given key
        if key in self._options:
            del self._options[key]

    def __contains__(self, key):
        # Allow the use of 'in' to check whether a key exists
        return key in self._options

    def __getattr__(self, name):
        try:
            return self._options[name]
        except KeyError:
            raise AttributeError(f"'Options' object has no attribute '{name}'")

    def __setattr__(self, name, value):
        if name[0] == "_":
            super().__setattr__(name, value)
        else:
            self._options[name] = value

    def __delattr__(self, name):
        # Called when an attribute is deleted
        if name in self._options:
            del self._options[name]
        else:
            raise AttributeError(f"'Options' object has no attribute '{name}'")

    def __repr__(self):
        return f"Options({self._options})"

    def asDict(self):
        return self._options

    def asCLI(self, excludeDefault=True, onlyOrders=False, depts=True):
        cli = []
        if not onlyOrders:
            for opt in self.__class__.choiceoptions:
                if self[opt[0]] == 0 and excludeDefault:
                    continue
                cli.append("--" + opt[1][self[opt[0]]])
            for opt in self.__class__.stringoptions:
                if excludeDefault and self[opt[0]] == opt[1]:
                    continue
                cli.append("--" + opt[0])
                cli.append(self[opt[0]])
            for opt in self.__class__.posint_options:
                if excludeDefault and self[opt[0]] == opt[1]:
                    continue
                cli.append("--" + opt[0])
                cli.append(self[opt[0]])
            for opt in self.__class__.booleanoptions:
                if excludeDefault and self[opt[0]] == opt[1]:
                    continue
                if self[opt[0]]:
                    cli.append("--" + opt[0])
                else:
                    cli.append("--no-" + opt[0])
            if "override" in self._options:
                for FIELD, FIELDdict in self._options["override"].items():
                    for FIELDVALUE, FIELDVALUEdict in FIELDdict.items():
                        for key, val in FIELDVALUEdict.items():
                            cli.extend(["--override", FIELD, FIELDVALUE, key, val])
        if "orders" in self._options:
            orders = self._options["orders"]
            cli.append("--orders")
            for i, column in enumerate(orders):
                if i > 0:
                    cli.append("/")
                for row in column:
                    cli.append(row)
            cli.append(".")
        if depts:
            cli.extend(self._depts)
        return cli

    def orderkey(self, filters=False):
        if filters:
            d = self._depts.copy()
            d.append(str(self.filter()))
            d.append(str(self.filter(main=False)))
            return "_".join(d)
        if self._orderkey is not None:
            return self._orderkey
        self._orderkey = "_".join(self._depts)
        return self._orderkey

    def depts(self, xset=None):
        if xset:
            self._depts = xset
        return self._depts

    def filter(self, main: bool = True):
        r = 0
        stem = "base"
        if not main:
            if self._secondary_filter is not None:
                return self._secondary_filter
            stem = "secondary"
        else:
            if self._main_filter is not None:
                return self._main_filter
        for suffix, f in {"techno": Filter.TECHNO, "women": Filter.WOMAN}.items():
            option = f"{stem}_{suffix}"
            if option in self._options and self._options[option]:
                r |= f
        if main:
            self._main_filter = r
            return r
        self._secondary_filter = r | self.filter()
        return self._secondary_filter

    stringoptions = [
        [
            "department_separator",
            " ",
            "Separator before department in semester designation/display/origin designation",
        ],
        [
            "diplome_separator",
            "",
            "Separator before diploma in semester designation/display/origin designation",
        ],
        [
            "modalite_separator",
            " ",
            "Separator before modality in semester designation/display/origin designation",
        ],
        [
            "parcours_separator",
            "/",
            "Separator before parcours in semester designation/display/origin designation",
        ],
        [
            "rank_separator",
            "",
            "Separator before rank (~year of progress) in cursus designation/display/origin designation",
        ],
        [
            "year_separator",
            " ",
            "Separator before year in semester designation/display/origin designation",
        ],
        [
            "nick",
            "{diplome}{rank}{multidepartment}{modalite}{parcours}{year}",
            "Yearly cursus designation (should be unique for each distinguishable cursus choice)",
        ],
        [
            "displayname",
            "{diplome}{rank}{multidepartment}{modaliteshort}{parcours}",
            "Yearly cursus origin (used only for captioning the flow)",
        ],
        [
            "extnick",
            "{ext}{rank}{multidepartment}{diplomenobut}{modaliteshort}",
            "Origin designation (should be unique for each distinguishable origin of students)",
        ],
    ]

    choiceoptions = [["algo", ["optimize", "reuse", "restart"]]]

    booleanoptions = [
        ["base_techno", False, "Base population includes only techno students"],
        ["base_women", False, "Base population includes only women students"],
        [
            "secondary_techno",
            True,
            "Secondary (focused) population includes only techno students",
        ],
        [
            "secondary_women",
            False,
            "Secondary (focused) population includes only women students",
        ],
    ]

    posint_options = [
        ["spacing", 14, 0, 30, "Spacing between groups in the same column"],
        ["thickness", 6, 0, 30, "Width of the group bars in columns"],
        ["hmargin", 20, 0, 50, "Global margin around the graph"],
        ["fontsize_name", 10, 0, 30, "Font size of the group name"],
        ["fontsize_count", 14, 0, 30, "Font size of the population marks"],
        ["width", 1300, 800, None, "Width of the graphics (not counting captions)"],
        ["statwidth", 300, 0, None, "Width of the side caption"],
        ["height", 0, 0, None, "Height of the graphics (0 = automatic)"],
        ["loops", 300, 0, 1000, "Number of loops of the optimization algorithm"],
        ["baseyear", 2021, 2000, None, "Base year (start of the cohort)"],
    ]

    shortcuts = {"baseyear": ["--base", "-b"], "loops": ["-l"]}
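
# Illustrative sketch (assumed usage, not part of the original flow): OptionSet keeps
# its options in an internal dict and exposes them both as items and as attributes:
#     o = OptionSet({"width": 1300})
#     o.loops = 50           # same as o["loops"] = 50
#     "loops" in o           # True
#     o.asCLI()              # list of CLI tokens reproducing the non-default options
# Names starting with "_" are stored as real attributes (see __setattr__ above).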


def range_limited_int_type(arg, MIN_VAL, MAX_VAL):
    """Type function for argparse - an integer within some predefined bounds"""
    try:
        f = int(arg)
    except ValueError:
        raise argparse.ArgumentTypeError("Must be an integer")
    if MIN_VAL is not None and f < MIN_VAL:
        raise argparse.ArgumentTypeError(
            "Argument must be larger or equal to " + str(MIN_VAL)
        )
    if MAX_VAL is not None and f > MAX_VAL:
        raise argparse.ArgumentTypeError(
            "Argument must be smaller or equal to " + str(MAX_VAL)
        )
    return f


def format_for_shell(strings):
    # Regex to detect special characters
    special_chars = re.compile(r"[^+/.a-zA-Z0-9_-]")

    formatted_strings = []
    for ss in strings:
        s = str(ss)
        if special_chars.search(s):  # If the string contains special characters
            formatted_s = "'" + s.replace("'", "'\"'\"'") + "'"
        elif len(s) == 0:
            formatted_s = "''"
        else:
            formatted_s = s
        formatted_strings.append(formatted_s)

    # Join the strings so that they are ready to copy-paste into a shell
    return " ".join(formatted_strings)
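
# Worked example (hypothetical token values): only tokens containing characters outside
# [+/.a-zA-Z0-9_-] get single-quoted, empty tokens become '':
#     format_for_shell(["--orders", "BUT2 GEII", ".", ""])
#     -> --orders 'BUT2 GEII' . ''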


def cli_check():
    """Read args from the command line
    then read config from {orderkey}.json
    """
    parser = argparse.ArgumentParser(
        usage="""
%(prog)s [--options] DEPARTEMENTS...

OR

%(prog)s FILE.json
""",
        description="Create a sankey diagram for the evolution of students through some departments.",
    )
    parser.add_argument(
        "--orders",
        nargs="+",
        help="Start of orders list with subgroups separated by / ended by .",
    )

    parser.add_argument("depts", nargs="*", help="List of departments")

    # STRING OPTIONS
    for opt in OptionSet.stringoptions:
        xopt = ["--" + opt[0]]
        if opt[0] in OptionSet.shortcuts:
            xopt.extend(OptionSet.shortcuts[opt[0]])
        parser.add_argument(*xopt, type=str, default=opt[1], help=opt[2])

    # POSITIVE INTEGER OPTIONS
    for opt in OptionSet.posint_options:
        xopt = ["--" + opt[0]]
        if opt[0] in OptionSet.shortcuts:
            xopt.extend(OptionSet.shortcuts[opt[0]])
        optrange = ""
        if opt[3] == None and opt[2] != None:
            optrange = f" larger or equal to {opt[2]}"
        elif opt[2] == None and opt[3] != None:
            optrange = f" smaller or equal to {opt[3]}"
        elif opt[2] != None and opt[3] != None:
            optrange = f" (between {opt[2]} and {opt[3]})"

        def rangefactory(y, z):
            return lambda x: range_limited_int_type(x, y, z)

        parser.add_argument(
            *xopt,
            type=rangefactory(opt[2], opt[3]),
            default=opt[1],
            help=opt[4] + optrange,
        )

    # BOOLEAN OPTIONS
    for opt in OptionSet.booleanoptions:
        g = parser.add_mutually_exclusive_group()
        xopt = ["--" + opt[0]]
        if opt[0] in OptionSet.shortcuts:
            xopt.extend(OptionSet.shortcuts[opt[0]])
        g.add_argument(*xopt, action="store_true", default=opt[1], help=opt[2])
        xopt = ["--no-" + opt[0]]
        if "no-" + opt[0] in OptionSet.shortcuts:
            xopt.extend(OptionSet.shortcuts["no-" + opt[0]])
        g.add_argument(
            *xopt,
            action="store_true",
            # help=opt[2].replace("includes only", "doesn't care about"),
        )

    # OTHER OPTIONS

    parser.add_argument(
        "--override",
        nargs=4,
        metavar=(
            "FIELD",
            "FIELD_VALUE",
            "REPLACEMENT_FIELD",
            "REPLACEMENT_FIELD_VALUE",
        ),
        help="Override a specific field with a fixed value in some specific semester(s) selected by FIELD=FIELD_VALUE",
    )

    optimize_group = parser.add_mutually_exclusive_group()
    optimize_group.add_argument("--reuse", "-r", action="store_true", help="Reuse mode")
    optimize_group.add_argument(
        "--optimize",
        "-o",
        action="store_true",
        help="Use algorithm to enhance graph (using last result)",
    )

    optimize_group.add_argument(
        "--restart",
        "-R",
        action="store_true",
        help="Use algorithm to enhance graph (starting from random)",
    )
    if len(sys.argv) > 1 and sys.argv[1].endswith(".json"):
        try:
            json_file = sys.argv[1]
            with open(json_file, "r") as f:
                fakeclisource = json.load(f)
            fakecli = [str(x) for x in fakeclisource]
        except FileNotFoundError:
            die(f"Error: File '{json_file}' not found.", 1)
        except json.JSONDecodeError:
            die(f"Error: File '{json_file}' is not valid JSON.", 1)
        args = parser.parse_args(args=fakecli)
    else:
        args = parser.parse_args()
    if len(args.depts) == 0 and (
        args.orders is None or args.orders[-1] == "." or "." not in args.orders
    ):
        parser.print_help()
        sys.exit(0)
    return args
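
# Hypothetical invocations (script name and department names are placeholders, not taken
# from the original):
#     python3 <this script> --baseyear 2021 --loops 500 GEII INFO
#     python3 <this script> --orders "BUT1 GEII" / "BUT2 GEII" . GEII
#     python3 <this script> saved_run.json    # replay a saved argument list (JSON array of tokens)
# The last form re-reads the token list written by write_conf(Options.orderkey(filters=True), Options.asCLI()).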


def options_from_args(args):
    Options = OptionSet()
    # Handle --orders to build the 2D array
    orders = []
    if args.depts:
        depts = args.depts
    else:
        depts = []
    if args.orders:
        current_order = []
        l = args.orders.copy()
        idx = 0
        while len(l) > idx:
            item = l[idx]
            idx += 1
            if item == "/":
                # Start a new subgroup at each "/"
                orders.append(current_order)
                current_order = []
            elif item == ".":
                # End of the orders list
                orders.append(current_order)
                break
            else:
                # Add the item to the current subgroup
                current_order.append(item)
        depts.extend(l[idx:])
    Options["orders"] = orders
    dargs = vars(args)
    for opt in OptionSet.posint_options:
        if opt[0] in dargs:
            Options[opt[0]] = dargs[opt[0]]
    for opt in OptionSet.stringoptions:
        if opt[0] in dargs:
            Options[opt[0]] = dargs[opt[0]]
    for opt in OptionSet.booleanoptions:
        if opt[0] in dargs:
            if "no_" + opt[0] in dargs and dargs["no_" + opt[0]]:
                Options[opt[0]] = not dargs["no_" + opt[0]]
            else:
                Options[opt[0]] = dargs[opt[0]]
    if not (args.reuse or args.restart or args.optimize):
        Options.algo = 0
    else:
        Options.algo = 0 if args.optimize else (1 if args.reuse else 2)
    Options.depts(depts)
    return Options


def merge_options(Options, jsondict):
    if "override" in jsondict:
        Options["override"] = jsondict["override"]
    if "orders" in jsondict:
        Options["orders"] = jsondict["orders"]


def api_url(dept: str | None = None):
    """URL of the departmental API."""
    # can be changed here to use only the global API
    return (
        f"{SCODOC_SERVER}/ScoDoc/{dept}/api" if dept else f"{SCODOC_SERVER}/ScoDoc/api"
    )


def read_conf(key):
    if os.path.exists(f"{key}.json"):
        with open(f"{key}.json", "r") as f:
            return json.load(f)
    return {}


def write_conf(key, obj):
    with open(f"{key}.json", "w") as f:
        return json.dump(obj, f)


Options = options_from_args(cli_check())
orderkey = Options.orderkey()
depts = Options.depts()

defaults = {}


student = {}
CACHE_FILE = "cache.json"


def load_cache(cache_file):
    if os.path.exists(cache_file):
        with open(cache_file, "r") as f:
            return json.load(f)
    return {}


def save_cache(cache, file=None):
    if file == None:
        file = CACHE_FILE
    with open(file, "w") as f:
        json.dump(cache, f)


cache = load_cache(CACHE_FILE)


# Read color theme
# There are default color values, so maybe it should just join the conf.json file
def read_theme():
    if os.path.exists("theme.csv"):
        with open("theme.csv", newline="") as csvfile:
            csvreader = csv.reader(csvfile, delimiter=",", quotechar='"')
            for row in csvreader:
                if len(row) == 0:
                    continue
                elif len(row[0]) == 0:
                    blockordie("Wrong line in theme : " + str(row))
                elif row[0][0] == "#":
                    continue
                else:
                    colors[row[0]] = row[1]


colors = {
    "+DUT": "#0040C0",
    "QUIT": "#00FF00",
    "SUCCESS": "#0000FF",
    "NORMAL": "#C0C0C0",
    "FAIL": "#FF4040",
    "OLD": "#FF8000",
    "NEW": "#FFFF00",
    "TRANSPARENT": "#FFFFFF.0",
    "RED": "#000000",
}
read_theme()
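
# Example theme.csv (illustrative values, not shipped with the script): each row is
# "<key>,<colour>", keys are the same as in the defaults above, and lines starting
# with "#" are skipped:
#     FAIL,#CC0000
#     SUCCESS,#2040FF
#     # comment lines are ignored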

# Read redirects
# Only one file, since various combinations including same departments should
# use the same redirections ("no jury yet but almost sure it will be ...")


def read_redirects():
    if os.path.exists("redirect.csv"):
        with open("redirect.csv", newline="") as csvfile:
            csvreader = csv.reader(csvfile, delimiter=",", quotechar='"')
            for row in csvreader:
                if len(row) == 0:
                    continue
                elif len(row[0]) == 0:
                    blockordie("Wrong line in redirect : " + str(row))
                elif row[0][0] == "#":
                    continue
                else:
                    redirects[int(row[0])] = row[1]


redirects = {}
read_redirects()
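
# Example redirect.csv (illustrative values): each row is "<etudid>,<jury code>" and
# forces the final decision used for that student, e.g. while a jury is still pending:
#     12345,ADM
#     67890,RED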


# Global handling of an API token
token = None


def get_json(url: str, params=None):
    debug(f"Requesting {url}")
    global token
    if token == None:
        url_token = f"{API_URL}/tokens"
        response = requests.post(
            url_token, auth=HTTPBasicAuth(SCODOC_USER, SCODOC_PASSWORD)
        )
        if response.status_code == 200:
            token = response.json().get("token")
        else:
            blockordie(
                f"Erreur de récupération de token: {response.status_code} - {response.text}",
                status=1,
            )
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        # Return the JSON response
        return response.json()
    else:
        blockordie(
            f"Erreur avec {url}: {response.status_code} - {response.text}", status=1
        )


formsem_dept = {}
formsem_department = {}


def get_formsem_from_dept(dept):
    if "formsems" in cache and dept in cache["formsems"]:
        return cache["formsems"][dept]
    if "formsems" not in cache:
        cache["formsems"] = {}
    if "sem" not in cache:
        cache["sem"] = {}
    query_url = f"{api_url(dept)}/formsemestres/query"
    formsemestres = get_json(query_url)
    result = []
    for sem in formsemestres:
        semid = str(sem["formsemestre_id"])
        formsem_dept[semid] = dept
        cache["sem"][semid] = sem
        result.append(semid)
    cache["formsems"][dept] = result
    save_cache(cache)
    return result


def get_formations_from_dept(dept):
    if "formations" in cache and dept in cache["formations"]:
        return cache["formations"][dept]
    if "formations" not in cache:
        cache["formations"] = {}
    query_url = f"{api_url(dept)}/formations"
    formations = get_json(query_url)
    result = []
    for f in formations:
        if f["type_parcours"] == 700:
            result.append(f["formation_id"])
    cache["formations"][dept] = result
    save_cache(cache)
    return result


def get_etuds_from_formsem(dept, semid):
    if type(semid) == type(0):
        semid = str(semid)
    if "etudlist" in cache and semid in cache["etudlist"]:
        return cache["etudlist"][semid]
    if "etudlist" not in cache:
        cache["etudlist"] = {}
    query_url = f"{api_url(dept)}/formsemestre/{semid}/etudiants/long"
    result = get_json(query_url)
    cache["etudlist"][semid] = result
    save_cache(cache)
    return result


def get_jury_from_formsem(dept: str, semid):
    if type(semid) == type(0):
        semid = str(semid)
    if "semjury" in cache and semid in cache["semjury"]:
        return cache["semjury"][semid]
    if "semjury" not in cache:
        cache["semjury"] = {}

    # query_url = f"{server}{dept}/Scolarite/Notes/formsemestre_recapcomplet?formsemestre_id={semid}&mode_jury=1&tabformat=json"
    query_url = f"{api_url(dept)}/formsemestre/{semid}/decisions_jury"
    result = get_json(query_url)
    cache["semjury"][semid] = result
    save_cache(cache)
    return result
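
# Shape of cache.json as filled by the getters above (sketch, values abridged):
#     {
#       "formsems":   {"<dept>": ["<semid>", ...]},
#       "sem":        {"<semid>": {...formsemestre...}},
#       "formations": {"<dept>": [<formation_id>, ...]},
#       "etudlist":   {"<semid>": [{...etud...}, ...]},
#       "semjury":    {"<semid>": [{...decision...}, ...]}
#     }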


def get_override(sem, xkey, default=None):
    if "override" not in Options:
        return default
    overrides = Options.override
    for j in ["titre_num", "titre", "session_id"]:
        if (
            j in sem
            and j in overrides
            and sem[j] in overrides[j]
            and xkey in overrides[j][sem[j]]
        ):
            return overrides[j][sem[j]][xkey]
    return default
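
# Sketch of the Options["override"] structure walked by get_override() and asCLI()
# (the keys below are illustrative): first level is the semester field used to select
# semesters (titre_num, titre or session_id), second its value, third the field to force:
#     {"titre_num": {"BUT GEII": {"parcours": "AII"}}}
# would force parcours="AII" for every semester whose titre_num is "BUT GEII".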


def nick_replace(
    department, diplome, rank, modalite, parcours, nick, year=Options.baseyear
):
    if type(rank) != int:
        rank = 0
    if len(department) > 0:
        nick = nick.replace("{department}", Options.department_separator + department)
    else:
        nick = nick.replace("{department}", "")
    if len(department) > 0 and len(depts) > 1:
        nick = nick.replace(
            "{multidepartment}", Options.department_separator + department
        )
    else:
        nick = nick.replace("{multidepartment}", "")
    if len(diplome) > 0:
        nick = nick.replace("{diplome}", Options.diplome_separator + diplome)
    else:
        nick = nick.replace("{diplome}", "")
    if len(diplome) > 0 and diplome != "BUT":
        nick = nick.replace("{diplomenobut}", Options.diplome_separator + diplome)
    else:
        nick = nick.replace("{diplomenobut}", "")
    if rank > 0:
        nick = nick.replace("{rank}", Options.rank_separator + str(rank))
    else:
        nick = nick.replace("{rank}", "")
    nick = nick.replace(
        "{year}", Options.year_separator + str(Options.baseyear + rank - 1)
    )
    if diplome != "BUT":
        nick = nick.replace(
            "{yearnobut}",
            Options.year_separator + str(Options.baseyear + rank - 1),
        )
    else:
        nick = nick.replace("{yearnobut}", "")
    if len(modalite) > 0:
        nick = nick.replace("{modalite}", Options.modalite_separator + modalite)
    else:
        nick = nick.replace("{modalite}", "")
    if len(modalite) > 0 and modalite != "FI":
        nick = nick.replace("{modaliteshort}", modalite[-1])
    else:
        nick = nick.replace("{modaliteshort}", "")
    if len(parcours) > 0:
        nick = nick.replace("{parcours}", Options.parcours_separator + parcours)
    else:
        nick = nick.replace("{parcours}", "")
    extname = "Ecand "
    if diplome == "BUT":
        extname = "EXT"
    nick = nick.replace("{ext}", extname)
    return nick
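
# Worked example (hypothetical inputs) of the default nick template
# "{diplome}{rank}{multidepartment}{modalite}{parcours}{year}" with baseyear 2021,
# a single department, diplome="BUT", rank=2, modalite="FI", parcours="AII":
#     {diplome} -> "BUT", {rank} -> "2", {multidepartment} -> "" (only one department),
#     {modalite} -> " FI", {parcours} -> "/AII", {year} -> " 2022"
#     => "BUT2 FI/AII 2022"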


def analyse_student(semobj, etud, univ_year=None):
    """Returns the final (department,diplome,rank,modalite,parcours,nickname,displayname) tuple from etudid in semid, taking overrides into account."""
    session_id = semobj["session_id"].split("-")
    year = str(semobj["annee_scolaire"])
    department = session_id[0]
    diplome = session_id[1]
    modalite = session_id[2]
    if univ_year == None:
        if semobj["semestre_id"] < 0:
            rank = 1
        else:
            rank = (semobj["semestre_id"] + 1) // 2
    else:
        rank = univ_year
    parcours = None
    groups = []
    if "groups" in etud:
        for x in etud["groups"]:
            if x["partition_name"] == "Parcours":
                parcours = x["group_name"]
            groups.append(x["group_name"])
    if parcours == None:
        parcours = ""
    parcours = get_override(semobj, "parcours", parcours)
    department = get_override(semobj, "department", department)
    rank = get_override(semobj, "rank", rank)
    diplome = get_override(semobj, "diplome", diplome)
    modalite = get_override(semobj, "modalite", modalite)
    formsem_department[str(semobj["id"])] = department
    if len(modalite) > 0 and modalite[0] == "G":
        goal = modalite.split(":")[1:]
        modalite = None
        for g in goal:
            gg = g.split("=")
            if gg[0] in groups:
                modalite = gg[1]
    nick = Options.nick
    nick = nick_replace(department, diplome, rank, modalite, parcours, nick, year)
    displayname = Options.displayname
    displayname = nick_replace(
        department, diplome, rank, modalite, parcours, displayname, year
    )
    return department, diplome, rank, modalite, parcours, nick, displayname


def get_nick(semobj, etud):
    department, diplome, rank, modalite, parcours, nick, displayname = analyse_student(
        semobj, etud
    )
    return nick, displayname


def get_dept_from_sem(semid):
    return formsem_department[str(semid)]


oldsems = set()
oldsemsdept = {}
futuresems = set()
futuresemsdept = {}
bacs = set()
cohort_nip = set()


def analyse_depts():
    for dept in depts:
        formsems = get_formsem_from_dept(dept)
        for semid in formsems:
            # Check if this is a part of the cohort
            # or a future/old semester
            sem = cache["sem"][str(semid)]
            if sem["semestre_id"] < 0:
                year = 1
            else:
                year = (sem["semestre_id"] + 1) // 2
            offset = sem["annee_scolaire"] - Options.baseyear - year + 1
            if offset < 0 and offset > -4:
                oldsems.add(str(semid))
                oldsemsdept[semid] = dept
            if offset > 0 and offset < 4:
                futuresems.add(str(semid))
                futuresemsdept[semid] = dept
            if offset != 0:
                continue
            if sem["formation"]["type_parcours"] != 700:
                continue
            if sem["modalite"] == "EXT":
                continue
            # This is a BUT semester, part of the cohort
            # 0,1 : preceding year ; 2-7 : cohort ; 8+ : future
            if sem["semestre_id"] < 0:
                bucket = 1
            else:
                bucket = str(int(sem["semestre_id"] - 1))
            # So this semester is one we are interested in:
            # collect all its students and fill in their cursus
            etuds = get_etuds_from_formsem(dept, semid)
            jurys = get_jury_from_formsem(dept, semid)
            key = sem["titre_num"]
            for etud in etuds:
                etudid = etud["id"]
                if etudid in student:
                    studentsummary = student[etudid]
                else:
                    studentsummary = {}
                    studentsummary["cursus"] = {}  # Cursus is semid
                    studentsummary["etudid"] = {}  # useful when merging students
                    studentsummary["pseudodept"] = {}  # pseudo-dept for interdept
                    studentsummary["diplome"] = {}  # diplome name
                    studentsummary["rank"] = {}  # rank
                    studentsummary["modalite"] = {}  # modalite
                    studentsummary["parcours"] = {}  # parcours
                    studentsummary["nickname"] = {}  # nick
                    studentsummary["displayname"] = {}  # display name
                    studentsummary["dept"] = dept  # useful when merging students
                    studentsummary["bac"] = ""  # usually
                department, diplome, rank, modalite, parcours, nick, displayname = (
                    analyse_student(sem, etud, year)
                )
                if "bac" in etud["admission"]:
                    studentsummary["bac"] = etud["admission"]["bac"]
                else:
                    studentsummary["bac"] = "INCONNU"
                if "civilite" in etud:
                    studentsummary["civilite"] = etud["civilite"]
                else:
                    studentsummary["civilite"] = "?"
                bacs.add(studentsummary["bac"])
                if bucket in studentsummary["cursus"]:
                    semestreerreur = int(bucket) + 1
                    warning(
                        f"// Élève {etudid} dans deux semestres à la fois : S{semestreerreur}, semestres {studentsummary['cursus'][bucket]} et {semid}"
                    )
                if "dept" in studentsummary and studentsummary["dept"] != dept:
                    warning(
                        f"// Élève ayant changé de département {dept},{studentsummary['dept']}"
                    )
                # department, diplome, rank, modalite, parcours, nick = analyse_student(
                studentsummary["cursus"][bucket] = semid
                studentsummary["etudid"][bucket] = etudid
                studentsummary["pseudodept"][bucket] = department
                studentsummary["diplome"][bucket] = diplome
                studentsummary["rank"][bucket] = rank
                studentsummary["modalite"][bucket] = modalite
                studentsummary["parcours"][bucket] = parcours
                studentsummary["nickname"][bucket] = nick
                studentsummary["displayname"][bucket] = displayname
                studentsummary["debug"] = etud["sort_key"]  # TODO: REMOVE
                studentsummary["unid"] = etud["code_nip"]
                cohort_nip.add(etud["code_nip"])
                student[etudid] = studentsummary


analyse_depts()
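
# Sketch of one student[etudid] record built above (bucket keys are the semester index
# minus one, as strings; the concrete values shown are illustrative only):
#     {"cursus": {"2": "<semid>"}, "etudid": {"2": <etudid>}, "pseudodept": {"2": "GEII"},
#      "diplome": {"2": "BUT"}, "rank": {"2": 2}, "modalite": {"2": "FI"},
#      "parcours": {"2": "AII"}, "nickname": {...}, "displayname": {...},
#      "dept": "GEII", "bac": "STI2D", "civilite": "...", "unid": "<code_nip>"}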


def allseeingodin():
    """This function changes the student lists by peeking in the past and the future to know which students come from another cohort or go into a later cohort."""
    displaynames = {}
    oldstudents = {}
    oldstudentslevel = {}
    futurestudents = {}
    futurestudentslevel = {}

    # We look for the latest "old semester" in which every (old) student went
    for semid in oldsems:
        sem = cache["sem"][semid]
        semlevel = sem["semestre_id"]
        # For a while, some people registered former semesters (in other places) with "EXT" modalite for a fake semester
        if sem["modalite"] == "EXT":  # Ignore EXT modalite
            continue
        semlevel = abs(semlevel)
        dept = oldsemsdept[semid]
        etuds = get_etuds_from_formsem(dept, semid)
        for etud in etuds:
            nip = etud["code_nip"]
            if nip not in cohort_nip:
                continue
            if nip not in oldstudentslevel or semlevel > oldstudentslevel[nip]:
                oldstudentslevel[nip] = semlevel
                nick_t, disp_t = get_nick(sem, etud)
                oldstudents[nip] = [semid, nick_t]
                displaynames[nick_t] = disp_t
    for semid in futuresems:
        sem = cache["sem"][semid]
        if sem["formation"]["type_parcours"] != 700:
            # We are only interested in BUT continuations (for now)
            continue
        semlevel = sem["semestre_id"]
        semlevel = abs(semlevel)
        dept = futuresemsdept[semid]
        etuds = get_etuds_from_formsem(dept, semid)
        for etud in etuds:
            nip = etud["code_nip"]
            if nip not in cohort_nip:
                continue
            if nip not in futurestudentslevel or semlevel > futurestudentslevel[nip]:
                futurestudentslevel[nip] = semlevel
                futurestudents[nip], tmp = get_nick(sem, etud)

    unification = {}

    duplicates = {}

    for etudid in student.keys():
        unid = student[etudid]["unid"]
        if unid in unification:
            if unid not in duplicates:
                duplicates[unid] = [unification[unid]]
            duplicates[unid].append(etudid)
        unification[unid] = etudid
        if unid in oldstudents:
            student[etudid]["old"] = oldstudents[unid][1]
            student[etudid]["oldsem"] = oldstudents[unid][0]
        if unid in futurestudents:
            student[etudid]["future"] = futurestudents[unid]
    for unid in duplicates:
        lastsem = -1
        best = []
        for suppidx in duplicates[unid][1:]:
            supp = student[suppidx]
            if str(lastsem) in supp["cursus"]:
                best.append(suppidx)
            for sem in range(5, lastsem, -1):
                if str(sem) in supp["cursus"]:
                    lastsem = sem
                    best = [suppidx]
                    break
        if len(best) > 1:
            print(f"// Error: cannot choose last semester for NIP {unid}: ")
            print(repr(best))
            for x in best:
                print(cache["sem"][str(x)])
            sys.exit(6)
        bestid = best[0]
        base = student[bestid]
        for suppidx in duplicates[unid]:
            if suppidx == bestid:
                continue
            supp = student[suppidx]
            for skey in (
                "cursus",
                "etudid",
                "pseudodept",
                "diplome",
                "rank",
                "modalite",
                "parcours",
                "nickname",
                "displayname",
                "old",
                "oldsem",
            ):
                if skey in supp:
                    for bucket in supp[skey]:
                        if bucket not in base[skey]:
                            base[skey][bucket] = supp[skey][bucket]
            del student[suppidx]
    foundfirst = False
    # Ensure all cursus are continuous
    for etudid in student:
        etud = student[etudid]
        foundfirst = False
        foundlast = False
        fillblanks = None
        there = -1
        for i in range(6):
            if str(i) in etud["cursus"]:
                if foundfirst and foundlast:
                    fillblanks = [there, i]
                else:
                    foundfirst = True
                    here = i
            else:
                if not foundfirst:
                    continue
                foundlast = True
                there = i
        if fillblanks is not None:
            for i in range(fillblanks[0] - 1, fillblanks[1]):
                bucket = str(i)
                if bucket not in etud["cursus"]:
                    etud["etudid"][bucket] = etudid
                    etud["cursus"][bucket] = -1
                    etud["pseudodept"][bucket] = "OUT"
                    etud["diplome"][bucket] = "OUT"
                    etud["rank"][bucket] = (i // 2) + 1
                    etud["modalite"][bucket] = "FI"
                    etud["parcours"][bucket] = ""
                    etud["nickname"][bucket] = "OUT" + str(etud["rank"][bucket])
                    etud["displayname"][bucket] = "Césure"
                    displaynames[etud["nickname"][bucket]] = etud["displayname"][bucket]
    return displaynames


displaynames = allseeingodin()

strange_cases = []
next = {}
nextnick = {}


def prepare_display(displaynames):
    for etudid in student:
        for semlevel in range(5):
            if str(semlevel) in student[etudid]["nickname"]:
                a = student[etudid]["nickname"][str(semlevel)]
                b = student[etudid]["displayname"][str(semlevel)]
                if a in displaynames:
                    if b != displaynames[a]:
                        die(f"{a} will be displayed as {b} or {displaynames[a]} !", 6)
                else:
                    displaynames[a] = b
    return displaynames


displaynames = prepare_display(displaynames)


for etudid in student.keys():
    etud = student[etudid]
    cursus_array = [None] * 6
    nickname_array = [None] * 6
    etudid_array = [None] * 6
    for i in range(6):
        if str(i) in etud["cursus"]:
            cursus_array[i] = etud["cursus"][str(i)]
            nickname_array[i] = etud["nickname"][str(i)]
            etudid_array[i] = etud["etudid"][str(i)]
    # We will reduce to even semesters, so look for the most usual continuation for
    # students who stop on an odd semester
    for i in range(0, 5, 2):
        currs = str(cursus_array[i])
        nexts = str(cursus_array[i + 1])
        currn = str(nickname_array[i])
        nextn = str(nickname_array[i + 1])
        if nexts is not None:
            if currs not in next:
                next[currs] = {}
            if nexts not in next[currs]:
                next[currs][nexts] = 1
            else:
                next[currs][nexts] += 1
        if nextn is not None:
            if currn not in nextnick:
                nextnick[currn] = {}
            if nextn not in nextnick[currn]:
                nextnick[currn][nextn] = 1
            else:
                nextnick[currn][nextn] += 1
    etud["cursus_array"] = cursus_array
    etud["nickname_array"] = nickname_array
    etud["etudid_array"] = etudid_array

nextbest = {}
nextnickbest = {}
for key in next:
    imax = 0
    best = None
    for key2 in next[key]:
        if next[key][key2] > imax:
            imax = next[key][key2]
            best = key2
    nextbest[key] = best
for key in nextnick:
    imax = 0
    best = None
    for key2 in nextnick[key]:
        if nextnick[key][key2] > imax:
            imax = nextnick[key][key2]
            best = key2
    nextnickbest[key] = best

evennicknames = {}
for etudid in student.keys():
    etud = student[etudid]
    for i in range(1, 6, 2):
        if etud["nickname_array"][i] not in evennicknames:
            evennicknames[etud["nickname_array"][i]] = 1
        else:
            evennicknames[etud["nickname_array"][i]] += 1


for etudid in student.keys():
    etud = student[etudid]
    cursus_short = [None] * 5
    nickname_short = [None] * 5
    etudid_short = [None] * 5
    semend = None
    semstart = None
    for year in range(1, 4):
        sem1 = 2 * year - 2
        sem2 = 2 * year - 1
        finalsem = etud["cursus_array"][sem2]
        nick = etud["nickname_array"][sem2]
        etid = etud["etudid_array"][sem2]
        if finalsem == None:
            finalsem = etud["cursus_array"][sem1]
            nick = etud["nickname_array"][sem1]
            etid = etud["etudid_array"][sem1]
            if finalsem != None:
                # Dropped out during the first semester of this year
                # print(f"Pour {etudid}, année {year}, abandon au S1")
                if nick not in evennicknames:
                    # print( f"Pour {etudid}, année {year}, changement {nick} en {nextnickbest[nick]}" )
                    nick = nextnickbest[nick]
        if finalsem != None:
            cursus_short[year] = finalsem
            nickname_short[year] = nick
            etudid_short[year] = etid
        if etud["cursus_array"][sem1] == None:
            # print(f"Pour {etudid}, année {year}, saute-mouton du S1")
            pass
    etud["short"] = cursus_short
    etud["nickshort"] = nickname_short
    etud["etudidshort"] = etudid_short


for etudid in student.keys():
    etud = student[etudid]
    lastyear = 4
    lastsem = None
    while lastsem == None:
        lastyear -= 1
        lastsem = etud["short"][lastyear]
    ddd = get_dept_from_sem(lastsem)
    if ddd not in depts:
        depts.append(ddd)

badred = {}
goodred = {}
failure = {}
diploma = {}
reor2 = {}
reor1 = {}
unknown = {}
entries = {}
redirs = {}

finals = {
    "FAIL": "✘",
    "RED": "↩",
    "QUIT": "↴",
    "+DUT": "➡",
    "DIPLOME": "✔",
    "?": "?",
}

for d in depts:
    badred[d] = 0
    goodred[d] = 0
    failure[d] = 0
    diploma[d] = 0
    reor2[d] = 0
    reor1[d] = 0
    unknown[d] = 0
    entries[d] = 0
    redirs[d] = 0

strangecases = []
for etudid in student.keys():
    etud = student[etudid]
    lastyear = 4
    lastsem = None
    while lastsem == None:
        lastyear -= 1
        lastsem = etud["short"][lastyear]
    ddd = get_dept_from_sem(lastsem)
    jury = get_jury_from_formsem(None, lastsem)
    etudid_real = etud["etudidshort"][lastyear]
    if etudid_real != etudid:
        print(f"// Warning {etudid} {etudid_real}")
    resjury = None
    for x in jury:
        if x["etudid"] == etudid_real:
            resjury = x
            break
    if resjury == None:
        print(f"// No jury for {etudid} year {lastyear}")
        continue
    resultyear = None
    if resjury["etat"] == "D":
        resultyear = "DEM"
    if resjury["etat"] == "DEF":
        resultyear = "DEF"
    if (
        "annee" in resjury
        and "code" in resjury["annee"]
        and resjury["annee"]["code"] is not None
    ):
        resultyear = resjury["annee"]["code"]
    finaloutput = None
    checkred = False
    if etudid in redirects:
        resultyear = redirects[etudid]
        redirs[ddd] += 1
        strangecases.append(
            f"REDI{lastyear} {SCODOC_SERVER}/{ddd}/Scolarite/fiche_etud?etudid={etudid}"
        )
    if resultyear == None:
        finaloutput = "?"
        unknown[ddd] += 1
        strangecases.append(
            f"????{lastyear} {SCODOC_SERVER}/{ddd}/Scolarite/fiche_etud?etudid={etudid}"
        )
    elif resultyear in ("RAT", "ATJ"):
        finaloutput = "?"
        unknown[ddd] += 1
        strangecases.append(
            f"ATTE{lastyear} {SCODOC_SERVER}/{ddd}/Scolarite/fiche_etud?etudid={etudid}"
        )
    elif resultyear in ("RED", "ABL", "ADSUP"):
        finaloutput = "RED"
        checkred = True
    elif lastyear == 3 and resultyear in ("ADM", "ADJ"):
        finaloutput = "DIPLOME"
        diploma[ddd] += 1
    elif lastyear == 2 and resultyear in ("ADM", "ADJ"):
        finaloutput = "+DUT"
        reor2[ddd] += 1
    elif resultyear in ("PAS1NCI", "PASD"):
        finaloutput = "QUIT"
        reor1[ddd] += 1
    elif lastyear < 2 and resultyear in ("ADM", "ADJ"):
        finaloutput = "QUIT"
        reor1[ddd] += 1
    elif resultyear in ("NAR", "DEM", "DEF", "ABAN"):
        finaloutput = "FAIL"
        failure[ddd] += 1
    elif resjury["annee"]["annee_scolaire"] != Options.baseyear + lastyear - 1:
        finaloutput = "RED"
        checkred = True
    if checkred:
        if "future" not in etud:
            # print(f"// Mauvais redoublement : {etudid}")
            badred[ddd] += 1
            finaloutput = "FAIL"
        else:
            goodred[ddd] += 1
    output = finaloutput + " " + etud["nickshort"][lastyear]
    etud["nickshort"][lastyear + 1] = output
    displaynames[output] = (
        finals[finaloutput] + displaynames[etud["nickshort"][lastyear]]
    )
    (firstsem, firstyear) = (
        (etud["short"][1], 1)
        if etud["short"][1] != None
        else (
            (etud["short"][2], 2) if etud["short"][2] != None else (etud["short"][3], 3)
        )
    )
    firstdept = cache["sem"][firstsem]["departement"]["acronym"]
    if "old" in etud:
        yearold = cache["sem"][etud["oldsem"]]["annee_scolaire"]
        etud["nickshort"][firstyear - 1] = etud["old"]
        # yy = yearold
        # delta = firstyear + Options.baseyear - yy - 2
        # for i in range(delta, firstyear - 1):
        #     etud["nickshort"][i] = etud["nickshort"][firstyear - 1] + "*" * (
        #         firstyear - 1 - i
        #     )
    else:
        if (
            str(firstyear * 2 - 2) in etud["cursus"]
            and etud["cursus"][str(firstyear * 2 - 2)] is not None
        ):
            startsem = str(firstyear * 2 - 2)
        else:
            startsem = str(firstyear * 2 - 1)
        department = etud["pseudodept"][startsem]
        diplome = etud["diplome"][startsem]
        rank = etud["rank"][startsem]
        modalite = etud["modalite"][startsem]
        parcours = etud["parcours"][startsem]
        nick = "EXT" + Options.nick
        nick = nick_replace(department, diplome, rank, modalite, parcours, nick)
        displayname = Options.extnick
        displayname = nick_replace(
            department, diplome, rank, modalite, parcours, displayname
        )
        displaynames[nick] = displayname
        etud["nickshort"][firstyear - 1] = nick
        # to get a better ordering in sankeymatic, false nodes were required
        # This is no longer the case when building our own graphics
        # for i in range(0, firstyear - 1):
        #     etud["nickshort"][i] = nick + "*" * (firstyear - 1 - i)
    entries[ddd] += 1


def bags_from_students(student, filter=0):
    bags = []
    for etudid in student.keys():
        # Filter
        # 0 No filter
        # 1 Keep only technological baccalaureate
        # 2 Keep only women
        # ... to be completed
        if filter & Filter.TECHNO:
            if student[etudid]["bac"][:2] != "ST":
                continue
        parc = student[etudid]["nickshort"]
        for i in range(len(parc) - 1):
            while len(bags) <= i:
                bags.append({})
            nstart = parc[i]
            nend = parc[i + 1]
            if nstart != None and nend != None:
                if nstart not in bags[i]:
                    bags[i][nstart] = {}
                if nend not in bags[i][nstart]:
                    bags[i][nstart][nend] = 1
                else:
                    bags[i][nstart][nend] += 1
    return bags
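
# Shape of the returned bags (sketch): bags[i] maps a group nickname in column i to the
# groups its students move to in column i+1, with a student count per transition, e.g.
#     bags[1] == {"BUT1 FI 2021": {"BUT2 FI 2022": 42, "FAIL BUT1 FI 2021": 7}}
# (the nicknames and counts above are illustrative, not real data).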


def node_color(x):
    color = colors["NORMAL"]
    if x[0:4] == "FAIL":
        color = f"{colors['FAIL']} <<"
    elif x[0:4] == "+DUT":
        color = f"{colors['+DUT']} <<"
    elif x[0:4] == "QUIT":
        color = f"{colors['QUIT']} <<"
    elif x[0:3] == "RED":
        color = f"{colors['RED']} <<"
    elif x[0:4] == "DIPL":
        color = f"{colors['SUCCESS']} <<"
    elif x[0:3] == "EXT":
        color = f"{colors['NEW']} >>"
    elif x[0:3] == "BUT":
        color = f"{colors['NORMAL']}"
    elif x[0:3] == "DUT":
        color = f"{colors['OLD']} >>"
    if x[-1] == "*":
        color = f"{colors['TRANSPARENT']} >>"
    return color


def textwidth(text, font="Arial", fontsize=14):
    try:
        import cairo
    except ImportError:
        return len(text) * fontsize
    surface = cairo.SVGSurface("undefined.svg", 1280, 200)
    cr = cairo.Context(surface)
    cr.select_font_face(font, cairo.FONT_SLANT_NORMAL, cairo.FONT_WEIGHT_BOLD)
    cr.set_font_size(fontsize)
    xbearing, ybearing, width, height, xadvance, yadvance = cr.text_extents(text)
    return width


def crossweight(node_position, node_layer, edges):
    w = 0
    for e in edges:
        for ee in edges:
            if node_layer[e[0]] != node_layer[ee[0]]:
                continue
            if node_layer[e[1]] != node_layer[ee[1]]:
                continue
            if (node_position[e[0]] - node_position[ee[0]]) * (
                node_position[e[1]] - node_position[ee[1]]
            ) < 0:
                w += e[2] * ee[2]
    return w
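
# crossweight() counts weighted edge crossings: two links drawn between the same pair of
# columns cross when their endpoints appear in opposite vertical orders, i.e. when
# (pos[a1]-pos[a2]) * (pos[b1]-pos[b2]) < 0. A tiny hypothetical case: with A above B on
# the left and C above D on the right, edges A->D (weight 2) and B->C (weight 3) cross
# and are penalised by 2*3 (the loop visits each pair in both orders, so only relative
# totals matter). genetic_optimize() below searches per-column orderings minimising this.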


def genetic_optimize(node_position, node_layer, edges, loops=1):
    debug(f"Begin genetic optimization with {loops} loops")
    oldcandidates = []
    l_indices = list(range(5))
    lays = []
    randomness_l = []
    for index in range(5):
        lays.append([x for x in node_layer.keys() if node_layer[x] == index])
        if len(lays[index]) > 1:
            for i in lays[index]:
                randomness_l.append(index)
    w = crossweight(node_position, node_layer, edges)

    for i in range(20):
        oldcandidates.append([node_position.copy(), w])
    w = crossweight(node_position, node_layer, edges)
    for i in range(10):
        n = node_position.copy()
        l_idx = random.choice(randomness_l)
        q = lays[l_idx].copy()
        k = 0
        while len(q) > 0:
            nn = random.choice(q)
            q.remove(nn)
            n[nn] = k
            k += 1
        oldcandidates.append([n, w])
    candidates = oldcandidates
    for i in range(loops):
        oldcandidates = candidates
        oldcandidates.sort(key=lambda x: x[1])
        candidates = oldcandidates[:30]
        while len(candidates) < 60:
            # mutate some random candidate: swap two nodes in one column
            candidate = random.choice(candidates)[0]
            new_position = candidate.copy()  # Copy the position in order to mutate it
            l_idx = random.choice(randomness_l)
            swapa = random.choice(lays[l_idx])
            swapb = random.choice(lays[l_idx])
            while swapa == swapb:
                swapb = random.choice(lays[l_idx])
            tmp = new_position[swapa]
            new_position[swapa] = new_position[swapb]
            new_position[swapb] = tmp
            w = crossweight(new_position, node_layer, edges)
            candidates.append([new_position, w])
        while len(candidates) < 90:
            # mutate some random candidate: rotate a slice of one column
            candidate = random.choice(candidates)[0]
            new_position = candidate.copy()  # Copy the position in order to mutate it
            l_idx = random.choice(randomness_l)
            startidx = random.randrange(len(lays[l_idx]) - 1)
            stopidx = random.randrange(startidx + 1, len(lays[l_idx]))
            for n in lays[l_idx]:
                if new_position[n] >= startidx and new_position[n] < stopidx:
                    new_position[n] += 1
                elif new_position[n] == stopidx:
                    new_position[n] = startidx
            w = crossweight(new_position, node_layer, edges)
            candidates.append([new_position, w])
        while len(candidates) < 100:
            # cross two random candidates: copy one column ordering from another candidate
            candidate = random.choice(candidates)[0]
            candidate2 = random.choice(candidates)[0]
            new_position = candidate.copy()  # Copy the position in order to mutate it
            l_idx = random.choice(randomness_l)
            for n in lays[l_idx]:
                new_position[n] = candidate2[n]
            w = crossweight(new_position, node_layer, edges)
            candidates.append([new_position, w])
    candidates.sort(key=lambda x: x[1])
    orders = []
    best = candidates[0][0]
    for i in range(5):
        b = lays[i].copy()
        b.sort(key=lambda x: best[x])
        orders.append(b)
    return orders


def ordernodes(layers, orders, edges):
    node_position = {}
    node_layer = {}
    newls = [[], [], [], [], []]
    if orders != {}:
        for i in range(len(newls)):
            ls = newls[i]
            for node in orders[i]:
                if node in layers[i]:
                    ls.append(node)
            for node in layers[i]:
                if node not in ls:
                    ls.append(node)
        for layer, layernodes in enumerate(newls):
            for j, n in enumerate(layernodes):
                node_position[n] = j
                node_layer[n] = layer
    else:
        for layer, layernodes in enumerate(layers):
            for j, n in enumerate(layernodes):
                node_position[n] = j
                node_layer[n] = layer
    debug("Solution has weight " + str(crossweight(node_position, node_layer, edges)))
    return node_position, node_layer, newls


def get_layer_structure(newls, node_structure, spacing, hmargin, height):
    layer_structure = []
    density = []
    for i in range(5):
        ls = {}
        ls["olayer"] = newls[i]
        ls["num"] = len(newls[i])
        ls["inout"] = 0
        for j in ls["olayer"]:
            lhi = 0
            lho = 0
            slhi = 0
            slho = 0
            k = node_structure[j]
            for prev_node in k["prev"]:
                lhi += prev_node[1]
                slhi += prev_node[2]
            for next_node in k["next"]:
                lho += next_node[1]
                slho += next_node[2]
            k["size"] = max(lhi, lho)
            k["ssize"] = max(slhi, slho)
            k["in"] = lhi
            k["out"] = lho
            if lhi != lho and lhi * lho != 0:
                warning(f"BUG1: {j} {k} {lhi} {lho}")
            ls["inout"] += k["size"]
        layer_structure.append(ls)
    if height == 0:
        minheight = 0
        for i in range(5):
            ls = layer_structure[i]
            new_minheight = spacing * (ls["num"] - 1) + 2 * hmargin + 2 * ls["inout"]
            if new_minheight > minheight:
                minheight = new_minheight
        height = (1 + (minheight // 150)) * 150
    for i in range(5):
        ls = layer_structure[i]
        ls["density"] = ls["inout"] / (
            spacing + height - spacing * ls["num"] - 2 * hmargin
        )
        density.append(ls["density"])
    realdensity = max(density)
    for i in range(5):
        ls = layer_structure[i]
        supp_spacing = (
            spacing
            + height
            - 2 * hmargin
            - spacing * ls["num"]
            - ls["inout"] / realdensity
        ) / (ls["num"] + 1)
        cs = hmargin - spacing
        for j in ls["olayer"]:
            ns = node_structure[j]
            ns["top"] = supp_spacing + spacing + cs
            h = ns["size"] / realdensity
            sh = ns["ssize"] / realdensity
            ns["middle"] = ns["top"] + sh
            cs = ns["bottom"] = ns["top"] + h
    return realdensity, height, layer_structure


def nodestructure_from_bags(bags, sbags=None):
    node_structure = {}
    layers = [[], [], [], [], []]
    edges = []
    union_sbag = {}
    if sbags is not None:
        for layer, layernodes in enumerate(sbags):
            for startnode in layernodes:
                for endnode in layernodes[startnode]:
                    if startnode not in union_sbag:
                        union_sbag[startnode] = {}
                    union_sbag[startnode][endnode] = layernodes[startnode][endnode]
    for layer, layernodes in enumerate(bags):
        for startnode in layernodes:
            # if startnode[-1] == "*":
            #     continue
            for endnode in layernodes[startnode]:
                # if endnode[-1] == "*":
                #     continue
                weight = layernodes[startnode][endnode]
                if startnode in union_sbag and endnode in union_sbag[startnode]:
                    sweight = union_sbag[startnode][endnode]
                elif sbags is None:
                    sweight = -1
                else:
                    sweight = 0
                if endnode not in node_structure:
                    node_structure[endnode] = {
                        "prev": [[startnode, weight, sweight]],
                        "next": [],
                        "layer": layer + 1,
                    }
                    layers[layer + 1].append(endnode)
                else:
                    node_structure[endnode]["prev"].append([startnode, weight, sweight])
                if startnode not in node_structure:
                    node_structure[startnode] = {
                        "prev": [],
                        "next": [[endnode, weight, sweight]],
                        "layer": layer,
                    }
                    layers[layer].append(startnode)
                else:
                    node_structure[startnode]["next"].append([endnode, weight, sweight])
                edges.append([startnode, endnode, weight])
    return node_structure, layers, edges
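
# Sketch of a node_structure entry as produced here and later augmented by
# get_layer_structure()/compute_svg() (field values illustrative):
#     node_structure["BUT2 FI 2022"] = {
#         "prev": [["BUT1 FI 2021", 42, 12]],   # [source, students, focused subset]
#         "next": [["BUT3 FI 2023", 35, 10]],
#         "layer": 2,
#         # added later: "size", "ssize", "in", "out", "top", "middle", "bottom", "color"
#     }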


def compute_svg(height, padding, realdensity, node_structure):
    unit_ratio = 96 / 72
    thickness = Options.thickness
    fontsize_name = Options.fontsize_name
    fontsize_count = Options.fontsize_count
    width = Options.width
    columns = []
    l = 0
    for i in range(5):
        l += width / 6
        columns.append(l)
    d = drawsvg.Drawing(width, height, origin=(0, 0), id_prefix=orderkey)
    g1 = drawsvg.Group()  # rectangles representing the student-sets
    g2 = drawsvg.Group()  # curves representing the transitions
    g3 = drawsvg.Group()  # texts (captions of each rectangle)
    g4 = drawsvg.Group()  # white rectangles below texts (for readability)
    font_offset = max(fontsize_count, fontsize_name)
    for n in node_structure:
        ns = node_structure[n]
        col = node_color(n).split(" ")[0].split(".")[0]
        ns["color"] = col
        xpos = width / 6 * (ns["layer"] + 1)
        r = drawsvg.Rectangle(
            xpos - thickness,
            ns["top"],
            2 * thickness,
            ns["bottom"] - ns["top"],
            fill=col,
            opacity=0.5,
            stroke_width=0.1,
            stroke="#808080",
        )
        g1.append(r)
        r = drawsvg.Rectangle(
            xpos - thickness,
            ns["top"],
            2 * thickness,
            ns["middle"] - ns["top"],
            fill=col,
            stroke_width=0.2,
            stroke="black",
        )
        g1.append(r)
        nw = textwidth(displaynames[n], "Arial", fontsize_name) * unit_ratio
        cw = textwidth(str(ns["size"]), "Arial", fontsize_count) * unit_ratio
        gw = nw + cw + padding
        ggw = gw + 2 * padding
        nxpos = xpos - gw / 2 + cw + padding + nw / 2
        ypos = (ns["top"] + ns["bottom"]) / 2
        cxpos = cw / 2 - gw / 2 + xpos
        rxpos = xpos
        if ns["in"] == 0:
            nxpos -= gw / 2 + padding + thickness
            cxpos -= gw / 2 + padding + thickness
            rxpos -= gw / 2 + padding + thickness
        if ns["out"] == 0:
            nxpos += gw / 2 + padding + thickness
            cxpos += gw / 2 + padding + thickness
            rxpos += gw / 2 + padding + thickness
        t = drawsvg.Text(
            displaynames[n],
            str(fontsize_name) + "pt",
            nxpos,
            ypos + fontsize_name / 2,
            fill="black",
            text_anchor="middle",
            font_family="Arial",
        )
        tt = drawsvg.Text(
            str(ns["size"]),
            str(fontsize_count) + "pt",
            cxpos,
            ypos + fontsize_count / 2,
            fill="black",
            text_anchor="middle",
            font_family="Arial",
        )
        g3.append(t)
        g3.append(tt)
        g4.append(
            drawsvg.Rectangle(
                rxpos - gw / 2 - padding,
                ypos - font_offset / 2 - padding,
                ggw,
                font_offset + 2 * padding,
                stroke="black",
                stroke_width=0,
                fill="white",
                fill_opacity=".5",
                rx=padding,
                ry=padding,
            )
        )
    for n in node_structure:
        ns = node_structure[n]
        ns["prev"].sort(key=lambda x: node_structure[x[0]]["top"])
        ns["next"].sort(key=lambda x: node_structure[x[0]]["top"])
        start = ns["top"]
        for link in ns["prev"]:
            ysize = link[1]
            sysize = link[2]
            link.append(start)
            link.append(start + sysize / realdensity)
            start += ysize / realdensity
            link.append(start)
    for n in node_structure:
        ns = node_structure[n]
        start = ns["top"]
        for link in ns["next"]:
            ysize = link[1]
            sysize = link[2]
            link.append(start)
            link.append(start + sysize / realdensity)
            start += ysize / realdensity
            link.append(start)
            targets = node_structure[link[0]]
            target = None
            for t in targets["prev"]:
                if t[0] == n:
                    target = t
            if target == None:
                print(f"BUG: {n},{ns},{t}")
                sys.exit(5)
            # At this point, link has values target_name, size, secondary_size, top, middle, bottom
            posxa = columns[ns["layer"]] + thickness
            posxb = columns[targets["layer"]] - thickness
            posxc = (3 * posxa + posxb) / 4
            posxd = (posxa + 3 * posxb) / 4
            grad = drawsvg.LinearGradient(posxa, 0, posxb, 0)
            grad.add_stop(0, ns["color"], opacity=0.5)
            grad.add_stop(1, targets["color"], opacity=0.5)
            posyat = link[3]
            posyam = link[4]
            posyab = link[5]
            posybt = target[3]
            posybm = target[4]
            posybb = target[5]
            p = drawsvg.Path(fill=grad, stroke="#000", stroke_width=0, opacity=0.8)
            p.M(posxa, posyab)
            p.C(posxc, posyab, posxd, posybb, posxb, posybb)
            p.L(posxb, posybt)
            p.C(posxd, posybt, posxc, posyat, posxa, posyat)
            p.Z()
            p.append_title(
                f"{displaynames[n]}=>{displaynames[link[0]]}: {link[2]}/{link[1]}"
            )
            g2.append(p)
            p = drawsvg.Path(fill=grad, stroke="#000", stroke_width=0)
            p.M(posxa, posyam)
            p.C(posxc, posyam, posxd, posybm, posxb, posybm)
            p.L(posxb, posybt)
            p.C(posxd, posybt, posxc, posyat, posxa, posyat)
            p.Z()
            p.append_title(
                f"{displaynames[n]}=>{displaynames[link[0]]}: {link[2]}/{link[1]}"
            )
            g2.append(p)
    d.append(g2)
    d.append(g1)
    d.append(g4)
    d.append(g3)
    return d


def printsvg():
    padding = 4
    spacing = Options.spacing
    height = Options.height
    hmargin = Options.hmargin
    bags = bags_from_students(student, Options.filter())
    sbags = bags_from_students(student, Options.filter(main=False))
    node_structure, layers, edges = nodestructure_from_bags(bags, sbags)
    filename = "best-" + orderkey
    if Options.algo == 2:
        try:
            os.remove(filename)
        except OSError:
            pass
    if Options.algo < 2:
        lastorders = read_conf(filename)
    else:
        lastorders = {}
    node_position, node_layer, newls = ordernodes(layers, lastorders, edges)
    if Options.algo != 1:
        orders = genetic_optimize(node_position, node_layer, edges, loops=Options.loops)
    else:
        orders = newls
    write_conf("best-" + orderkey, orders)
    info(format_for_shell(OptionSet({"orders": orders}).asCLI(onlyOrders=True)))
    Options.orders = orders
    write_conf(Options.orderkey(filters=True), Options.asCLI())
    info(format_for_shell(OptionSet({"orders": orders}).asCLI(onlyOrders=True)))
    node_position, node_layer, newls = ordernodes(layers, orders, edges)
    realdensity, height, layer_structure = get_layer_structure(
        newls, node_structure, spacing, hmargin, height
    )
    d = compute_svg(height, padding, realdensity, node_structure)
    d.save_svg(orderkey + ".svg")


printsvg()
for x in strangecases:
    print(f"// {x}")