#!/usr/bin/env python3 # Standard libs import argparse import csv import json import os import pdb # used for debugging import inspect # used for debugging import random import sys # Third-party libs import requests from requests.auth import HTTPBasicAuth import drawsvg try: from dotenv import load_dotenv except ModuleNotFoundError: print("\nError: dotenv not installed !", file=sys.stderr) print("You may install it using:\npip install python-dotenv\n", file=sys.stderr) def die(msg: str, status=3): print(msg, file=sys.stderr) sys.exit(status) def debug(*args): if not hasattr(debug, "counter"): debug.counter = 1 # Initialize the counter if it doesn't exist else: debug.counter += 1 caller_frame = inspect.currentframe().f_back where = str(caller_frame.f_lineno) + "@" + caller_frame.f_code.co_name if len(args) > 0: print(f"[DEBUG {debug.counter}:{where}] " + str(args[0]), args[1:]) else: print(f"[DEBUG {debug.counter}:{where}] (no reason given)") def warning(*args): if len(args) > 0: print("[WARNING] " + str(args[0]), args[1:]) else: print("[WARNING] (no reason given)") def info(*args): if len(args) > 0: print("[INFO] " + str(args[0]), args[1:]) else: print("[INFO] (no reason given)") load_dotenv(".env") SCODOC_SERVER = os.environ.get("SCODOC_SERVER") or "http://localhost:5000" SCODOC_USER = os.environ.get("SCODOC_USER") or die( "SCODOC_USER must be set in .env or the environment" ) SCODOC_PASSWORD = os.environ.get("SCODOC_PASSWORD") or die( "SCODOC_PASSWORD must be set in .env or the environment" ) API_URL = f"{SCODOC_SERVER}/ScoDoc/api" # TODO : refactor globals DEBUG = True # Not used BLOCKING = True # Die if csv is incorrect # TODO : refactor / put globals in a class, eg Config depts = [] orderkey = "" def blockordie(reason: str = "", status: int = 2): if reason: print(reason) else: print("Blocking, no reason given") if BLOCKING: sys.exit(status) class Options: pass # def cli_check(): """Read args from the command line then read config from {orderkey}.json """ global orderkey # TODO: globales à supprimer global depts parser = argparse.ArgumentParser(description="Process some departments.") parser.add_argument("depts", nargs="*", help="List of departments") parser.add_argument( "--base", "-b", type=int, choices=range(2000, 2067), default=2021, help="base year for the cohort (integer between 2000 and 2666)", ) optimize_group = parser.add_mutually_exclusive_group() optimize_group.add_argument( "--reuse", action="store_true", help="Reuse mode, sets value to 0" ) optimize_group.add_argument( "--optimize", type=str, nargs="?", const="100", # Default value if --optimize is used without specifying n help="Optimize mode, takes an optional integer (default is 100, or 300 if no optimization option specified)", ) optimize_group.add_argument( "--restart", type=str, nargs="?", const="300", # Default value if --restart is used without specifying n help="Restart & Optimize mode, takes an optional integer (default is 300)", ) args = parser.parse_args() Options.restart = False if args.reuse: Options.optimize = 0 elif args.restart is not None: Options.restart = True try: Options.optimize = -int(args.restart) except (TypeError, ValueError): Options.optimize = -300 if args.restart: args.depts.insert(0, args.restart) else: try: Options.optimize = int(args.optimize) except (TypeError, ValueError): Options.optimize = 300 if args.optimize: args.depts.insert(0, args.optimize) Options.base_year = args.base depts = args.depts orderkey = "_".join(depts) if len(depts) == 0: parser.print_help() sys.exit(0) def api_url(dept: str | None = None): """L'URL de l'API départementale.""" # peut être modifié ici pour n'utiliser que l'API globale return ( f"{SCODOC_SERVER}/ScoDoc/{dept}/api" if dept else f"{SCODOC_SERVER}/ScoDoc/api" ) cli_check() def read_conf(key): if os.path.exists(f"{key}.json"): with open(f"{key}.json", "r") as f: return json.load(f) return {} def write_conf(key, obj): with open(f"{key}.json", "w") as f: return json.dump(obj, f) return {} conf = read_conf(orderkey) defaults = { "spacing": 14, "thickness": 6, "fontsize_name": 10, "fontsize_count": 14, "width": 1300, "height": 0, "hmargin": 20, "parcours_separator": "/", "year_separator": " ", "rank_separator": "", "diplome_separator": "", "reuse": "yes", "optimize": "yes", "main_filter": 0, "secondary_filter": 1, } def conf_value(xkey: str): """Manage default values""" if xkey in conf: return conf[xkey] if xkey in defaults: return defaults[xkey] if xkey[-9:] == "separator": return " " if xkey == "nick": return "{diplome}{rank}{multidepartment}{modalite}{parcours}{year}" if xkey == "displayname": return "{diplome}{rank}{multidepartment}{modaliteshort}{parcours}" if xkey == "extnick": return "{ext}{rank}{multidepartment}{diplomenobut}{modaliteshort}" if xkey == "orders": return [[], [], [], [], []] return {} student = {} CACHE_FILE = "cache.json" def load_cache(cache_file): if os.path.exists(cache_file): with open(cache_file, "r") as f: return json.load(f) return {} def save_cache(cache, file=None): if file == None: file = CACHE_FILE with open(CACHE_FILE, "w") as f: json.dump(cache, f) cache = load_cache(CACHE_FILE) # Read color theme # There are default color values, so may be it should just join the conf.json file def read_theme(): if os.path.exists("theme.csv"): with open("theme.csv", newline="") as csvfile: csvreader = csv.reader(csvfile, delimiter=",", quotechar='"') for row in csvreader: if len(row) == 0: continue elif len(row[0]) == 0: blockordie("Wrong line in theme : " + str(row)) elif row[0][0] == "#": continue else: colors[row[0]] = row[1] colors = { "+DUT": "#0040C0", "QUIT": "#00FF00", "SUCCESS": "#0000FF", "NORMAL": "#C0C0C0", "FAIL": "#FF4040", "OLD": "#FF8000", "NEW": "#FFFF00", "TRANSPARENT": "#FFFFFF.0", "RED": "#000000", } read_theme() # Read redirects # Only one file, since various combinations including same departments should # use the same redirections ("no jury yet but almost sure it will be ...") def read_redirects(): if os.path.exists("redirect.csv"): with open("redirect.csv", newline="") as csvfile: csvreader = csv.reader(csvfile, delimiter=",", quotechar='"') for row in csvreader: if len(row) == 0: continue elif len(row[0]) == 0: blockordie("Wrong line in redirect : " + str(row)) elif row[0][0] == "#": continue else: redirects[int(row[0])] = row[1] redirects = {} read_redirects() # Gestion globale d'un jeton pour l'API token = None def get_json(url: str, params=None): debug(f"Requesting {url}") global token if token == None: url_token = f"{API_URL}/tokens" response = requests.post( url_token, auth=HTTPBasicAuth(SCODOC_USER, SCODOC_PASSWORD) ) if response.status_code == 200: token = response.json().get("token") else: blockordie( f"Erreur de récupération de token: {response.status_code} - {response.text}", status=1, ) headers = {"Authorization": f"Bearer {token}"} response = requests.get(url, headers=headers, params=params) if response.status_code == 200: # Afficher la réponse JSON return response.json() else: blockordie( f"Erreur avec {url}: {response.status_code} - {response.text}", status=1 ) formsem_dept = {} formsem_department = {} def get_formsem_from_dept(dept): if "formsems" in cache and dept in cache["formsems"]: return cache["formsems"][dept] if "formsems" not in cache: cache["formsems"] = {} if "sem" not in cache: cache["sem"] = {} query_url = f"{api_url(dept)}/formsemestres/query" formsemestres = get_json(query_url) result = [] for sem in formsemestres: semid = str(sem["formsemestre_id"]) formsem_dept[semid] = dept cache["sem"][semid] = sem result.append(semid) cache["formsems"][dept] = result save_cache(cache) return result def get_formations_from_dept(dept): if "formations" in cache and dept in cache["formations"]: return cache["formations"][dept] if "formations" not in cache: cache["formations"] = {} query_url = f"{api_url(dept)}/formations" formations = get_json(query_url) result = [] for f in formations: if f["type_parcours"] == 700: result.append(f["formation_id"]) cache["formations"][dept] = result save_cache(cache) return result def get_etuds_from_formsem(dept, semid): if type(semid) == type(0): semid = str(semid) if "etudlist" in cache and semid in cache["etudlist"]: return cache["etudlist"][semid] if "etudlist" not in cache: cache["etudlist"] = {} query_url = f"{api_url(dept)}/formsemestre/{semid}/etudiants/long" result = get_json(query_url) cache["etudlist"][semid] = result save_cache(cache) return result def get_jury_from_formsem(dept: str, semid): if type(semid) == type(0): semid = str(semid) if "semjury" in cache and semid in cache["semjury"]: return cache["semjury"][semid] if "semjury" not in cache: cache["semjury"] = {} # query_url = f"{server}{dept}/Scolarite/Notes/formsemestre_recapcomplet?formsemestre_id={semid}&mode_jury=1&tabformat=json" query_url = f"{api_url(dept)}/formsemestre/{semid}/decisions_jury" result = get_json(query_url) cache["semjury"][semid] = result save_cache(cache) return result def get_override(sem, xkey, default=None): overrides = conf_value("override") for j in ["titre_num", "titre", "session_id"]: if ( j in sem and j in overrides and sem[j] in overrides[j] and xkey in overrides[j][sem[j]] ): return overrides[j][sem[j]][xkey] return default def nick_replace( department, diplome, rank, modalite, parcours, nick, year=Options.base_year ): if type(rank) != int: rank = 0 if len(department) > 0: nick = nick.replace( "{department}", conf_value("department_separator") + department ) else: nick = nick.replace("{department}", "") if len(department) > 0 and len(depts) > 1: nick = nick.replace( "{multidepartment}", conf_value("department_separator") + department ) else: nick = nick.replace("{multidepartment}", "") if len(diplome) > 0: nick = nick.replace("{diplome}", conf_value("diplome_separator") + diplome) else: nick = nick.replace("{diplome}", "") if len(diplome) > 0 and diplome != "BUT": nick = nick.replace("{diplomenobut}", conf_value("diplome_separator") + diplome) else: nick = nick.replace("{diplomenobut}", "") if rank > 0: nick = nick.replace("{rank}", conf_value("rank_separator") + str(rank)) else: nick = nick.replace("{rank}", "") nick = nick.replace( "{year}", conf_value("year_separator") + str(Options.base_year + rank - 1) ) if diplome != "BUT": nick = nick.replace( "{yearnobut}", conf_value("year_separator") + str(Options.base_year + rank - 1), ) else: nick = nick.replace("{yearnobut}", "") if len(modalite) > 0: nick = nick.replace("{modalite}", conf_value("modalite_separator") + modalite) else: nick = nick.replace("{modalite}", "") if len(modalite) > 0 and modalite != "FI": nick = nick.replace("{modaliteshort}", modalite[-1]) else: nick = nick.replace("{modaliteshort}", "") if len(parcours) > 0: nick = nick.replace("{parcours}", conf_value("parcours_separator") + parcours) else: nick = nick.replace("{parcours}", "") extname = "Ecand " if diplome == "BUT": extname = "EXT" nick = nick.replace("{ext}", extname) return nick def analyse_student(semobj, etud, univ_year=None): """Returns the final (department,diplome,rank,modalite,parcours,nickname,displayname) tuple from etudid in semid, taking into accounts overrides.""" session_id = semobj["session_id"].split("-") year = str(semobj["annee_scolaire"]) department = session_id[0] diplome = session_id[1] modalite = session_id[2] if univ_year == None: if semobj["semestre_id"] < 0: rank = 1 else: rank = (semobj["semestre_id"] + 1) // 2 else: rank = univ_year parcours = None groups = [] if "groups" in etud: for x in etud["groups"]: if x["partition_name"] == "Parcours": parcours = x["group_name"] groups.append(x["group_name"]) if parcours == None: parcours = "" parcours = get_override(semobj, "parcours", parcours) department = get_override(semobj, "department", department) rank = get_override(semobj, "rank", rank) diplome = get_override(semobj, "diplome", diplome) modalite = get_override(semobj, "modalite", modalite) formsem_department[str(semobj["id"])] = department if len(modalite) > 0 and modalite[0] == "G": goal = modalite.split(":")[1:] modalite = None for g in goal: gg = g.split("=") if gg[0] in groups: modalite = gg[1] nick = conf_value("nick") nick = nick_replace(department, diplome, rank, modalite, parcours, nick, year) displayname = conf_value("displayname") displayname = nick_replace( department, diplome, rank, modalite, parcours, displayname, year ) return department, diplome, rank, modalite, parcours, nick, displayname def get_nick(semobj, etud): department, diplome, rank, modalite, parcours, nick, displayname = analyse_student( semobj, etud ) return nick, displayname def get_dept_from_sem(semid): return formsem_department[str(semid)] oldsems = set() oldsemsdept = {} futuresems = set() futuresemsdept = {} bacs = set() cohort_nip = set() def analyse_depts(): for dept in depts: formsems = get_formsem_from_dept(dept) for semid in formsems: # Check if this is a part of the cohort # or a future/old semester sem = cache["sem"][str(semid)] if sem["semestre_id"] < 0: year = 1 else: year = (sem["semestre_id"] + 1) // 2 offset = sem["annee_scolaire"] - Options.base_year - year + 1 if offset < 0 and offset > -4: oldsems.add(str(semid)) oldsemsdept[semid] = dept if offset > 0 and offset < 4: futuresems.add(str(semid)) futuresemsdept[semid] = dept if offset != 0: continue if sem["formation"]["type_parcours"] != 700: continue if sem["modalite"] == "EXT": continue # This is a BUT semester, part of the cohort # 0,1 : preceding year ; 2-7 : cohort ; 8+ : future if sem["semestre_id"] < 0: bucket = 1 else: bucket = str(int(sem["semestre_id"] - 1)) # Ici, le semestre est donc un semestre intéressant # On prélève tous les étudiants, et on remplit leur cursus etuds = get_etuds_from_formsem(dept, semid) jurys = get_jury_from_formsem(dept, semid) key = sem["titre_num"] for etud in etuds: etudid = etud["id"] if etudid in student: studentsummary = student[etudid] else: studentsummary = {} studentsummary["cursus"] = {} # Cursus is semid studentsummary["etudid"] = {} # useful when merging students studentsummary["pseudodept"] = {} # pseudo-dept for interdept studentsummary["diplome"] = {} # diplome name studentsummary["rank"] = {} # rank studentsummary["modalite"] = {} # modalite studentsummary["parcours"] = {} # parcours studentsummary["nickname"] = {} # nick studentsummary["displayname"] = {} # display name studentsummary["dept"] = dept # useful when merging students studentsummary["bac"] = "" # usually department, diplome, rank, modalite, parcours, nick, displayname = ( analyse_student(sem, etud, year) ) if "bac" in etud["admission"]: studentsummary["bac"] = etud["admission"]["bac"] else: studentsummary["bac"] = "INCONNU" if "civilite" in etud: studentsummary["civilite"] = etud["civilite"] else: studentsummary["civilite"] = "?" bacs.add(studentsummary["bac"]) if bucket in studentsummary["cursus"]: semestreerreur = int(bucket) + 1 warning( f"// Élève {etudid} dans deux semestres à la fois : S{semestreerreur}, semestres {studentsummary['cursus'][bucket]} et {semid}" ) if "dept" in studentsummary and studentsummary["dept"] != dept: warning( f"// Élève ayant changé de département {dept},{studentsummary['dept']}" ) # department, diplome, rank, modalite, parcours, nick = analyse_student( studentsummary["cursus"][bucket] = semid studentsummary["etudid"][bucket] = etudid studentsummary["pseudodept"][bucket] = department studentsummary["diplome"][bucket] = diplome studentsummary["rank"][bucket] = rank studentsummary["modalite"][bucket] = modalite studentsummary["parcours"][bucket] = parcours studentsummary["nickname"][bucket] = nick studentsummary["displayname"][bucket] = displayname studentsummary["debug"] = etud["sort_key"] # TODO: REMOVE studentsummary["unid"] = etud["code_nip"] cohort_nip.add(etud["code_nip"]) student[etudid] = studentsummary analyse_depts() def allseeingodin(): """This function changes the student lists by peeking in the past and the future to know which students come from another cohort or go into a later cohort.""" displaynames = {} oldstudents = {} oldstudentslevel = {} futurestudents = {} futurestudentslevel = {} # We look for the latest "old semester" in which every (old) student went for semid in oldsems: sem = cache["sem"][semid] semlevel = sem["semestre_id"] # For a while, some people registered former semesters (in other places) with "EXT" modalite for a fake semester if sem["modalite"] == "EXT": # Ignore EXT modalite continue semlevel = abs(semlevel) dept = oldsemsdept[semid] etuds = get_etuds_from_formsem(dept, semid) for etud in etuds: nip = etud["code_nip"] if nip not in cohort_nip: continue if nip not in oldstudentslevel or semlevel > oldstudentslevel[nip]: oldstudentslevel[nip] = semlevel nick_t, disp_t = get_nick(sem, etud) oldstudents[nip] = [semid, nick_t] displaynames[nick_t] = disp_t for semid in futuresems: sem = cache["sem"][semid] if sem["formation"]["type_parcours"] != 700: # We are only interested in BUT continuations (for now) continue semlevel = sem["semestre_id"] semlevel = abs(semlevel) dept = futuresemsdept[semid] etuds = get_etuds_from_formsem(dept, semid) for etud in etuds: nip = etud["code_nip"] if nip not in cohort_nip: continue if nip not in futurestudentslevel or semlevel > futurestudentslevel[nip]: futurestudentslevel[nip] = semlevel futurestudents[nip], tmp = get_nick(sem, etud) unification = {} duplicates = {} for etudid in student.keys(): unid = student[etudid]["unid"] if unid in unification: if unid not in duplicates: duplicates[unid] = [unification[unid]] duplicates[unid].append(etudid) unification[unid] = etudid if unid in oldstudents: student[etudid]["old"] = oldstudents[unid][1] student[etudid]["oldsem"] = oldstudents[unid][0] if unid in futurestudents: student[etudid]["future"] = futurestudents[unid] for unid in duplicates: lastsem = -1 best = [] for suppidx in duplicates[unid][1:]: supp = student[suppidx] if str(lastsem) in supp["cursus"]: best.append(suppidx) for sem in range(5, lastsem, -1): if str(sem) in supp["cursus"]: lastsem = sem best = [suppidx] break if len(best) > 1: print(f"// Error: cannot chose last semester for NIP {unid}: ") print(repr(best)) for x in best: print(cache["sem"][str(x)]) sys.exit(6) bestid = best[0] base = student[bestid] for suppidx in duplicates[unid]: if suppidx == bestid: continue supp = student[suppidx] for skey in ( "cursus", "etudid", "pseudodept", "diplome", "rank", "modalite", "parcours", "nickname", "displayname", "old", "oldsem", ): if skey in supp: for bucket in supp[skey]: if bucket not in base[skey]: base[skey][bucket] = supp[skey][bucket] del student[suppidx] foundfirst = False # Ensure all cursus are continuous for etudid in student: etud = student[etudid] foundfirst = False foundlast = False fillblanks = None there = -1 for i in range(6): if str(i) in etud["cursus"]: if foundfirst and foundlast: fillblanks = [there, i] else: foundfirst = True here = i else: if not foundfirst: continue foundlast = True there = i if fillblanks is not None: for i in range(fillblanks[0] - 1, fillblanks[1]): bucket = str(i) if bucket not in etud["cursus"]: etud["etudid"][bucket] = etudid etud["cursus"][bucket] = -1 etud["pseudodept"][bucket] = "OUT" etud["diplome"][bucket] = "OUT" etud["rank"][bucket] = (i // 2) + 1 etud["modalite"][bucket] = "FI" etud["parcours"][bucket] = "" etud["nickname"][bucket] = "OUT" + str(etud["rank"][bucket]) etud["displayname"][bucket] = "Césure" displaynames[etud["nickname"][bucket]] = etud["displayname"][bucket] return displaynames displaynames = allseeingodin() strange_cases = [] next = {} nextnick = {} def prepare_display(displaynames): for etudid in student: for semlevel in range(5): if str(semlevel) in student[etudid]["nickname"]: a = student[etudid]["nickname"][str(semlevel)] b = student[etudid]["displayname"][str(semlevel)] if a in displaynames: if b != displaynames[a]: die("{a} will be displayed as {b} or {displaynames[a]} !", 6) else: displaynames[a] = b return displaynames displaynames = prepare_display(displaynames) for etudid in student.keys(): etud = student[etudid] cursus_array = [None] * 6 nickname_array = [None] * 6 etudid_array = [None] * 6 for i in range(6): if str(i) in etud["cursus"]: cursus_array[i] = etud["cursus"][str(i)] nickname_array[i] = etud["nickname"][str(i)] etudid_array[i] = etud["etudid"][str(i)] # On va réduire aux semestres pairs, on cherche donc la continuation la plus habituelle pour # les élèves qui s'arrêtent sur un semestre impair for i in range(0, 5, 2): currs = str(cursus_array[i]) nexts = str(cursus_array[i + 1]) currn = str(nickname_array[i]) nextn = str(nickname_array[i + 1]) if nexts is not None: if currs not in next: next[currs] = {} if nexts not in next[currs]: next[currs][nexts] = 1 else: next[currs][nexts] += 1 if nextn is not None: if currn not in nextnick: nextnick[currn] = {} if nextn not in nextnick[currn]: nextnick[currn][nextn] = 1 else: nextnick[currn][nextn] += 1 etud["cursus_array"] = cursus_array etud["nickname_array"] = nickname_array etud["etudid_array"] = etudid_array nextbest = {} nextnickbest = {} for key in next: imax = 0 best = None for key2 in next[key]: if next[key][key2] > imax: imax = next[key][key2] best = key2 nextbest[key] = best for key in nextnick: imax = 0 best = None for key2 in nextnick[key]: if nextnick[key][key2] > imax: imax = nextnick[key][key2] best = key2 nextnickbest[key] = best evennicknames = {} for etudid in student.keys(): etud = student[etudid] for i in range(1, 6, 2): if etud["nickname_array"][i] not in evennicknames: evennicknames[etud["nickname_array"][i]] = 1 else: evennicknames[etud["nickname_array"][i]] += 1 for etudid in student.keys(): etud = student[etudid] cursus_short = [None] * 5 nickname_short = [None] * 5 etudid_short = [None] * 5 semend = None semstart = None for year in range(1, 4): sem1 = 2 * year - 2 sem2 = 2 * year - 1 finalsem = etud["cursus_array"][sem2] nick = etud["nickname_array"][sem2] etid = etud["etudid_array"][sem2] if finalsem == None: finalsem = etud["cursus_array"][sem1] nick = etud["nickname_array"][sem1] etid = etud["etudid_array"][sem1] if finalsem != None: # Abandon au premier semestre de cette année # print(f"Pour {etudid}, année {year}, abandon au S1") if nick not in evennicknames: # print( f"Pour {etudid}, année {year}, changement {nick} en {nextnickbest[nick]}" ) nick = nextnickbest[nick] if finalsem != None: cursus_short[year] = finalsem nickname_short[year] = nick etudid_short[year] = etid if etud["cursus_array"][sem1] == None: # print(f"Pour {etudid}, année {year}, saute-mouton du S1") pass etud["short"] = cursus_short etud["nickshort"] = nickname_short etud["etudidshort"] = etudid_short for etudid in student.keys(): etud = student[etudid] lastyear = 4 lastsem = None while lastsem == None: lastyear -= 1 lastsem = etud["short"][lastyear] ddd = get_dept_from_sem(lastsem) if ddd not in depts: depts.append(ddd) badred = {} goodred = {} failure = {} diploma = {} reor2 = {} reor1 = {} unknown = {} entries = {} redirs = {} finals = { "FAIL": "✘", "RED": "↩", "QUIT": "↴", "+DUT": "➡", "DIPLOME": "✔", "?": "?", } for d in depts: badred[d] = 0 goodred[d] = 0 failure[d] = 0 diploma[d] = 0 reor2[d] = 0 reor1[d] = 0 unknown[d] = 0 entries[d] = 0 redirs[d] = 0 strangecases = [] for etudid in student.keys(): etud = student[etudid] lastyear = 4 lastsem = None while lastsem == None: lastyear -= 1 lastsem = etud["short"][lastyear] ddd = get_dept_from_sem(lastsem) jury = get_jury_from_formsem(None, lastsem) etudid_real = etud["etudidshort"][lastyear] if etudid_real != etudid: print(f"// Warning {etudid} {etudid_real}") resjury = None for x in jury: if x["etudid"] == etudid_real: resjury = x break if resjury == None: print(f"// No jury for {etudid} year {lastyear}") continue resultyear = None if resjury["etat"] == "D": resultyear = "DEM" if resjury["etat"] == "DEF": resultyear = "DEF" if ( "annee" in resjury and "code" in resjury["annee"] and resjury["annee"]["code"] is not None ): resultyear = resjury["annee"]["code"] finaloutput = None checkred = False if etudid in redirects: resultyear = redirects[etudid] redirs[ddd] += 1 strangecases.append( f"REDI{lastyear} {SCODOC_SERVER}/{ddd}/Scolarite/fiche_etud?etudid={etudid}" ) if resultyear == None: finaloutput = "?" unknown[ddd] += 1 strangecases.append( f"????{lastyear} {SCODOC_SERVER}/{ddd}/Scolarite/fiche_etud?etudid={etudid}" ) elif resultyear in ("RAT", "ATJ"): finaloutput = "?" unknown[ddd] += 1 strangecases.append( f"ATTE{lastyear} {SCODOC_SERVER}/{ddd}/Scolarite/fiche_etud?etudid={etudid}" ) elif resultyear in ("RED", "ABL", "ADSUP"): finaloutput = "RED" checkred = True elif lastyear == 3 and resultyear in ("ADM", "ADJ"): finaloutput = "DIPLOME" diploma[ddd] += 1 elif lastyear == 2 and resultyear in ("ADM", "ADJ"): finaloutput = "+DUT" reor2[ddd] += 1 elif resultyear in ("PAS1NCI", "PASD"): finaloutput = "QUIT" reor1[ddd] += 1 elif lastyear < 2 and resultyear in ("ADM", "ADJ"): finaloutput = "QUIT" reor1[ddd] += 1 elif resultyear in ("NAR", "DEM", "DEF", "ABAN"): finaloutput = "FAIL" failure[ddd] += 1 elif resjury["annee"]["annee_scolaire"] != Options.base_year + lastyear - 1: finaloutput = "RED" checkred = True if checkred: if "future" not in etud: # print(f"// Mauvais redoublement : {etudid}") badred[ddd] += 1 finaloutput = "FAIL" else: goodred[ddd] += 1 output = finaloutput + " " + etud["nickshort"][lastyear] etud["nickshort"][lastyear + 1] = output displaynames[output] = ( finals[finaloutput] + displaynames[etud["nickshort"][lastyear]] ) (firstsem, firstyear) = ( (etud["short"][1], 1) if etud["short"][1] != None else ( (etud["short"][2], 2) if etud["short"][2] != None else (etud["short"][3], 3) ) ) firstdept = cache["sem"][firstsem]["departement"]["acronym"] if "old" in etud: yearold = cache["sem"][etud["oldsem"]]["annee_scolaire"] etud["nickshort"][firstyear - 1] = etud["old"] # yy = yearold # delta = firstyear + Options.base_year - yy - 2 # for i in range(delta, firstyear - 1): # etud["nickshort"][i] = etud["nickshort"][firstyear - 1] + "*" * ( # firstyear - 1 - i # ) else: if ( str(firstyear * 2 - 2) in etud["cursus"] and etud["cursus"][str(firstyear * 2 - 2)] is not None ): startsem = str(firstyear * 2 - 2) else: startsem = str(firstyear * 2 - 1) department = etud["pseudodept"][startsem] diplome = etud["diplome"][startsem] rank = etud["rank"][startsem] modalite = etud["modalite"][startsem] parcours = etud["parcours"][startsem] nick = "EXT" + conf_value("nick") nick = nick_replace(department, diplome, rank, modalite, parcours, nick) displayname = conf_value("extnick") displayname = nick_replace( department, diplome, rank, modalite, parcours, displayname ) displaynames[nick] = displayname etud["nickshort"][firstyear - 1] = nick # to get a better ordering in sankeymatic, false nodes ere required # This is no more the case when building our own graphics # for i in range(0, firstyear - 1): # etud["nickshort"][i] = nick + "*" * (firstyear - 1 - i) entries[ddd] += 1 class Filter: # Filter on students to be considered # 1 consider only technological baccalaureates, statistics are always asked # 2 consider only women, because gender statistics are frequently asked # 4 consider only incoming students (primo-entrants) in first year of the cohort # 8 consider only people having a first year, not parallel entries TECHNO = 1 WOMAN = 2 PRIMO = 4 MAIN = 8 def bags_from_students(student, filter=0): bags = [] for etudid in student.keys(): # Filter # 0 No filter # 1 Keep only technological baccalaureate # 2 Keep only women # ... to be completed if filter & Filter.TECHNO: if student[etudid]["bac"][:2] != "ST": continue parc = student[etudid]["nickshort"] for i in range(len(parc) - 1): while len(bags) <= i: bags.append({}) nstart = parc[i] nend = parc[i + 1] if nstart != None and nend != None: if nstart not in bags[i]: bags[i][nstart] = {} if nend not in bags[i][nstart]: bags[i][nstart][nend] = 1 else: bags[i][nstart][nend] += 1 return bags def node_color(x): color = colors["NORMAL"] if x[0:4] == "FAIL": color = f"{colors['FAIL']} <<" elif x[0:4] == "+DUT": color = f"{colors['+DUT']} <<" elif x[0:4] == "QUIT": color = f"{colors['QUIT']} <<" elif x[0:3] == "RED": color = f"{colors['RED']} <<" elif x[0:4] == "DIPL": color = f"{colors['SUCCESS']} <<" elif x[0:3] == "EXT": color = f"{colors['NEW']} >>" elif x[0:3] == "BUT": color = f"{colors['NORMAL']}" elif x[0:3] == "DUT": color = f"{colors['OLD']} >>" if x[-1] == "*": color = f"{colors['TRANSPARENT']} >>" return color def textwidth(text, font="Arial", fontsize=14): try: import cairo except: return len(text) * fontsize surface = cairo.SVGSurface("undefined.svg", 1280, 200) cr = cairo.Context(surface) cr.select_font_face(font, cairo.FONT_SLANT_NORMAL, cairo.FONT_WEIGHT_BOLD) cr.set_font_size(fontsize) xbearing, ybearing, width, height, xadvance, yadvance = cr.text_extents(text) return width def crossweight(node_position, node_layer, edges): w = 0 for e in edges: for ee in edges: if node_layer[e[0]] != node_layer[ee[0]]: continue if node_layer[e[1]] != node_layer[ee[1]]: continue if (node_position[e[0]] - node_position[ee[0]]) * ( node_position[e[1]] - node_position[ee[1]] ) < 0: w += e[2] * ee[2] return w def genetic_optimize(node_position, node_layer, edges, loops=1): debug(f"Begin genetic optimization with {loops} loops") oldcandidates = [] l_indices = list(range(5)) lays = [] randomness_l = [] for index in range(5): lays.append([x for x in node_layer.keys() if node_layer[x] == index]) if len(lays[index]) > 1: for i in lays[index]: randomness_l.append(index) w = crossweight(node_position, node_layer, edges) for i in range(20): oldcandidates.append([node_position.copy(), w]) w = crossweight(node_position, node_layer, edges) for i in range(10): n = node_position.copy() l_idx = random.choice(randomness_l) q = lays[l_idx].copy() k = 0 while len(q) > 0: nn = random.choice(q) q.remove(nn) n[nn] = k k += 1 oldcandidates.append([n, w]) candidates = oldcandidates for i in range(loops): oldcandidates = candidates oldcandidates.sort(key=lambda x: x[1]) candidates = oldcandidates[:30] while len(candidates) < 60: # mutate some random candidate candidate = random.choice(candidates)[0] new_position = candidate.copy() # Copier la position pour la muter l_idx = random.choice(randomness_l) swapa = random.choice(lays[l_idx]) swapb = random.choice(lays[l_idx]) while swapa == swapb: swapb = random.choice(lays[l_idx]) tmp = new_position[swapa] new_position[swapa] = new_position[swapb] new_position[swapb] = tmp w = crossweight(new_position, node_layer, edges) candidates.append([new_position, w]) while len(candidates) < 90: # mutate some random candidate candidate = random.choice(candidates)[0] new_position = candidate.copy() # Copier la position pour la muter l_idx = random.choice(randomness_l) startidx = random.randrange(len(lays[l_idx]) - 1) stopidx = random.randrange(startidx + 1, len(lays[l_idx])) for n in lays[l_idx]: if new_position[n] >= startidx and new_position[n] < stopidx: new_position[n] += 1 elif new_position[n] == stopidx: new_position[n] = startidx w = crossweight(new_position, node_layer, edges) candidates.append([new_position, w]) while len(candidates) < 100: # mutate some random candidate candidate = random.choice(candidates)[0] candidate2 = random.choice(candidates)[0] new_position = candidate.copy() # Copier la position pour la muter l_idx = random.choice(randomness_l) for n in lays[l_idx]: new_position[n] = candidate2[n] w = crossweight(new_position, node_layer, edges) candidates.append([new_position, w]) candidates.sort(key=lambda x: x[1]) orders = [] best = candidates[0][0] for i in range(5): b = lays[i].copy() b.sort(key=lambda x: best[x]) orders.append(b) return orders def ordernodes(layers, orders, edges): node_position = {} node_layer = {} newls = [[], [], [], [], []] if orders != {}: for i in range(len(newls)): ls = newls[i] for node in orders[i]: if node in layers[i]: ls.append(node) for node in layers[i]: if node not in ls: ls.append(node) for layer, layernodes in enumerate(newls): for j, n in enumerate(layernodes): node_position[n] = j node_layer[n] = layer else: for layer, layernodes in enumerate(layers): for j, n in enumerate(layernodes): node_position[n] = j node_layer[n] = layer debug(crossweight(node_position, node_layer, edges)) return node_position, node_layer, newls def get_layer_structure(newls, node_structure, spacing, hmargin, height): layer_structure = [] density = [] for i in range(5): ls = {} ls["olayer"] = newls[i] ls["num"] = len(newls[i]) ls["inout"] = 0 for j in ls["olayer"]: lhi = 0 lho = 0 slhi = 0 slho = 0 k = node_structure[j] for prev_node in k["prev"]: lhi += prev_node[1] slhi += prev_node[2] for next_node in k["next"]: lho += next_node[1] slho += next_node[2] k["size"] = max(lhi, lho) k["ssize"] = max(slhi, slho) k["in"] = lhi k["out"] = lho if lhi != lho and lhi * lho != 0: warning(f"BUG1: {j} {k} {lhi} {lho}") ls["inout"] += k["size"] layer_structure.append(ls) if height == 0: minheight = 0 for i in range(5): ls = layer_structure[i] new_minheight = spacing * (ls["num"] - 1) + 2 * hmargin + 2 * ls["inout"] if new_minheight > minheight: minheight = new_minheight height = (1 + (minheight // 150)) * 150 for i in range(5): ls = layer_structure[i] ls["density"] = ls["inout"] / ( spacing + height - spacing * ls["num"] - 2 * hmargin ) density.append(ls["density"]) realdensity = max(density) for i in range(5): ls = layer_structure[i] supp_spacing = ( spacing + height - 2 * hmargin - spacing * ls["num"] - ls["inout"] / realdensity ) / (ls["num"] + 1) cs = hmargin - spacing for j in ls["olayer"]: ns = node_structure[j] ns["top"] = supp_spacing + spacing + cs h = ns["size"] / realdensity sh = ns["ssize"] / realdensity ns["middle"] = ns["top"] + sh cs = ns["bottom"] = ns["top"] + h return realdensity, height, layer_structure def nodestructure_from_bags(bags, sbags=None): node_structure = {} layers = [[], [], [], [], []] edges = [] union_sbag = {} if sbags is not None: for layer, layernodes in enumerate(sbags): for startnode in layernodes: for endnode in layernodes[startnode]: if startnode not in union_sbag: union_sbag[startnode] = {} union_sbag[startnode][endnode] = layernodes[startnode][endnode] for layer, layernodes in enumerate(bags): for startnode in layernodes: # if startnode[-1] == "*": # continue for endnode in layernodes[startnode]: # if endnode[-1] == "*": # continue weight = layernodes[startnode][endnode] if startnode in union_sbag and endnode in union_sbag[startnode]: sweight = union_sbag[startnode][endnode] elif sbags is None: sweight = -1 else: sweight = 0 if endnode not in node_structure: node_structure[endnode] = { "prev": [[startnode, weight, sweight]], "next": [], "layer": layer + 1, } layers[layer + 1].append(endnode) else: node_structure[endnode]["prev"].append([startnode, weight, sweight]) if startnode not in node_structure: node_structure[startnode] = { "prev": [], "next": [[endnode, weight, sweight]], "layer": layer, } layers[layer].append(startnode) else: node_structure[startnode]["next"].append([endnode, weight, sweight]) edges.append([startnode, endnode, weight]) return node_structure, layers, edges def compute_svg(height, padding, realdensity, node_structure): unit_ratio = 96 / 72 thickness = conf_value("thickness") fontsize_name = conf_value("fontsize_name") fontsize_count = conf_value("fontsize_count") width = conf_value("width") columns = [] l = 0 for i in range(5): l += width / 6 columns.append(l) d = drawsvg.Drawing(width, height, origin=(0, 0), id_prefix=orderkey) g1 = drawsvg.Group() # rectangles representing the student-sets g2 = drawsvg.Group() # curves representing the transitions g3 = drawsvg.Group() # texts (captions of each rectangle) g4 = drawsvg.Group() # white rectangles below texts (for readability) font_offset = max(fontsize_count, fontsize_name) for n in node_structure: ns = node_structure[n] col = node_color(n).split(" ")[0].split(".")[0] ns["color"] = col xpos = width / 6 * (ns["layer"] + 1) r = drawsvg.Rectangle( xpos - thickness, ns["top"], 2 * thickness, ns["bottom"] - ns["top"], fill=col, opacity=0.5, stroke_width=0.1, stroke="#808080", ) g1.append(r) r = drawsvg.Rectangle( xpos - thickness, ns["top"], 2 * thickness, ns["middle"] - ns["top"], fill=col, stroke_width=0.2, stroke="black", ) g1.append(r) nw = textwidth(displaynames[n], "Arial", fontsize_name) * unit_ratio cw = textwidth(str(ns["size"]), "Arial", fontsize_count) * unit_ratio gw = nw + cw + padding ggw = gw + 2 * padding nxpos = xpos - gw / 2 + cw + padding + nw / 2 ypos = (ns["top"] + ns["bottom"]) / 2 cxpos = cw / 2 - gw / 2 + xpos rxpos = xpos if ns["in"] == 0: nxpos -= gw / 2 + padding + thickness cxpos -= gw / 2 + padding + thickness rxpos -= gw / 2 + padding + thickness if ns["out"] == 0: nxpos += gw / 2 + padding + thickness cxpos += gw / 2 + padding + thickness rxpos += gw / 2 + padding + thickness t = drawsvg.Text( displaynames[n], str(fontsize_name) + "pt", nxpos, ypos + fontsize_name / 2, fill="black", text_anchor="middle", font_family="Arial", ) tt = drawsvg.Text( str(ns["size"]), str(fontsize_count) + "pt", cxpos, ypos + fontsize_count / 2, fill="black", text_anchor="middle", font_family="Arial", ) g3.append(t) g3.append(tt) g4.append( drawsvg.Rectangle( rxpos - gw / 2 - padding, ypos - font_offset / 2 - padding, ggw, font_offset + 2 * padding, stroke="black", stroke_width=0, fill="white", fill_opacity=".5", rx=padding, ry=padding, ) ) for n in node_structure: ns = node_structure[n] ns["prev"].sort(key=lambda x: node_structure[x[0]]["top"]) ns["next"].sort(key=lambda x: node_structure[x[0]]["top"]) start = ns["top"] for link in ns["prev"]: ysize = link[1] sysize = link[2] link.append(start) link.append(start + sysize / realdensity) start += ysize / realdensity link.append(start) for n in node_structure: ns = node_structure[n] start = ns["top"] for link in ns["next"]: ysize = link[1] sysize = link[2] link.append(start) link.append(start + sysize / realdensity) start += ysize / realdensity link.append(start) targets = node_structure[link[0]] target = None for t in targets["prev"]: if t[0] == n: target = t if target == None: print(f"BUG: {n},{ns},{t}") sys.exit(5) # At this point, link has values target_name, size, secondary_size, top, middle, bottom posxa = columns[ns["layer"]] + thickness posxb = columns[targets["layer"]] - thickness posxc = (3 * posxa + posxb) / 4 posxd = (posxa + 3 * posxb) / 4 grad = drawsvg.LinearGradient(posxa, 0, posxb, 0) grad.add_stop(0, ns["color"], opacity=0.5) grad.add_stop(1, targets["color"], opacity=0.5) posyat = link[3] posyam = link[4] posyab = link[5] posybt = target[3] posybm = target[4] posybb = target[5] p = drawsvg.Path(fill=grad, stroke="#000", stroke_width=0, opacity=0.8) p.M(posxa, posyab) p.C(posxc, posyab, posxd, posybb, posxb, posybb) p.L(posxb, posybt) p.C(posxd, posybt, posxc, posyat, posxa, posyat) p.Z() p.append_title( f"{displaynames[n]}=>{displaynames[link[0]]}: {link[2]}/{link[1]}" ) g2.append(p) p = drawsvg.Path(fill=grad, stroke="#000", stroke_width=0) p.M(posxa, posyam) p.C(posxc, posyam, posxd, posybm, posxb, posybm) p.L(posxb, posybt) p.C(posxd, posybt, posxc, posyat, posxa, posyat) p.Z() p.append_title( f"{displaynames[n]}=>{displaynames[link[0]]}: {link[2]}/{link[1]}" ) g2.append(p) d.append(g2) d.append(g1) d.append(g4) d.append(g3) return d def printsvg(): padding = 4 spacing = conf_value("spacing") height = conf_value("height") hmargin = conf_value("hmargin") bags = bags_from_students(student, conf_value("main_filter")) sbags = bags_from_students(student, conf_value("secondary_filter")) node_structure, layers, edges = nodestructure_from_bags(bags, sbags) filename = "best-" + orderkey if Options.restart: try: os.remove(filename) except OSError: pass if Options.optimize >= 0: lastorders = read_conf(filename) else: lastorders = {} node_position, node_layer, newls = ordernodes(layers, lastorders, edges) if Options.optimize != 0: orders = genetic_optimize( node_position, node_layer, edges, loops=abs(Options.optimize) ) else: orders = newls write_conf("best-" + orderkey, orders) node_position, node_layer, newls = ordernodes(layers, orders, edges) realdensity, height, layer_structure = get_layer_structure( newls, node_structure, spacing, hmargin, height ) d = compute_svg(height, padding, realdensity, node_structure) d.save_svg(orderkey + ".svg") printsvg() for x in strangecases: print(f"// {x}")