#!/usr/bin/env python3
"""Follow a student cohort through its semesters and build per-year flow counts.

Usage: ``script [--techno] DEPT [DEPT] ...``

The script reads its connection settings from ``.env`` (CSV rows ``key,value``
for server, username, password and baseyear), optional display settings from
``{orderkey}.json``, colors from ``theme.csv`` and manual result overrides from
``redirect.csv``.  Every API answer is stored in ``cache.json``.

Everything runs at import time: the module-level statements below fetch the
data, merge duplicate student records and fill ``bags`` (the weighted
transitions between year labels) for the drawing code further down the file.
"""

import csv
import json
import os
import re
import sys
from datetime import datetime

import drawsvg
import requests
from requests.auth import HTTPBasicAuth

debug = True  # Not used
techno = False  # global flag
blocking = True  # Die if csv is incorrect


def blockordie():
    """Exit with status 2 when ``blocking`` is set; otherwise do nothing."""
    if blocking:
        sys.exit(2)


# Read args from the command line
# then read config from {orderkey}.json

depts = []
orderkey = ""


def cli_check():
    """Parse ``sys.argv`` into the globals ``techno``, ``orderkey`` and ``depts``.

    Prints the usage line and exits with status 0 when an unknown ``--`` option
    is given or when no department is listed.
    """
    global techno
    global orderkey
    global depts

    usage = f"{sys.argv[0]} [--techno] DEPT [DEPT] ..."
    index = 1
    # Guard on the length first: with no argument at all sys.argv[1] does not exist.
    if len(sys.argv) > index and sys.argv[index][:2] == "--":
        if sys.argv[index] == "--techno":
            techno = True
        else:
            print(usage)
            sys.exit(0)
        index += 1
    orderkey = "_".join(sys.argv[index:])
    depts = sys.argv[index:]
    if len(depts) == 0:
        print(usage)
        sys.exit(0)


cli_check()


def read_conf(key):
    """Return the JSON content of ``{key}.json``, or ``{}`` if the file is absent."""
    if os.path.exists(f"{key}.json"):
        with open(f"{key}.json", "r") as f:
            return json.load(f)
    return {}


def write_conf(key, obj):
    """Serialize *obj* as JSON into ``{key}.json`` (the file is overwritten)."""
    with open(f"{key}.json", "w") as f:
        return json.dump(obj, f)


conf = read_conf(orderkey)


# Manage default values
def conf_value(xkey: str):
    """Return the setting *xkey* from ``conf``, falling back to built-in defaults.

    Lookup order: the loaded configuration, the ``defaults`` table, then a
    single space for any other key ending in ``separator``, the two nickname
    templates, the empty ``orders`` layout, and finally ``{}``.
    """
    defaults = {
        "spacing": 14,
        "thickness": 6,
        "fontsize_name": 10,
        "fontsize_count": 14,
        "width": 1300,
        "height": 900,
        "hmargin": 20,
        "parcours_separator": "/",
        "year_separator": "",
        "rank_separator": "",
        "diplome_separator": "",
    }
    if xkey in conf:
        return conf[xkey]
    if xkey in defaults:
        return defaults[xkey]
    if xkey[-9:] == "separator":
        return " "
    if xkey == "nick":
        return "{diplome}{rank}{multidepartment}{modalite}{parcours}"
    if xkey == "extnick":
        return "{rank}{multidepartment}{diplomenobut}{modaliteshort}"
    if xkey == "orders":
        return [[], [], [], [], []]
    return {}


# READ username, password, server, baseyear from secrets file
# This file should be kept really safe
# Currently, only baseyear = 2021 has been tested

server, username, password, baseyear = "", "", "", 0


def read_secrets(filename):
    """Load server, username, password and baseyear from the CSV file *filename*.

    Each row is ``key,value``; the values are stored in the module globals of
    the same name (``baseyear`` is converted to ``int``).  If the file exists
    but misses one of the keys, the missing ones are listed and the script
    exits with status 1.  A missing file is silently ignored.
    """
    keys = ["server", "username", "password", "baseyear"]
    integers = ["baseyear"]
    if not os.path.exists(filename):
        return
    found = 0
    with open(filename, newline="") as csvfile:
        for row in csv.reader(csvfile, delimiter=",", quotechar='"'):
            if len(row) < 2:
                # Blank or truncated line: nothing usable on it.
                continue
            for p, k in enumerate(keys):
                if row[0] == k:
                    globals()[k] = int(row[1]) if k in integers else row[1]
                    found |= 0x1 << p
    if found != (0x1 << len(keys)) - 1:
        print(f'Des paramètres manquent dans "{filename}" ({found}).')
        for p, k in enumerate(keys):
            if found & (0x1 << p) == 0:
                # f-string formatting also works for the integer default of baseyear.
                g = f'"{globals()[k]}"' if k in globals() else None
                print(f"{k} = {g}")
        sys.exit(1)


read_secrets(".env")

student = {}

CACHE_FILE = "cache.json"


def load_cache(cache_file):
    """Return the JSON content of *cache_file*, or ``{}`` if it does not exist."""
    if os.path.exists(cache_file):
        with open(cache_file, "r") as f:
            return json.load(f)
    return {}


def save_cache(cache, file=None):
    """Write *cache* as JSON to *file* (``CACHE_FILE`` when *file* is None)."""
    if file is None:
        file = CACHE_FILE
    with open(file, "w") as f:
        json.dump(cache, f)


cache = load_cache(CACHE_FILE)


# Read color theme
# There are default color values, so may be it should just join the conf.json file
def read_theme():
    """Override entries of ``colors`` with the ``name,color`` rows of ``theme.csv``.

    Empty rows and rows whose first cell starts with ``#`` are skipped.  A row
    with an empty first cell or without a second cell is reported and handed
    to ``blockordie()``.
    """
    if not os.path.exists("theme.csv"):
        return
    with open("theme.csv", newline="") as csvfile:
        for row in csv.reader(csvfile, delimiter=",", quotechar='"'):
            if len(row) == 0:
                continue
            if len(row[0]) == 0:
                print("Wrong line in theme : " + str(row))
                blockordie()
            elif row[0][0] == "#":
                continue
            elif len(row) < 2:
                print("Wrong line in theme : " + str(row))
                blockordie()
            else:
                colors[row[0]] = row[1]


colors = {
    "+DUT": "#0040C0",
    "QUIT": "#00FF00",
    "SUCCESS": "#0000FF",
    "NORMAL": "#C0C0C0",
    "FAIL": "#FF4040",
    "OLD": "#FF8000",
    "NEW": "#FFFF00",
    "TRANSPARENT": "#FFFFFF.0",
    "RED": "#000000",
}
read_theme()


# Read redirects
# Only one file, since various combinations including same departments should
# use the same redirections ("no jury yet but almost sure it will be ...")
def read_redirects():
    """Fill ``redirects`` (student id -> forced result code) from ``redirect.csv``.

    Empty rows and ``#`` comment rows are skipped.  A row with an empty or
    non-numeric first cell, or without a second cell, is reported and handed
    to ``blockordie()``.
    """
    if not os.path.exists("redirect.csv"):
        return
    with open("redirect.csv", newline="") as csvfile:
        for row in csv.reader(csvfile, delimiter=",", quotechar='"'):
            if len(row) == 0:
                continue
            if len(row[0]) == 0:
                print("Wrong line in redirect : " + str(row))
                blockordie()
            elif row[0][0] == "#":
                continue
            else:
                try:
                    etudid = int(row[0])
                    code = row[1]
                except (ValueError, IndexError):
                    print("Wrong line in redirect : " + str(row))
                    blockordie()
                else:
                    redirects[etudid] = code


redirects = {}
read_redirects()

# Module-wide API token, requested on first use
token = None


def get_json(url: str, params=None):
    """GET *url* with a bearer token and return the decoded JSON body.

    The token is requested once from ``{server}/api/tokens`` with HTTP basic
    authentication and kept in the global ``token``.  Any response other than
    200 (for the token or for *url*) prints an error and exits with status 1.
    """
    global token
    print(f"Requesting {url}")
    if token is None:
        url_token = f"{server}/api/tokens"
        response = requests.post(url_token, auth=HTTPBasicAuth(username, password))
        if response.status_code == 200:
            token = response.json().get("token")
        else:
            print(
                f"Erreur de récupération de token: {response.status_code} - {response.text}"
            )
            sys.exit(1)
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        return response.json()
    print(f"Erreur avec {url}: {response.status_code} - {response.text}")
    sys.exit(1)


formsem_dept = {}
formsem_department = {}


def get_formsem_from_dept(dept):
    """Return the list of semester ids (as strings) of department *dept*.

    On a cache miss the semesters are fetched, each semester object is stored
    under ``cache["sem"][semid]``, and the cache file is rewritten.
    """
    if "formsems" in cache and dept in cache["formsems"]:
        return cache["formsems"][dept]
    if "formsems" not in cache:
        cache["formsems"] = {}
    if "sem" not in cache:
        cache["sem"] = {}
    query_url = f"{server}{dept}/api/formsemestres/query"
    formsemestres = get_json(query_url)
    result = []
    for sem in formsemestres:
        semid = str(sem["formsemestre_id"])
        formsem_dept[semid] = dept
        cache["sem"][semid] = sem
        result.append(semid)
    cache["formsems"][dept] = result
    save_cache(cache)
    return result


def get_formations_from_dept(dept):
    """Return the ids of the formations of *dept* whose ``type_parcours`` is 700.

    The result is cached under ``cache["formations"][dept]``.
    """
    if "formations" in cache and dept in cache["formations"]:
        return cache["formations"][dept]
    if "formations" not in cache:
        cache["formations"] = {}
    query_url = f"{server}{dept}/api/formations"
    formations = get_json(query_url)
    result = [f["formation_id"] for f in formations if f["type_parcours"] == 700]
    cache["formations"][dept] = result
    save_cache(cache)
    return result


def get_etuds_from_formsem(dept, semid):
    """Return the student list of semester *semid*, cached under ``cache["etudlist"]``."""
    if isinstance(semid, int):
        semid = str(semid)
    if "etudlist" in cache and semid in cache["etudlist"]:
        return cache["etudlist"][semid]
    if "etudlist" not in cache:
        cache["etudlist"] = {}
    query_url = f"{server}{dept}/api/formsemestre/{semid}/etudiants/long"
    result = get_json(query_url)
    cache["etudlist"][semid] = result
    save_cache(cache)
    return result


def get_jury_from_formsem(dept, semid):
    """Return the jury decisions of semester *semid*, cached under ``cache["semjury"]``.

    *dept* is only used to build the URL on a cache miss.
    """
    if isinstance(semid, int):
        semid = str(semid)
    if "semjury" in cache and semid in cache["semjury"]:
        return cache["semjury"][semid]
    if "semjury" not in cache:
        cache["semjury"] = {}
    # query_url = f"{server}{dept}/Scolarite/Notes/formsemestre_recapcomplet?formsemestre_id={semid}&mode_jury=1&tabformat=json"
    query_url = f"{server}{dept}/api/formsemestre/{semid}/decisions_jury"
    result = get_json(query_url)
    cache["semjury"][semid] = result
    save_cache(cache)
    return result


def get_override(sem, xkey, default=None):
    """Return the configured override of field *xkey* for semester *sem*.

    The ``override`` setting is searched by the semester's ``titre_num``, then
    ``titre``, then ``session_id``; *default* is returned when none matches.
    """
    overrides = conf_value("override")
    for j in ["titre_num", "titre", "session_id"]:
        if (
            j in sem
            and j in overrides
            and sem[j] in overrides[j]
            and xkey in overrides[j][sem[j]]
        ):
            return overrides[j][sem[j]][xkey]
    return default


def _fill_placeholder(template, name, value, separator_key=None):
    """Replace ``{name}`` in *template* by separator + value, or drop it if value is empty.

    The separator is ``conf_value(separator_key)``; no separator is inserted
    when *separator_key* is None.
    """
    placeholder = "{" + name + "}"
    text = str(value)
    if len(text) == 0:
        return template.replace(placeholder, "")
    separator = conf_value(separator_key) if separator_key else ""
    return template.replace(placeholder, separator + text)


def analyse_student(semobj, etud, year=None):
    """Returns the final (department,diplome,rank,modalite,parcours,nickname)
    tuple from etudid in semid, taking into accounts overrides.

    The first three fields come from the ``-``-separated ``session_id`` of
    *semobj*, the rank from *year* (or from ``semestre_id`` when *year* is
    None) and the parcours from the student's group in the "Parcours"
    partition.  A modalite starting with "G" is read as ``G:group=value:...``
    and resolved against the student's groups.  As a side effect the
    department is recorded in ``formsem_department`` for this semester.
    """
    session_id = semobj["session_id"].split("-")
    department = session_id[0]
    diplome = session_id[1]
    modalite = session_id[2]
    if year is None:
        if semobj["semestre_id"] < 0:
            rank = 1
        else:
            rank = (semobj["semestre_id"] + 1) // 2
    else:
        rank = year
    parcours = None
    groups = []
    if "groups" in etud:
        for x in etud["groups"]:
            if x["partition_name"] == "Parcours":
                parcours = x["group_name"]
            # NOTE(review): every group name is collected (not only "Parcours"
            # ones) so that "G:" modalite rules can match them; confirm against
            # the original indentation.
            groups.append(x["group_name"])
    if parcours is None:
        parcours = ""
    parcours = get_override(semobj, "parcours", parcours)
    department = get_override(semobj, "department", department)
    rank = get_override(semobj, "rank", rank)
    diplome = get_override(semobj, "diplome", diplome)
    modalite = get_override(semobj, "modalite", modalite)
    if len(modalite) > 0 and modalite[0] == "G":
        goal = modalite.split(":")[1:]
        # Empty (not None) when no rule matches, so the length test below is safe.
        modalite = ""
        for g in goal:
            gg = g.split("=")
            if gg[0] in groups:
                modalite = gg[1]

    label = conf_value("nick")
    label = _fill_placeholder(label, "department", department, "department_separator")
    label = _fill_placeholder(
        label,
        "multidepartment",
        department if len(depts) > 1 else "",
        "department_separator",
    )
    label = _fill_placeholder(label, "diplome", diplome, "diplome_separator")
    label = _fill_placeholder(label, "rank", rank, "rank_separator")
    label = _fill_placeholder(label, "modalite", modalite, "modalite_separator")
    label = _fill_placeholder(label, "parcours", parcours, "parcours_separator")

    formsem_department[str(semobj["id"])] = department
    # NOTE(review): this looks like a leftover debugging trap - it ends the
    # whole run with status 0 on one specific nickname.  Kept as-is; confirm
    # whether it can be removed.
    if label == " BUT 1 GEA EXT":
        print(department, diplome, rank, modalite, parcours, label)
        sys.exit(0)
    return department, diplome, rank, modalite, parcours, label


def nick(semobj, etud):
    """Return only the nickname computed by ``analyse_student(semobj, etud)``."""
    return analyse_student(semobj, etud)[5]


def get_dept_from_sem(semid):
    """Return the department recorded by ``analyse_student`` for semester *semid*."""
    return formsem_department[str(semid)]


oldsems = set()
oldsemsdept = {}
futuresems = set()
futuresemsdept = {}
bacs = set()
cohort_nip = set()


def analyse_depts():
    """Collect the cohort students of every department in ``depts`` into ``student``.

    Each semester is classified by ``offset = annee_scolaire - baseyear - year + 1``:
    negative offsets (down to -3) go to ``oldsems``, positive ones (up to 3) to
    ``futuresems``, and offset 0 with ``type_parcours`` 700 and a modalite other
    than "EXT" is a cohort semester.  For those, every student gets one entry
    per bucket (``str(semestre_id - 1)``) in the per-field dictionaries of its
    summary.
    """
    for dept in depts:
        formsems = get_formsem_from_dept(dept)
        for semid in formsems:
            # Check if this is a part of the cohort
            # or a future/old semester
            sem = cache["sem"][str(semid)]
            if sem["semestre_id"] < 0:
                year = 1
            else:
                year = (sem["semestre_id"] + 1) // 2
            offset = sem["annee_scolaire"] - baseyear - year + 1
            if -4 < offset < 0:
                oldsems.add(str(semid))
                oldsemsdept[str(semid)] = dept
            if 0 < offset < 4:
                futuresems.add(str(semid))
                futuresemsdept[str(semid)] = dept
            if offset != 0:
                continue
            if sem["formation"]["type_parcours"] != 700:
                continue
            if sem["modalite"] == "EXT":
                continue
            # This is a BUT semester, part of the cohort
            # 0,1 : preceding year ; 2-7 : cohort ; 8+ : future
            if sem["semestre_id"] < 0:
                # Buckets are string keys everywhere else (readers use str(i)).
                bucket = "1"
            else:
                bucket = str(int(sem["semestre_id"] - 1))
            # This is a semester of interest: take all its students and fill
            # in their cursus.
            etuds = get_etuds_from_formsem(dept, semid)
            # The result is not used here; the call stores the jury in the
            # cache, where the year-end classification below reads it back.
            get_jury_from_formsem(dept, semid)
            for etud in etuds:
                etudid = etud["id"]
                if etudid in student:
                    studentsummary = student[etudid]
                else:
                    studentsummary = {
                        "cursus": {},  # Cursus is semid
                        "etudid": {},  # useful when merging students
                        "pseudodept": {},  # pseudo-dept for interdept
                        "diplome": {},  # diplome name
                        "rank": {},  # rank
                        "modalite": {},  # modalite
                        "parcours": {},  # parcours
                        "nickname": {},  # nick
                        "dept": dept,  # useful when merging students
                        "bac": "",  # usually
                    }
                department, diplome, rank, modalite, parcours, nickname = (
                    analyse_student(sem, etud, year)
                )
                if "bac" in etud["admission"]:
                    studentsummary["bac"] = etud["admission"]["bac"]
                else:
                    studentsummary["bac"] = "INCONNU"
                bacs.add(studentsummary["bac"])
                # We skip non-techno students if we are in techno mode
                # If we want a mixed reporting, maybe we should change this
                if techno and studentsummary["bac"][:2] != "ST":  # TODO: change this
                    continue
                if bucket in studentsummary["cursus"]:
                    semestreerreur = int(bucket) + 1
                    print(
                        f"// Élève {etudid} dans deux semestres à la fois : S{semestreerreur}, semestres {studentsummary['cursus'][bucket]} et {semid}"
                    )
                if "dept" in studentsummary and studentsummary["dept"] != dept:
                    print(
                        f"// Élève ayant changé de département {dept},{studentsummary['dept']}"
                    )
                studentsummary["cursus"][bucket] = semid
                studentsummary["etudid"][bucket] = etudid
                studentsummary["pseudodept"][bucket] = department
                studentsummary["diplome"][bucket] = diplome
                studentsummary["rank"][bucket] = rank
                studentsummary["modalite"][bucket] = modalite
                studentsummary["parcours"][bucket] = parcours
                studentsummary["nickname"][bucket] = nickname
                studentsummary["debug"] = etud["sort_key"]  # TODO: REMOVE
                studentsummary["unid"] = etud["code_nip"]
                cohort_nip.add(etud["code_nip"])
                student[etudid] = studentsummary


analyse_depts()


def allseeingodin():
    """This function changes the student lists by peeking in the past and
    the future to know which students come from another cohort or go into a
    later cohort.

    Students are matched by ``code_nip``.  The highest-level old semester of
    each cohort student is stored as ``old`` / ``oldsem``, the highest-level
    future BUT semester as ``future``.  Records sharing the same ``unid`` are
    then merged into one and the others are deleted from ``student``.
    """
    oldstudents = {}
    oldstudentslevel = {}
    futurestudents = {}
    futurestudentslevel = {}
    # We look for the latest "old semester" in which every (old) student went
    for semid in oldsems:
        sem = cache["sem"][semid]
        semlevel = abs(sem["semestre_id"])
        etuds = get_etuds_from_formsem(oldsemsdept[semid], semid)
        for etud in etuds:
            nip = etud["code_nip"]
            if nip not in cohort_nip:
                continue
            if nip not in oldstudentslevel or semlevel > oldstudentslevel[nip]:
                oldstudentslevel[nip] = semlevel
                oldstudents[nip] = [semid, nick(sem, etud)]
    for semid in futuresems:
        sem = cache["sem"][semid]
        if sem["formation"]["type_parcours"] != 700:
            # We are only interested in BUT continuations (for now)
            continue
        semlevel = abs(sem["semestre_id"])
        etuds = get_etuds_from_formsem(futuresemsdept[semid], semid)
        for etud in etuds:
            nip = etud["code_nip"]
            if nip not in cohort_nip:
                continue
            if nip not in futurestudentslevel or semlevel > futurestudentslevel[nip]:
                futurestudentslevel[nip] = semlevel
                futurestudents[nip] = nick(sem, etud)

    unification = {}
    duplicates = {}
    for etudid in student.keys():
        unid = student[etudid]["unid"]
        if unid in unification:
            if unid not in duplicates:
                duplicates[unid] = [unification[unid]]
            duplicates[unid].append(etudid)
        unification[unid] = etudid
        if unid in oldstudents:
            student[etudid]["old"] = oldstudents[unid][1]
            student[etudid]["oldsem"] = oldstudents[unid][0]
        if unid in futurestudents:
            student[etudid]["future"] = futurestudents[unid]

    per_bucket_keys = (
        "cursus",
        "etudid",
        "pseudodept",
        "diplome",
        "rank",
        "modalite",
        "parcours",
        "nickname",
    )
    scalar_keys = ("old", "oldsem")
    for unid in duplicates:
        lastsem = -1
        best = []
        # NOTE(review): the first duplicate is never a candidate for "best"
        # because of the [1:] slice - confirm this is intended.
        for suppidx in duplicates[unid][1:]:
            supp = student[suppidx]
            if str(lastsem) in supp["cursus"]:
                best.append(suppidx)
            for sem in range(5, lastsem, -1):
                if str(sem) in supp["cursus"]:
                    lastsem = sem
                    best = [suppidx]
                    break
        if len(best) > 1:
            print(f"// Error: cannot chose last semester for NIP {unid}: ")
            print(repr(best))
            for x in best:
                # `best` holds student ids, so show each candidate's semesters.
                print(student[x]["cursus"])
            sys.exit(6)
        bestid = best[0]
        base = student[bestid]
        for suppidx in duplicates[unid]:
            if suppidx == bestid:
                continue
            supp = student[suppidx]
            for skey in per_bucket_keys:
                if skey in supp:
                    for bucket in supp[skey]:
                        if bucket not in base[skey]:
                            base[skey][bucket] = supp[skey][bucket]
            # "old" / "oldsem" hold a single value, not a per-bucket dict.
            for skey in scalar_keys:
                if skey in supp and skey not in base:
                    base[skey] = supp[skey]
            del student[suppidx]


allseeingodin()


def _last_year(short):
    """Return ``(year, semid)`` for the latest year (3, 2, then 1) set in *short*."""
    for year in (3, 2, 1):
        if short[year] is not None:
            return year, short[year]
    raise ValueError("student without any cohort semester")


strange_cases = []

# Expand every cursus into 6 semester slots and count, for each first semester
# of a year, which second semester (and which nickname) follows it.
next = {}
nextnick = {}
for etudid in student.keys():
    etud = student[etudid]
    cursus_array = [None] * 6
    nickname_array = [None] * 6
    etudid_array = [None] * 6
    for i in range(6):
        if str(i) in etud["cursus"]:
            cursus_array[i] = etud["cursus"][str(i)]
            nickname_array[i] = etud["nickname"][str(i)]
            etudid_array[i] = etud["etudid"][str(i)]
    # Years are reduced to their even (second) semester below, so look for the
    # most usual continuation for students who stop on an odd semester.
    for i in range(0, 5, 2):
        currs = str(cursus_array[i])
        currn = str(nickname_array[i])
        # Test the raw value: str(None) is the string "None", never None.
        if cursus_array[i + 1] is not None:
            nexts = str(cursus_array[i + 1])
            counts = next.setdefault(currs, {})
            counts[nexts] = counts.get(nexts, 0) + 1
        if nickname_array[i + 1] is not None:
            nextn = str(nickname_array[i + 1])
            counts = nextnick.setdefault(currn, {})
            counts[nextn] = counts.get(nextn, 0) + 1
    etud["cursus_array"] = cursus_array
    etud["nickname_array"] = nickname_array
    etud["etudid_array"] = etudid_array

# Most frequent continuation per key (first one wins on ties).
nextbest = {key: max(next[key], key=next[key].get) for key in next}
nextnickbest = {key: max(nextnick[key], key=nextnick[key].get) for key in nextnick}

evennicknames = {}
for etudid in student.keys():
    etud = student[etudid]
    for i in range(1, 6, 2):
        name = etud["nickname_array"][i]
        evennicknames[name] = evennicknames.get(name, 0) + 1

# One slot per year (index 1..3): the second semester if present, else the first.
for etudid in student.keys():
    etud = student[etudid]
    cursus_short = [None] * 5
    nickname_short = [None] * 5
    etudid_short = [None] * 5
    for year in range(1, 4):
        sem1 = 2 * year - 2
        sem2 = 2 * year - 1
        finalsem = etud["cursus_array"][sem2]
        # Not named `nick`: that would overwrite the nick() function above.
        nickname = etud["nickname_array"][sem2]
        etid = etud["etudid_array"][sem2]
        if finalsem is None:
            finalsem = etud["cursus_array"][sem1]
            nickname = etud["nickname_array"][sem1]
            etid = etud["etudid_array"][sem1]
            if finalsem is not None and nickname not in evennicknames:
                # Dropped out during the first semester of this year: use the
                # usual second-semester nickname when one is known.
                nickname = nextnickbest.get(nickname, nickname)
        if finalsem is not None:
            cursus_short[year] = finalsem
            nickname_short[year] = nickname
            etudid_short[year] = etid
    etud["short"] = cursus_short
    etud["nickshort"] = nickname_short
    etud["etudidshort"] = etudid_short

# Overrides may introduce pseudo-departments: make sure each has its counters.
for etudid in student.keys():
    etud = student[etudid]
    lastyear, lastsem = _last_year(etud["short"])
    ddd = get_dept_from_sem(lastsem)
    if ddd not in depts:
        depts.append(ddd)

badred = {}
goodred = {}
failure = {}
diploma = {}
reor2 = {}
reor1 = {}
unknown = {}
entries = {}
redirs = {}
for d in depts:
    badred[d] = 0
    goodred[d] = 0
    failure[d] = 0
    diploma[d] = 0
    reor2[d] = 0
    reor1[d] = 0
    unknown[d] = 0
    entries[d] = 0
    redirs[d] = 0

# Classify the outcome of every student's last year and label how they entered.
strangecases = []
for etudid in student.keys():
    etud = student[etudid]
    lastyear, lastsem = _last_year(etud["short"])
    ddd = get_dept_from_sem(lastsem)
    # No department is passed: the jury of a cohort semester was already put
    # in the cache by analyse_depts().
    jury = get_jury_from_formsem(None, lastsem)
    etudid_real = etud["etudidshort"][lastyear]
    if etudid_real != etudid:
        print(f"// Warning {etudid} {etudid_real}")
    resjury = None
    for x in jury:
        if x["etudid"] == etudid_real:
            resjury = x
            break
    if resjury is None:
        print(f"// No jury for {etudid} year {lastyear}")
        continue
    resultyear = None
    if resjury["etat"] == "D":
        resultyear = "DEM"
    if resjury["etat"] == "DEF":
        resultyear = "DEF"
    if (
        "annee" in resjury
        and "code" in resjury["annee"]
        and resjury["annee"]["code"] is not None
    ):
        resultyear = resjury["annee"]["code"]
    finaloutput = None
    checkred = False
    if etudid in redirects:
        resultyear = redirects[etudid]
        redirs[ddd] += 1
        strangecases.append(
            f"REDI{lastyear} {server}{ddd}/Scolarite/fiche_etud?etudid={etudid}"
        )
    if resultyear is None:
        finaloutput = "?" + etud["nickshort"][lastyear]
        unknown[ddd] += 1
        strangecases.append(
            f"????{lastyear} {server}{ddd}/Scolarite/fiche_etud?etudid={etudid}"
        )
    elif resultyear in ("RAT", "ATJ"):
        finaloutput = "?" + etud["nickshort"][lastyear]
        unknown[ddd] += 1
        strangecases.append(
            f"ATTE{lastyear} {server}{ddd}/Scolarite/fiche_etud?etudid={etudid}"
        )
    elif resultyear in ("RED", "ABL", "ADSUP"):
        finaloutput = "RED " + etud["nickshort"][lastyear]
        checkred = True
    elif lastyear == 3 and resultyear in ("ADM", "ADJ"):
        finaloutput = "DIPLOME " + etud["nickshort"][lastyear]
        diploma[ddd] += 1
    elif lastyear == 2 and resultyear in ("ADM", "ADJ"):
        finaloutput = "+DUT " + etud["nickshort"][lastyear]
        reor2[ddd] += 1
    elif resultyear in ("PAS1NCI", "PASD"):
        finaloutput = "QUIT " + etud["nickshort"][lastyear]
        reor1[ddd] += 1
    elif lastyear < 2 and resultyear in ("ADM", "ADJ"):
        finaloutput = "QUIT " + etud["nickshort"][lastyear]
        reor1[ddd] += 1
    elif resultyear in ("NAR", "DEM", "DEF", "ABAN"):
        finaloutput = "FAIL " + etud["nickshort"][lastyear]
        failure[ddd] += 1
    elif resjury["annee"]["annee_scolaire"] != baseyear + lastyear - 1:
        finaloutput = "RED " + etud["nickshort"][lastyear]
        checkred = True
    if checkred:
        if "future" not in etud:
            # Repeat year granted, but the student is in no later semester.
            badred[ddd] += 1
            finaloutput = "FAIL" + finaloutput[3:]
        else:
            goodred[ddd] += 1
    etud["nickshort"][lastyear + 1] = finaloutput

    if etud["short"][1] is not None:
        firstsem, firstyear = etud["short"][1], 1
    elif etud["short"][2] is not None:
        firstsem, firstyear = etud["short"][2], 2
    else:
        firstsem, firstyear = etud["short"][3], 3
    firstdept = cache["sem"][firstsem]["departement"]["acronym"]
    if "old" in etud:
        yearold = cache["sem"][etud["oldsem"]]["annee_scolaire"]
        etud["nickshort"][firstyear - 1] = etud["old"] + " " + str(yearold)
        yy = yearold
        delta = firstyear + baseyear - yy - 2
        # Clamp at 0: a negative start would index from the end of the list
        # and overwrite the outcome slots.
        for i in range(max(delta, 0), firstyear - 1):
            etud["nickshort"][i] = etud["nickshort"][firstyear - 1] + "*" * (
                firstyear - 1 - i
            )
    else:
        if (
            str(firstyear * 2 - 2) in etud["cursus"]
            and etud["cursus"][str(firstyear * 2 - 2)] is not None
        ):
            startsem = str(firstyear * 2 - 2)
        else:
            startsem = str(firstyear * 2 - 1)
        department = etud["pseudodept"][startsem]
        diplome = etud["diplome"][startsem]
        rank = etud["rank"][startsem]
        modalite = etud["modalite"][startsem]
        parcours = etud["parcours"][startsem]

        extnick = conf_value("extnick")
        extnick = _fill_placeholder(
            extnick, "department", department, "department_separator"
        )
        extnick = _fill_placeholder(
            extnick,
            "multidepartment",
            department if len(depts) > 1 else "",
            "department_separator",
        )
        extnick = _fill_placeholder(extnick, "diplome", diplome, "diplome_separator")
        extnick = _fill_placeholder(
            extnick,
            "diplomenobut",
            diplome if diplome != "BUT" else "",
            "diplome_separator",
        )
        extnick = _fill_placeholder(extnick, "rank", rank, "rank_separator")
        extnick = _fill_placeholder(
            extnick, "modalite", modalite, "modalite_separator"
        )
        # Last character of the modalite, without separator, except for "FI".
        extnick = _fill_placeholder(
            extnick,
            "modaliteshort",
            modalite[-1] if len(modalite) > 0 and modalite != "FI" else "",
        )
        extnick = _fill_placeholder(
            extnick, "parcours", parcours, "parcours_separator"
        )
        if diplome != "BUT":
            extnick = "Ecand " + extnick
        else:
            extnick = "EXT" + extnick
        etud["nickshort"][firstyear - 1] = extnick
        for i in range(0, firstyear - 1):
            etud["nickshort"][i] = extnick + "*" * (firstyear - 1 - i)
    # NOTE(review): counted for every classified student (loop level); the
    # collapsed source does not show whether this belonged to the else branch.
    entries[ddd] += 1

# bags[i][start][end] = number of students going from label `start` in
# column i to label `end` in column i + 1.
bags = [{}, {}, {}, {}]
for etudid in student.keys():
    parc = student[etudid]["nickshort"]
    for i in range(4):
        nstart = parc[i]
        nend = parc[i + 1]
        if nstart is not None and nend is not None:
            targets = bags[i].setdefault(nstart, {})
            targets[nend] = targets.get(nend, 0) + 1

# NOTE: a commented-out legacy pass lived here.  It flattened `bags` into
# `flatbags`, ordered them with the "orders" setting into `orderedflatbags`
# and `finallayers`, and exited with status 3 on failure.  It was dead code
# and has been removed; restore it from version control if needed.
def node_color(x):
    """Return the color spec of node label *x*: a ``colors`` value plus a flow hint.

    The color is chosen from the label prefix (first match wins) and followed
    by `` <<`` or `` >>`` for every prefix except "BUT".  Labels ending with
    ``*`` always get the TRANSPARENT color.  Unknown prefixes get the plain
    NORMAL color.
    """
    prefixes = (
        ("FAIL", "FAIL", " <<"),
        ("+DUT", "+DUT", " <<"),
        ("QUIT", "QUIT", " <<"),
        ("RED", "RED", " <<"),
        ("DIPL", "SUCCESS", " <<"),
        ("EXT", "NEW", " >>"),
        ("BUT", "NORMAL", ""),
        ("DUT", "OLD", " >>"),
    )
    color = colors["NORMAL"]
    for prefix, theme_key, suffix in prefixes:
        if x.startswith(prefix):
            color = f"{colors[theme_key]}{suffix}"
            break
    # endswith() is also safe on an empty label, unlike x[-1].
    if x.endswith("*"):
        color = f"{colors['TRANSPARENT']} >>"
    return color


# NOTE: a commented-out `printout()` lived here.  It wrote a SankeyMATIC input
# file (`sankeymatic_{orderkey}.txt`) with the flows, the node colors and the
# per-department statistics.  It was dead code and has been removed; restore
# it from version control if needed.


def textwidth(text, font="Arial", fontsize=14):
    """Return the rendered width of *text* in bold *font* at *fontsize*.

    Uses the ``cairo`` module when it can be imported; otherwise falls back
    to the rough estimate ``len(text) * fontsize``.
    """
    try:
        import cairo
    except ImportError:
        return len(text) * fontsize
    surface = cairo.SVGSurface("undefined.svg", 1280, 200)
    cr = cairo.Context(surface)
    cr.select_font_face(font, cairo.FONT_SLANT_NORMAL, cairo.FONT_WEIGHT_BOLD)
    cr.set_font_size(fontsize)
    xbearing, ybearing, width, height, xadvance, yadvance = cr.text_extents(text)
    return width


def crossweight(node_position, node_layer, edges):
    """Return the total weight of edge crossings for the given node positions.

    *edges* are ``[start, end, weight]`` triples.  Two edges joining the same
    pair of layers cross when their endpoints are ordered differently on each
    side; every ordered pair of crossing edges adds the product of the two
    weights, so each crossing is counted twice.
    """
    w = 0
    for e in edges:
        for ee in edges:
            if node_layer[e[0]] != node_layer[ee[0]]:
                continue
            if node_layer[e[1]] != node_layer[ee[1]]:
                continue
            if (node_position[e[0]] - node_position[ee[0]]) * (
                node_position[e[1]] - node_position[ee[1]]
            ) < 0:
                w += e[2] * ee[2]
    return w


import random


def genetic_optimize(node_position, node_layer, edges):
    """Search for a node ordering with a low ``crossweight`` and return it.

    The population starts from *node_position* plus random shuffles of single
    layers, then evolves for 300 generations: the 30 best candidates are kept
    and completed with swaps of two nodes, rotations of a position range and
    layer-wise crossovers between candidates.

    Args:
        node_position: node -> position inside its layer (not modified).
        node_layer: node -> layer index; only layers 0 to 4 are considered.
        edges: ``[start, end, weight]`` triples.

    Returns:
        A list of 5 lists, the nodes of each layer sorted by their best
        position.  The ordering and its weight are also printed.
    """
    lays = [[x for x in node_layer if node_layer[x] == index] for index in range(5)]
    # Each layer with at least two nodes appears once per node, so that
    # random.choice() picks bigger layers more often.
    randomness_l = []
    for index, members in enumerate(lays):
        if len(members) > 1:
            randomness_l.extend([index] * len(members))

    base_weight = crossweight(node_position, node_layer, edges)
    candidates = [[node_position.copy(), base_weight] for _ in range(20)]

    # Without any layer of two or more nodes there is nothing to reorder.
    if randomness_l:
        for _ in range(10):
            shuffled = node_position.copy()
            members = lays[random.choice(randomness_l)]
            for k, name in enumerate(random.sample(members, len(members))):
                shuffled[name] = k
            # Score the shuffled layout itself, not the starting layout.
            candidates.append([shuffled, crossweight(shuffled, node_layer, edges)])

        for _ in range(300):
            candidates.sort(key=lambda c: c[1])
            candidates = candidates[:30]
            while len(candidates) < 60:
                # Mutation 1: swap two distinct nodes of one layer.
                new_position = random.choice(candidates)[0].copy()
                members = lays[random.choice(randomness_l)]
                swapa, swapb = random.sample(members, 2)
                new_position[swapa], new_position[swapb] = (
                    new_position[swapb],
                    new_position[swapa],
                )
                w = crossweight(new_position, node_layer, edges)
                candidates.append([new_position, w])
            while len(candidates) < 90:
                # Mutation 2: move the node at `stopidx` up to `startidx` and
                # shift the nodes in between down by one.
                new_position = random.choice(candidates)[0].copy()
                members = lays[random.choice(randomness_l)]
                startidx = random.randrange(len(members) - 1)
                stopidx = random.randrange(startidx + 1, len(members))
                for n in members:
                    if startidx <= new_position[n] < stopidx:
                        new_position[n] += 1
                    elif new_position[n] == stopidx:
                        new_position[n] = startidx
                w = crossweight(new_position, node_layer, edges)
                candidates.append([new_position, w])
            while len(candidates) < 100:
                # Crossover: take one whole layer from another candidate.
                new_position = random.choice(candidates)[0].copy()
                donor = random.choice(candidates)[0]
                for n in lays[random.choice(randomness_l)]:
                    new_position[n] = donor[n]
                w = crossweight(new_position, node_layer, edges)
                candidates.append([new_position, w])

    candidates.sort(key=lambda c: c[1])
    best = candidates[0][0]
    orders = [sorted(lays[i], key=lambda name: best[name]) for i in range(5)]
    print(orders)
    print(candidates[0][1])
    return orders


def printsvg():
    padding = 4
    unit_ratio = 96 / 72
    thickness = conf_value("thickness")
    fontsize_name = conf_value("fontsize_name")
    fontsize_count = conf_value("fontsize_count")
    spacing = conf_value("spacing")
    height = conf_value("height")
    hmargin = conf_value("hmargin")
    width = conf_value("width")
    node_structure = {}
    layers = [[], [], [], [], []]
    edges = []
    for layer, layernodes in enumerate(bags):
        for startnode in layernodes:
            if startnode[-1] == "*":
                continue
            for endnode in layernodes[startnode]:
                if endnode[-1] == "*":
                    continue
                weight = layernodes[startnode][endnode]
                if endnode not in node_structure:
                    node_structure[endnode] = {
                        "prev": [[startnode, weight]],
                        "next": [],
                        "layer": layer + 1,
                    }
                    layers[layer + 1].append(endnode)
                else:
                    node_structure[endnode]["prev"].append([startnode, weight])
                if startnode not in node_structure:
                    node_structure[startnode] = {
                        "prev": [],
                        "next": [[endnode, weight]],
                        "layer": layer,
                    }
                    layers[layer].append(startnode)
                else:
                    node_structure[startnode]["next"].append([endnode, weight])
                edges.append([startnode, endnode, weight])
    node_position = {}
    node_layer = {}
    layer_structure = [
        {"olayer": []},
        {"olayer": []},
        {"olayer": []},
        {"olayer": []},
        {"olayer": []},
    ]
    lastorders = read_conf("best-" + orderkey)
    if lastorders != {}:
        for i in range(5):
            ls = layer_structure[i]
            ord = lastorders[i]
            for node in lastorders[i]:
                if node in layers[i]:
                    ls["olayer"].append(node)
            for node in layers[i]:
                if node not in ls["olayer"]:
                    ls["olayer"].append(node)
        for layer, layernodes in enumerate(layer_structure):
            for j, n in enumerate(layernodes["olayer"]):
                node_position[n] = j
                node_layer[n] = layer
        print(crossweight(node_position, node_layer, edges))
    else:
        for layer, layernodes in enumerate(layers):
            for j, n in enumerate(layernodes):
                node_position[n] = j
                node_layer[n] = layer
    # NOTE(review): placed at function level (always re-optimize, starting
    # from the saved order when there is one) because layer_structure is
    # reset just below; confirm against the original indentation.
    orders = genetic_optimize(node_position, node_layer, edges)
    write_conf("best-" + orderkey, orders)
    layer_structure = [
        {"olayer": []},
        {"olayer": []},
        {"olayer": []},
        {"olayer": []},
        {"olayer": []},
    ]
    for i in range(5):
        ls = layer_structure[i]
        ord = orders[i]
        for node in orders[i]:
            if node in layers[i]:
                ls["olayer"].append(node)
        for node in layers[i]:
            if node not in ls["olayer"]:
                ls["olayer"].append(node)
    for layer, layernodes in enumerate(layer_structure):
        for j, n in enumerate(layernodes["olayer"]):
            node_position[n] = j
            node_layer[n] = layer
    print(crossweight(node_position, node_layer, edges))
    density = []
    for i in range(5):
        ls = layer_structure[i]
        ls["num"] = len(ls["olayer"])
        ls["inout"] = 0
        for j in ls["olayer"]:
            lhi = 0
            lho = 0
            k = node_structure[j]
            for prev_node in k["prev"]:
                lhi += prev_node[1]
            for next_node in k["next"]:
                lho += next_node[1]
            k["size"] = max(lhi, lho)
            k["in"] = lhi
            k["out"] = lho
            if lhi != lho and lhi * lho != 0:
                print(f"BUG1: {j} {k} {lhi} {lho}")
            ls["inout"] += k["size"]
        ls["density"] = ls["inout"] / (
            spacing + height - spacing * ls["num"] - 2 * hmargin
        )
        density.append(ls["density"])
    realdensity = max(density)
    columns = []
    l = 0
    for i in range(5):
        l += width / 6
        columns.append(l)
    for i in range(5):
        ls = layer_structure[i]
        supp_spacing = (
            spacing
            + height
            - 2 * hmargin
            - spacing * ls["num"]
            - ls["inout"] / realdensity
        ) / (ls["num"] + 1)
        cs = hmargin - spacing
        for j in ls["olayer"]:
            ns = node_structure[j]
            ns["top"] = supp_spacing + spacing + cs
            h = ns["size"] / realdensity
            cs = ns["bottom"] = ns["top"] + h
    d = drawsvg.Drawing(width, height, origin=(0, 0), id_prefix=orderkey)
    g1 = drawsvg.Group()
    g2 = drawsvg.Group()
    g3 = drawsvg.Group()
    g4 = drawsvg.Group()
    font_offset = max(fontsize_count, fontsize_name)
    for n in node_structure:
        ns = node_structure[n]
        col = node_color(n).split(" ")[0].split(".")[0]
        ns["color"] = col
        xpos = width / 6 * (ns["layer"] + 1)
        r = drawsvg.Rectangle(
            xpos - thickness,
            ns["top"],
            2 * thickness,
            ns["bottom"] - ns["top"],
            fill=col,
            stroke_width=0.2,
            stroke="black",
        )
        g1.append(r)
        nw = textwidth(n, "Arial", fontsize_name) * unit_ratio
        cw = textwidth(str(ns["size"]), "Arial", fontsize_count) * unit_ratio
        gw = nw + cw + padding
        ggw = gw + 2 * padding
        nxpos = xpos - gw / 2 + cw + padding + nw / 2
        ypos = (ns["top"] + ns["bottom"]) / 2
        cxpos = cw / 2 - gw / 2 + xpos
        rxpos = xpos
        if ns["in"] == 0:
            nxpos -= gw / 2 + padding + thickness
            cxpos -= gw / 2 + padding + thickness
            rxpos -= gw / 2 + padding + thickness
        if ns["out"] == 0:
            nxpos += gw
/ 2 + padding + thickness cxpos += gw / 2 + padding + thickness rxpos += gw / 2 + padding + thickness t = drawsvg.Text( n, str(fontsize_name) + "pt", nxpos, ypos + fontsize_name / 2, fill="black", text_anchor="middle", font_family="Arial", ) tt = drawsvg.Text( str(ns["size"]), str(fontsize_count) + "pt", cxpos, ypos + fontsize_count / 2, fill="black", text_anchor="middle", font_family="Arial", ) g3.append(t) g3.append(tt) g4.append( drawsvg.Rectangle( rxpos - gw / 2 - padding, ypos - font_offset / 2 - padding, ggw, font_offset + 2 * padding, stroke="black", stroke_width=0, fill="white", fill_opacity=".5", rx=padding, ry=padding, ) ) for n in node_structure: ns = node_structure[n] ns["prev"].sort(key=lambda x: node_structure[x[0]]["top"]) ns["next"].sort(key=lambda x: node_structure[x[0]]["top"]) start = ns["top"] for link in ns["prev"]: ysize = link[-1] link.append(start) start += ysize / realdensity link.append(start) for n in node_structure: ns = node_structure[n] start = ns["top"] for link in ns["next"]: ysize = link[-1] link.append(start) start += ysize / realdensity link.append(start) targets = node_structure[link[0]] target = None for t in targets["prev"]: if t[0] == n: target = t if target == None: print(f"BUG: {n},{ns},{t}") sys.exit(5) posxa = columns[ns["layer"]] + thickness posxb = columns[targets["layer"]] - thickness posxc = (3 * posxa + posxb) / 4 posxd = (posxa + 3 * posxb) / 4 grad = drawsvg.LinearGradient(posxa, 0, posxb, 0) grad.add_stop(0, ns["color"], opacity=0.5) grad.add_stop(1, targets["color"], opacity=0.5) p = drawsvg.Path(fill=grad, stroke_width=0) p.M(posxa, link[-2]) p.C(posxc, link[-2], posxd, target[-2], posxb, target[-2]) p.L(posxb, target[-1]) p.C(posxd, target[-1], posxc, link[-1], posxa, link[-1]) p.Z() g2.append(p) d.append(g2) d.append(g1) d.append(g4) d.append(g3) d.save_svg(orderkey + ".svg") printsvg() for x in strangecases: print(f"// {x}")