scodoc-cohortes/get.py

1457 lines
48 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# Standard libs
import argparse
import csv
import json
import os
import pdb # used for debugging
import random
import sys
# Third-party libs
2024-10-14 01:17:25 +02:00
import requests
from requests.auth import HTTPBasicAuth
import drawsvg
try:
    from dotenv import load_dotenv
except ModuleNotFoundError:
    # python-dotenv is mandatory: load_dotenv() is called right after the
    # imports, so carrying on would only fail later with a NameError.
    print("\nError: dotenv not installed !", file=sys.stderr)
    print("You may install it using:\npip install python-dotenv\n", file=sys.stderr)
    sys.exit(1)
2024-10-14 01:17:25 +02:00
def die(msg: str, status=3):
    """Print ``msg`` on stderr and terminate the process.

    Args:
        msg: Error message to display.
        status: Exit status handed to ``sys.exit`` (default 3).

    Raises:
        SystemExit: always.
    """
    print(msg, file=sys.stderr)
    sys.exit(status)
2024-10-14 01:17:25 +02:00
# Load variables from a local .env file into the process environment.
load_dotenv(".env")

# Connection settings: the server URL has a default, the credentials are mandatory.
SCODOC_SERVER = os.environ.get("SCODOC_URL") or "http://localhost:5000"
SCODOC_USER = os.environ.get("SCODOC_USER") or die("SCODOC_USER must be set in .env or the environment")
SCODOC_PASSWORD = os.environ.get("SCODOC_PASSWORD") or die("SCODOC_PASSWORD must be set in .env or the environment")

API_URL = f"{SCODOC_SERVER}/ScoDoc/api"

# TODO : refactor globals
debug = True  # Not used
BLOCKING = True  # Die if csv is incorrect

# TODO : refactor / put globals in a class, eg Config

depts = []
orderkey = ""
def blockordie(status=2):
    """Exit with ``status`` when the global ``BLOCKING`` flag is set.

    When ``BLOCKING`` is false the function simply returns, letting the caller
    carry on after reporting a malformed input line.
    """
    if BLOCKING:
        sys.exit(status)
2024-10-14 01:17:25 +02:00
class Options:
    """Namespace for command-line options.

    ``cli_check()`` overwrites these attributes; the values below mirror the
    argparse defaults so the attributes exist even before parsing.
    """

    # Base year of the cohort (argparse default of --base).
    base_year = 2021
    # True when --techno is given.
    techno = False
2024-10-14 01:17:25 +02:00
def cli_check():
    """Parse the command line and publish the result in module-level state.

    Sets ``Options.base_year`` and ``Options.techno``, the global ``depts``
    (list of department names given on the command line) and the global
    ``orderkey`` (those names joined with ``_``).
    """
    global orderkey  # TODO: globals to be removed
    global depts
    parser = argparse.ArgumentParser(description="Process some departments.")
    parser.add_argument("--techno", action="store_true", help="Enable TECHNO mode")
    parser.add_argument("depts", nargs="+", help="List of departments")
    parser.add_argument(
        "--base",
        "-b",
        type=int,
        choices=range(2000, 2667),
        default=2021,
        help="base year for the cohort (integer between 2000 and 2666)",
    )
    args = parser.parse_args()
    Options.base_year = args.base
    Options.techno = args.techno
    depts = args.depts
    orderkey = "_".join(depts)
    # nargs="+" already makes argparse reject an empty list; kept as a safety net.
    if not depts:
        parser.print_help()
        sys.exit(0)
def api_url(dept: str | None = None):
    """Return the base URL of the ScoDoc API.

    Args:
        dept: Department name. When given (and non-empty) the
            department-scoped API URL is returned, otherwise the global one.
    """
    # This is the place to change if only the global API should be used.
    if dept:
        return f"{SCODOC_SERVER}/ScoDoc/{dept}/api"
    return f"{SCODOC_SERVER}/ScoDoc/api"
2024-10-14 01:17:25 +02:00
cli_check()
def read_conf(key):
    """Load the JSON document stored in ``{key}.json``.

    Args:
        key: File name without the ``.json`` extension.

    Returns:
        The decoded JSON content, or an empty dict when the file does not exist.
    """
    path = f"{key}.json"
    if not os.path.exists(path):
        return {}
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
def write_conf(key, obj):
    """Serialize ``obj`` as JSON into ``{key}.json``, overwriting the file.

    Args:
        key: File name without the ``.json`` extension.
        obj: JSON-serializable object.

    Returns:
        None.
    """
    with open(f"{key}.json", "w", encoding="utf-8") as f:
        json.dump(obj, f)
2024-10-14 01:17:25 +02:00
conf = read_conf(orderkey)
def conf_value(xkey: str):
    """Return the configuration value for ``xkey``, with built-in fallbacks.

    Lookup order: the loaded ``conf`` mapping, the static defaults table,
    ``" "`` for any other key ending in ``separator``, the ``nick`` and
    ``extnick`` templates, five empty lists for ``orders``, and finally an
    empty dict for anything else.
    """
    defaults = {
        "spacing": 14,
        "thickness": 6,
        "fontsize_name": 10,
        "fontsize_count": 14,
        "width": 1300,
        "height": 900,
        "hmargin": 20,
        "parcours_separator": "/",
        "year_separator": "",
        "rank_separator": "",
        "diplome_separator": "",
    }
    if xkey in conf:
        return conf[xkey]
    if xkey in defaults:
        return defaults[xkey]
    if xkey.endswith("separator"):
        return " "
    if xkey == "nick":
        return "{diplome}{rank}{multidepartment}{modalite}{parcours}"
    if xkey == "extnick":
        return "{rank}{multidepartment}{diplomenobut}{modaliteshort}"
    if xkey == "orders":
        return [[], [], [], [], []]
    return {}
student = {}
CACHE_FILE = "cache.json"
def load_cache(cache_file):
    """Load the JSON cache stored in ``cache_file``.

    Returns:
        The decoded JSON content, or an empty dict when the file does not exist.
    """
    if not os.path.exists(cache_file):
        return {}
    with open(cache_file, "r", encoding="utf-8") as f:
        return json.load(f)
def save_cache(cache, file=None):
    """Write ``cache`` as JSON.

    Args:
        cache: JSON-serializable cache content.
        file: Destination path; defaults to the module-level ``CACHE_FILE``.
    """
    if file is None:
        file = CACHE_FILE
    # Write to the requested file (the argument used to be ignored).
    with open(file, "w", encoding="utf-8") as f:
        json.dump(cache, f)
cache = load_cache(CACHE_FILE)
# Read color theme
# There are default color values, so may be it should just join the conf.json file
def read_theme():
    """Override entries of the global ``colors`` table from ``theme.csv``.

    Nothing happens when the file does not exist. Each row is ``name,color``.
    Empty rows and rows whose first cell starts with ``#`` are skipped. A row
    with an empty first cell or without a second cell is reported and handed
    to ``blockordie()``.
    """
    if not os.path.exists("theme.csv"):
        return
    with open("theme.csv", newline="") as csvfile:
        for row in csv.reader(csvfile, delimiter=",", quotechar='"'):
            if not row:
                continue
            if row[0].startswith("#"):
                continue
            if not row[0] or len(row) < 2:
                print("Wrong line in theme : " + str(row))
                blockordie()
                continue
            colors[row[0]] = row[1]
colors = {
"+DUT": "#0040C0",
"QUIT": "#00FF00",
"SUCCESS": "#0000FF",
"NORMAL": "#C0C0C0",
"FAIL": "#FF4040",
"OLD": "#FF8000",
"NEW": "#FFFF00",
"TRANSPARENT": "#FFFFFF.0",
"RED": "#000000",
}
read_theme()
# Read redirects
# Only one file, since various combinations including same departments should
# use the same redirections ("no jury yet but almost sure it will be ...")
def read_redirects():
    """Fill the global ``redirects`` table from ``redirect.csv``.

    Nothing happens when the file does not exist. Each row is ``id,value``
    where ``id`` is an integer used as key. Empty rows and rows whose first
    cell starts with ``#`` are skipped. A row with an empty or non-integer
    first cell, or without a second cell, is reported and handed to
    ``blockordie()``.
    """
    if not os.path.exists("redirect.csv"):
        return
    with open("redirect.csv", newline="") as csvfile:
        for row in csv.reader(csvfile, delimiter=",", quotechar='"'):
            if not row:
                continue
            if row[0].startswith("#"):
                continue
            try:
                if len(row) < 2:
                    raise ValueError("missing value column")
                # int("") raises ValueError too, covering the empty first cell.
                key = int(row[0])
            except ValueError:
                print("Wrong line in redirect : " + str(row))
                blockordie()
                continue
            redirects[key] = row[1]
redirects = {}
read_redirects()
# Gestion globale d'un jeton pour l'API
token = None
def get_json(url: str, params=None):
    """GET ``url`` with bearer authentication and return the decoded JSON.

    On first use a token is requested from ``{API_URL}/tokens`` with HTTP
    basic authentication and stored in the global ``token``. Any response
    whose status is not 200 is reported and ends the process with status 1.

    Args:
        url: Full URL to query.
        params: Optional query parameters forwarded to ``requests.get``.
    """
    global token
    print(f"Requesting {url}")
    if token is None:
        url_token = f"{API_URL}/tokens"
        response = requests.post(url_token, auth=HTTPBasicAuth(SCODOC_USER, SCODOC_PASSWORD))
        if response.status_code != 200:
            print(
                f"Erreur de récupération de token: {response.status_code} - {response.text}"
            )
            sys.exit(1)
        token = response.json().get("token")
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers, params=params)
    if response.status_code != 200:
        print(f"Erreur avec {url}: {response.status_code} - {response.text}")
        sys.exit(1)
    return response.json()
formsem_dept = {}
formsem_department = {}
def get_formsem_from_dept(dept):
    """Return the formsemestre ids (as strings) of department ``dept``.

    The list is served from ``cache["formsems"]`` when available; otherwise it
    is fetched from the API, each semester object is stored under
    ``cache["sem"][semid]`` and the cache is saved. In both cases
    ``formsem_dept`` is updated to map each id to ``dept``.
    """
    if "formsems" in cache and dept in cache["formsems"]:
        result = cache["formsems"][dept]
        # Keep the id -> department map filled on cache hits as well.
        for semid in result:
            formsem_dept[str(semid)] = dept
        return result
    cache.setdefault("formsems", {})
    cache.setdefault("sem", {})
    formsemestres = get_json(f"{api_url(dept)}/formsemestres/query")
    result = []
    for sem in formsemestres:
        semid = str(sem["formsemestre_id"])
        formsem_dept[semid] = dept
        cache["sem"][semid] = sem
        result.append(semid)
    cache["formsems"][dept] = result
    save_cache(cache)
    return result
def get_formations_from_dept(dept):
    """Return the ids of the formations of ``dept`` whose ``type_parcours`` is 700.

    The list is served from ``cache["formations"]`` when available; otherwise
    it is fetched from the API, stored in the cache and the cache is saved.
    """
    if "formations" in cache and dept in cache["formations"]:
        return cache["formations"][dept]
    cache.setdefault("formations", {})
    formations = get_json(f"{api_url(dept)}/formations")
    # 700 presumably identifies BUT formations -- TODO confirm against ScoDoc.
    result = [f["formation_id"] for f in formations if f["type_parcours"] == 700]
    cache["formations"][dept] = result
    save_cache(cache)
    return result
def get_etuds_from_formsem(dept, semid):
    """Return the student list of formsemestre ``semid``.

    The list is served from ``cache["etudlist"]`` when available; otherwise it
    is fetched from ``formsemestre/{semid}/etudiants/long`` on the API of
    ``dept``, stored in the cache and the cache is saved.

    Args:
        dept: Department name used to build the API URL.
        semid: Formsemestre id, int or str (cache keys are strings).
    """
    if isinstance(semid, int):
        semid = str(semid)
    if "etudlist" in cache and semid in cache["etudlist"]:
        return cache["etudlist"][semid]
    cache.setdefault("etudlist", {})
    result = get_json(f"{api_url(dept)}/formsemestre/{semid}/etudiants/long")
    cache["etudlist"][semid] = result
    save_cache(cache)
    return result
def get_jury_from_formsem(dept: str, semid):
    """Return the jury decisions of formsemestre ``semid``.

    The result is served from ``cache["semjury"]`` when available; otherwise
    it is fetched from ``formsemestre/{semid}/decisions_jury`` on the API of
    ``dept``, stored in the cache and the cache is saved.

    Args:
        dept: Department name used to build the API URL (a falsy value makes
            ``api_url`` return the global API URL).
        semid: Formsemestre id, int or str (cache keys are strings).
    """
    if isinstance(semid, int):
        semid = str(semid)
    if "semjury" in cache and semid in cache["semjury"]:
        return cache["semjury"][semid]
    cache.setdefault("semjury", {})
    result = get_json(f"{api_url(dept)}/formsemestre/{semid}/decisions_jury")
    cache["semjury"][semid] = result
    save_cache(cache)
    return result
def get_override(sem, xkey, default=None):
    """Return the configured override of ``xkey`` for semester ``sem``.

    The ``override`` configuration maps a semester field name (``titre_num``,
    ``titre`` or ``session_id``, tried in that order) to a mapping from field
    value to overridden properties. The first match wins.

    Args:
        sem: Semester object (dict).
        xkey: Name of the property to override.
        default: Value returned when no override applies.
    """
    overrides = conf_value("override")
    for field in ("titre_num", "titre", "session_id"):
        if field not in sem or field not in overrides:
            continue
        by_value = overrides[field]
        if sem[field] in by_value and xkey in by_value[sem[field]]:
            return by_value[sem[field]][xkey]
    return default
def _fill_placeholder(template, placeholder, value, separator_key):
    """Substitute ``{placeholder}`` in ``template``.

    A non-empty ``value`` is inserted preceded by ``conf_value(separator_key)``;
    an empty one simply removes the placeholder.
    """
    text = str(value)
    if text:
        text = conf_value(separator_key) + text
    return template.replace("{" + placeholder + "}", text)


def analyse_student(semobj, etud, year=None):
    """Describe student ``etud`` in semester ``semobj``, applying overrides.

    Args:
        semobj: Semester object; ``session_id`` is expected to look like
            ``department-diplome-modalite``.
        etud: Student object; its optional ``groups`` list provides the
            parcours (group of the partition named ``Parcours``).
        year: Rank to use; when None it is derived from ``semestre_id``.

    Returns:
        The tuple ``(department, diplome, rank, modalite, parcours, nick)``.

    Side effects:
        Records the department in ``formsem_department`` under the semester id.
    """
    session_id = semobj["session_id"].split("-")
    department = session_id[0]
    diplome = session_id[1]
    modalite = session_id[2]
    if year is not None:
        rank = year
    elif semobj["semestre_id"] < 0:
        rank = 1
    else:
        rank = (semobj["semestre_id"] + 1) // 2
    parcours = None
    groups = []
    for group in etud.get("groups", ()):
        if group["partition_name"] == "Parcours":
            parcours = group["group_name"]
        groups.append(group["group_name"])
    if parcours is None:
        parcours = ""
    parcours = get_override(semobj, "parcours", parcours)
    department = get_override(semobj, "department", department)
    rank = get_override(semobj, "rank", rank)
    diplome = get_override(semobj, "diplome", diplome)
    modalite = get_override(semobj, "modalite", modalite)
    if modalite.startswith("G"):
        # "G:<group>=<modalite>:<group>=<modalite>...": the modalite comes from
        # the student's groups; the last matching rule wins.
        rules = modalite.split(":")[1:]
        # Empty rather than None when no rule matches, so the length tests
        # applied to the modalite keep working.
        modalite = ""
        for rule in rules:
            parts = rule.split("=")
            if parts[0] in groups:
                modalite = parts[1]
    nick = conf_value("nick")
    nick = _fill_placeholder(nick, "department", department, "department_separator")
    # The department only shows up here when several departments are processed.
    nick = _fill_placeholder(
        nick,
        "multidepartment",
        department if len(depts) > 1 else "",
        "department_separator",
    )
    nick = _fill_placeholder(nick, "diplome", diplome, "diplome_separator")
    nick = _fill_placeholder(nick, "rank", rank, "rank_separator")
    nick = _fill_placeholder(nick, "modalite", modalite, "modalite_separator")
    nick = _fill_placeholder(nick, "parcours", parcours, "parcours_separator")
    formsem_department[str(semobj["id"])] = department
    return department, diplome, rank, modalite, parcours, nick
def nick(semobj, etud):
    """Return only the nickname computed by ``analyse_student(semobj, etud)``."""
    return analyse_student(semobj, etud)[-1]
def get_dept_from_sem(semid):
    """Return the department recorded in ``formsem_department`` for ``semid``.

    Raises:
        KeyError: when no department was recorded for that semester.
    """
    return formsem_department[str(semid)]
oldsems = set()
oldsemsdept = {}
futuresems = set()
futuresemsdept = {}
bacs = set()
cohort_nip = set()
def analyse_depts():
    """Collect the students of the cohort from every department in ``depts``.

    For each formsemestre of each department:

    - semesters 1 to 3 school years before or after the cohort timeline are
      recorded in ``oldsems`` / ``futuresems`` along with their department;
    - semesters on the cohort timeline, with ``type_parcours`` 700 and a
      modalite other than ``EXT``, have their students merged into the global
      ``student`` table, one bucket per semester.
    """
    for dept in depts:
        for semid in get_formsem_from_dept(dept):
            # Check if this is a part of the cohort or a future/old semester
            sem = cache["sem"][str(semid)]
            if sem["semestre_id"] < 0:
                year = 1
            else:
                year = (sem["semestre_id"] + 1) // 2
            # 0 when the semester lies on the nominal timeline of the cohort
            offset = sem["annee_scolaire"] - Options.base_year - year + 1
            if -4 < offset < 0:
                oldsems.add(str(semid))
                # keyed like the set above, so lookups by set member succeed
                oldsemsdept[str(semid)] = dept
            if 0 < offset < 4:
                futuresems.add(str(semid))
                futuresemsdept[str(semid)] = dept
            if offset != 0:
                continue
            if sem["formation"]["type_parcours"] != 700:
                continue
            if sem["modalite"] == "EXT":
                continue
            # This is a BUT semester, part of the cohort
            # 0,1 : preceding year ; 2-7 : cohort ; 8+ : future
            if sem["semestre_id"] < 0:
                # NOTE(review): int key while every other bucket is a str;
                # the code reading the buckets looks them up by str -- confirm.
                bucket = 1
            else:
                bucket = str(int(sem["semestre_id"] - 1))
            # The semester is relevant: take all its students and fill in
            # their cursus.
            etuds = get_etuds_from_formsem(dept, semid)
            # Result unused here; the call stores the decisions in the cache.
            get_jury_from_formsem(dept, semid)
            for etud in etuds:
                etudid = etud["id"]
                if etudid in student:
                    studentsummary = student[etudid]
                else:
                    studentsummary = {
                        "cursus": {},  # Cursus is semid
                        "etudid": {},  # useful when merging students
                        "pseudodept": {},  # pseudo-dept for interdept
                        "diplome": {},  # diplome name
                        "rank": {},  # rank
                        "modalite": {},  # modalite
                        "parcours": {},  # parcours
                        "nickname": {},  # nick
                        "dept": dept,  # useful when merging students
                        "bac": "",  # usually
                    }
                department, diplome, rank, modalite, parcours, nick = analyse_student(
                    sem, etud, year
                )
                if "bac" in etud["admission"]:
                    studentsummary["bac"] = etud["admission"]["bac"]
                else:
                    studentsummary["bac"] = "INCONNU"
                bacs.add(studentsummary["bac"])
                # We skip non-techno students if we are in techno mode
                # If we want a mixed reporting, maybe we should change this
                if Options.techno and studentsummary["bac"][:2] != "ST":  # TODO: change this
                    continue
                if bucket in studentsummary["cursus"]:
                    semestreerreur = int(bucket) + 1
                    print(
                        f"// Élève {etudid} dans deux semestres à la fois : S{semestreerreur}, semestres {studentsummary['cursus'][bucket]} et {semid}"
                    )
                if "dept" in studentsummary and studentsummary["dept"] != dept:
                    print(
                        f"// Élève ayant changé de département {dept},{studentsummary['dept']}"
                    )
                studentsummary["cursus"][bucket] = semid
                studentsummary["etudid"][bucket] = etudid
                studentsummary["pseudodept"][bucket] = department
                studentsummary["diplome"][bucket] = diplome
                studentsummary["rank"][bucket] = rank
                studentsummary["modalite"][bucket] = modalite
                studentsummary["parcours"][bucket] = parcours
                studentsummary["nickname"][bucket] = nick
                studentsummary["debug"] = etud["sort_key"]  # TODO: REMOVE
                studentsummary["unid"] = etud["code_nip"]
                cohort_nip.add(etud["code_nip"])
                student[etudid] = studentsummary
2024-10-14 01:17:25 +02:00
analyse_depts()
def allseeingodin():
    """Peek into past and future semesters and merge duplicated students.

    1. For every cohort student (matched by NIP) found in an old semester,
       remember the highest-level old semester and the nickname there; same
       for future semesters (BUT only), keeping the nickname.
    2. Tag each entry of ``student`` with ``old`` / ``oldsem`` / ``future``.
    3. Entries sharing a NIP are merged into one record and the others are
       removed from ``student``.
    """
    oldstudents = {}
    oldstudentslevel = {}
    futurestudents = {}
    futurestudentslevel = {}
    # We look for the latest "old semester" in which every (old) student went
    for semid in oldsems:
        sem = cache["sem"][semid]
        semlevel = abs(sem["semestre_id"])
        dept = oldsemsdept[semid]
        for etud in get_etuds_from_formsem(dept, semid):
            nip = etud["code_nip"]
            if nip not in cohort_nip:
                continue
            if nip not in oldstudentslevel or semlevel > oldstudentslevel[nip]:
                oldstudentslevel[nip] = semlevel
                oldstudents[nip] = [semid, nick(sem, etud)]
    for semid in futuresems:
        sem = cache["sem"][semid]
        if sem["formation"]["type_parcours"] != 700:
            # We are only interested in BUT continuations (for now)
            continue
        semlevel = abs(sem["semestre_id"])
        dept = futuresemsdept[semid]
        for etud in get_etuds_from_formsem(dept, semid):
            nip = etud["code_nip"]
            if nip not in cohort_nip:
                continue
            if nip not in futurestudentslevel or semlevel > futurestudentslevel[nip]:
                futurestudentslevel[nip] = semlevel
                futurestudents[nip] = nick(sem, etud)
    unification = {}  # NIP -> last etudid seen with that NIP
    duplicates = {}  # NIP -> every etudid sharing it, in order of appearance
    for etudid in student:
        unid = student[etudid]["unid"]
        if unid in unification:
            if unid not in duplicates:
                duplicates[unid] = [unification[unid]]
            duplicates[unid].append(etudid)
        unification[unid] = etudid
        if unid in oldstudents:
            student[etudid]["old"] = oldstudents[unid][1]
            student[etudid]["oldsem"] = oldstudents[unid][0]
        if unid in futurestudents:
            student[etudid]["future"] = futurestudents[unid]
    for unid in duplicates:
        # Choose as base the record reaching the latest semester bucket.
        # NOTE(review): the first duplicate is never a candidate ([1:]);
        # kept as found -- confirm whether this is intended.
        lastsem = -1
        best = []
        for suppidx in duplicates[unid][1:]:
            supp = student[suppidx]
            if str(lastsem) in supp["cursus"]:
                best.append(suppidx)
            for sem in range(5, lastsem, -1):
                if str(sem) in supp["cursus"]:
                    lastsem = sem
                    best = [suppidx]
                    break
        if len(best) > 1:
            print(f"// Error: cannot chose last semester for NIP {unid}: ")
            print(repr(best))
            for x in best:
                # ``best`` holds student ids, so show each candidate's cursus
                print(student[x]["cursus"])
            sys.exit(6)
        # Fall back on the first record when no candidate has a known bucket.
        bestid = best[0] if best else duplicates[unid][0]
        base = student[bestid]
        for suppidx in duplicates[unid]:
            if suppidx == bestid:
                continue
            supp = student[suppidx]
            for skey in (
                "cursus",
                "etudid",
                "pseudodept",
                "diplome",
                "rank",
                "modalite",
                "parcours",
                "nickname",
                "old",
                "oldsem",
            ):
                if skey not in supp:
                    continue
                if not isinstance(supp[skey], dict):
                    # "old"/"oldsem" are plain values assigned per NIP above,
                    # hence identical across the records being merged.
                    base.setdefault(skey, supp[skey])
                    continue
                for bucket in supp[skey]:
                    if bucket not in base[skey]:
                        base[skey][bucket] = supp[skey][bucket]
            del student[suppidx]
allseeingodin()
strange_cases = []
# next[a][b] / nextnick[a][b]: number of students whose odd semester a
# (semester id / nickname, as strings) is followed by even semester b.
next = {}
nextnick = {}
for etudid in student.keys():
    etud = student[etudid]
    # One slot per semester bucket "0".."5"; None where the student has none.
    cursus_array = [None] * 6
    nickname_array = [None] * 6
    etudid_array = [None] * 6
    for i in range(6):
        if str(i) in etud["cursus"]:
            cursus_array[i] = etud["cursus"][str(i)]
            nickname_array[i] = etud["nickname"][str(i)]
            etudid_array[i] = etud["etudid"][str(i)]
    # Years will be reduced to their even semester, so we look for the most
    # usual continuation for students stopping on an odd semester.
    for i in range(0, 5, 2):
        # Everything is stringified, so a missing semester is counted under
        # the key "None" (the former "is not None" tests were always true).
        currs = str(cursus_array[i])
        nexts = str(cursus_array[i + 1])
        currn = str(nickname_array[i])
        nextn = str(nickname_array[i + 1])
        if currs not in next:
            next[currs] = {}
        next[currs][nexts] = next[currs].get(nexts, 0) + 1
        if currn not in nextnick:
            nextnick[currn] = {}
        nextnick[currn][nextn] = nextnick[currn].get(nextn, 0) + 1
    etud["cursus_array"] = cursus_array
    etud["nickname_array"] = nickname_array
    etud["etudid_array"] = etudid_array
# Most frequent continuation of each key; the first one met wins ties.
nextbest = {
    key: max(counts, key=counts.get, default=None) for key, counts in next.items()
}
nextnickbest = {
    key: max(counts, key=counts.get, default=None)
    for key, counts in nextnick.items()
}
# Number of students per nickname found in slots 1, 3 and 5 of
# nickname_array (None included when a slot is empty).
evennicknames = {}
for etudid in student.keys():
    etud = student[etudid]
    for i in range(1, 6, 2):
        nickname = etud["nickname_array"][i]
        evennicknames[nickname] = evennicknames.get(nickname, 0) + 1
# Reduce the six semester slots to one entry per year (indices 1..3 of
# five-slot arrays; slots 0 and 4 are filled in later).
for etudid in student.keys():
    etud = student[etudid]
    cursus_short = [None] * 5
    nickname_short = [None] * 5
    etudid_short = [None] * 5
    semend = None
    semstart = None
    for year in range(1, 4):
        sem1 = 2 * year - 2
        sem2 = 2 * year - 1
        # Prefer the second semester of the year...
        finalsem = etud["cursus_array"][sem2]
        nick = etud["nickname_array"][sem2]
        etid = etud["etudid_array"][sem2]
        if finalsem is None:
            # ...and fall back on the first one.
            finalsem = etud["cursus_array"][sem1]
            nick = etud["nickname_array"][sem1]
            etid = etud["etudid_array"][sem1]
            if finalsem is not None:
                # Dropped out during the first semester of that year: use the
                # most usual continuation of this nickname when the nickname
                # never appears in a second semester.
                if nick not in evennicknames:
                    nick = nextnickbest[nick]
        if finalsem is not None:
            cursus_short[year] = finalsem
            nickname_short[year] = nick
            etudid_short[year] = etid
    etud["short"] = cursus_short
    etud["nickshort"] = nickname_short
    etud["etudidshort"] = etudid_short
# Make sure the department of every student's last known year is in depts.
for etudid in student.keys():
    etud = student[etudid]
    lastyear = 4
    lastsem = None
    # Walk back from year 3 to the last year with a semester.
    while lastsem is None:
        lastyear -= 1
        lastsem = etud["short"][lastyear]
    ddd = get_dept_from_sem(lastsem)
    if ddd not in depts:
        depts.append(ddd)
# Per-department counters, all starting at zero.
badred = dict.fromkeys(depts, 0)
goodred = dict.fromkeys(depts, 0)
failure = dict.fromkeys(depts, 0)
diploma = dict.fromkeys(depts, 0)
reor2 = dict.fromkeys(depts, 0)
reor1 = dict.fromkeys(depts, 0)
unknown = dict.fromkeys(depts, 0)
entries = dict.fromkeys(depts, 0)
redirs = dict.fromkeys(depts, 0)
strangecases = []
# Classify the outcome of every student after their last known year and
# label how they entered the cohort; results go into etud["nickshort"].
for etudid in student.keys():
    etud = student[etudid]
    lastyear = 4
    lastsem = None
    # Walk back from year 3 to the last year with a semester.
    while lastsem is None:
        lastyear -= 1
        lastsem = etud["short"][lastyear]
    ddd = get_dept_from_sem(lastsem)
    # No department given: served from the cache when present, otherwise
    # api_url() falls back on the global API URL.
    jury = get_jury_from_formsem(None, lastsem)
    etudid_real = etud["etudidshort"][lastyear]
    if etudid_real != etudid:
        print(f"// Warning {etudid} {etudid_real}")
    resjury = None
    for x in jury:
        if x["etudid"] == etudid_real:
            resjury = x
            break
    if resjury is None:
        print(f"// No jury for {etudid} year {lastyear}")
        continue
    resultyear = None
    if resjury["etat"] == "D":
        resultyear = "DEM"
    if resjury["etat"] == "DEF":
        resultyear = "DEF"
    if (
        "annee" in resjury
        and "code" in resjury["annee"]
        and resjury["annee"]["code"] is not None
    ):
        resultyear = resjury["annee"]["code"]
    finaloutput = None
    checkred = False
    if etudid in redirects:
        # A manual redirection takes precedence over the jury decision.
        resultyear = redirects[etudid]
        redirs[ddd] += 1
        strangecases.append(
            f"REDI{lastyear} {SCODOC_SERVER}/{ddd}/Scolarite/fiche_etud?etudid={etudid}"
        )
    if resultyear is None:
        finaloutput = "?" + etud["nickshort"][lastyear]
        unknown[ddd] += 1
        strangecases.append(
            f"????{lastyear} {SCODOC_SERVER}/{ddd}/Scolarite/fiche_etud?etudid={etudid}"
        )
    elif resultyear in ("RAT", "ATJ"):
        finaloutput = "?" + etud["nickshort"][lastyear]
        unknown[ddd] += 1
        strangecases.append(
            f"ATTE{lastyear} {SCODOC_SERVER}/{ddd}/Scolarite/fiche_etud?etudid={etudid}"
        )
    elif resultyear in ("RED", "ABL", "ADSUP"):
        finaloutput = "RED " + etud["nickshort"][lastyear]
        checkred = True
    elif lastyear == 3 and resultyear in ("ADM", "ADJ"):
        finaloutput = "DIPLOME " + etud["nickshort"][lastyear]
        diploma[ddd] += 1
    elif lastyear == 2 and resultyear in ("ADM", "ADJ"):
        finaloutput = "+DUT " + etud["nickshort"][lastyear]
        reor2[ddd] += 1
    elif resultyear in ("PAS1NCI", "PASD"):
        finaloutput = "QUIT " + etud["nickshort"][lastyear]
        reor1[ddd] += 1
    elif lastyear < 2 and resultyear in ("ADM", "ADJ"):
        finaloutput = "QUIT " + etud["nickshort"][lastyear]
        reor1[ddd] += 1
    elif resultyear in ("NAR", "DEM", "DEF", "ABAN"):
        finaloutput = "FAIL " + etud["nickshort"][lastyear]
        failure[ddd] += 1
    elif resjury["annee"]["annee_scolaire"] != Options.base_year + lastyear - 1:
        finaloutput = "RED " + etud["nickshort"][lastyear]
        checkred = True
    if checkred:
        if "future" not in etud:
            # Repeat allowed but not found in any later semester: a failure.
            badred[ddd] += 1
            finaloutput = "FAIL" + finaloutput[3:]
        else:
            goodred[ddd] += 1
    etud["nickshort"][lastyear + 1] = finaloutput
    (firstsem, firstyear) = (
        (etud["short"][1], 1)
        if etud["short"][1] is not None
        else (
            (etud["short"][2], 2)
            if etud["short"][2] is not None
            else (etud["short"][3], 3)
        )
    )
    firstdept = cache["sem"][firstsem]["departement"]["acronym"]
    if "old" in etud:
        # The student comes from an older semester: label the entry with it.
        yearold = cache["sem"][etud["oldsem"]]["annee_scolaire"]
        etud["nickshort"][firstyear - 1] = etud["old"] + " " + str(yearold)
        yy = yearold
        delta = firstyear + Options.base_year - yy - 2
        # Clamp at 0: a negative start would address slots from the end of
        # the list and overwrite the outcome slots.
        for i in range(max(delta, 0), firstyear - 1):
            etud["nickshort"][i] = etud["nickshort"][firstyear - 1] + "*" * (
                firstyear - 1 - i
            )
    else:
        # No older semester known: build an entry label from the "extnick"
        # template and the student's first semester in the cohort.
        if (
            str(firstyear * 2 - 2) in etud["cursus"]
            and etud["cursus"][str(firstyear * 2 - 2)] is not None
        ):
            startsem = str(firstyear * 2 - 2)
        else:
            startsem = str(firstyear * 2 - 1)
        department = etud["pseudodept"][startsem]
        diplome = etud["diplome"][startsem]
        rank = etud["rank"][startsem]
        modalite = etud["modalite"][startsem]
        parcours = etud["parcours"][startsem]
        nick = conf_value("extnick")
        if len(department) > 0:
            nick = nick.replace(
                "{department}", conf_value("department_separator") + department
            )
        else:
            nick = nick.replace("{department}", "")
        if len(department) > 0 and len(depts) > 1:
            nick = nick.replace(
                "{multidepartment}", conf_value("department_separator") + department
            )
        else:
            nick = nick.replace("{multidepartment}", "")
        if len(diplome) > 0:
            nick = nick.replace("{diplome}", conf_value("diplome_separator") + diplome)
        else:
            nick = nick.replace("{diplome}", "")
        if len(diplome) > 0 and diplome != "BUT":
            nick = nick.replace(
                "{diplomenobut}", conf_value("diplome_separator") + diplome
            )
        else:
            nick = nick.replace("{diplomenobut}", "")
        if len(str(rank)) > 0:
            nick = nick.replace("{rank}", conf_value("rank_separator") + str(rank))
        else:
            nick = nick.replace("{rank}", "")
        if len(modalite) > 0:
            nick = nick.replace(
                "{modalite}", conf_value("modalite_separator") + modalite
            )
        else:
            nick = nick.replace("{modalite}", "")
        if len(modalite) > 0 and modalite != "FI":
            nick = nick.replace("{modaliteshort}", modalite[-1])
        else:
            nick = nick.replace("{modaliteshort}", "")
        if len(parcours) > 0:
            nick = nick.replace(
                "{parcours}", conf_value("parcours_separator") + parcours
            )
        else:
            nick = nick.replace("{parcours}", "")
        if diplome != "BUT":
            nick = "Ecand " + nick
        else:
            nick = "EXT" + nick
        etud["nickshort"][firstyear - 1] = nick
        # Earlier slots get the same label with one "*" per year of distance.
        for i in range(0, firstyear - 1):
            etud["nickshort"][i] = nick + "*" * (firstyear - 1 - i)
    entries[ddd] += 1
# bags[i][start][end]: number of students going from label ``start`` in slot i
# of their nickshort list to label ``end`` in slot i + 1.
bags = [{}, {}, {}, {}]
for etudid in student.keys():
    parc = student[etudid]["nickshort"]
    previouslevels = []
    for i in range(4):
        nstart = parc[i]
        nend = parc[i + 1]
        if nstart is None or nend is None:
            continue
        flows = bags[i].setdefault(nstart, {})
        flows[nend] = flows.get(nend, 0) + 1
# layers = [[], [], [], [], []]
# alllayers = []
# flatbags = []
# for i in range(4):
# for u in bags[i]:
# if u not in alllayers:
# alllayers.append(u)
# layers[i].append(u)
# for v in bags[i][u]:
# if v not in alllayers:
# alllayers.append(v)
# layers[i + 1].append(v)
# flatbags.append([u, v, bags[i][u][v]])
# allowed = []
# nextallowed = [[], [], [], [], []]
# weights = {}
# orders = conf_value("orders")
# x = set(alllayers)
# y = set()
# for i in orders:
# y = y.union(set(i))
# for i in range(5):
# if len(orders[i]) > 0:
# allowed.append(orders[i][0])
# for j in orders[i]:
# if j in alllayers:
# nextallowed[i].append(j)
# for j, k in enumerate(orders[i]):
# weights[k] = j + 1
# for u in layers[i]:
# if u not in allowed and u not in nextallowed[i]:
# allowed.append(u)
# else:
# for i in range(5):
# allowed.extend(layers[i])
# for bag in flatbags:
# w = 0
# if bag[0] in weights:
# w += weights[bag[0]]
# if bag[1] in weights:
# w += weights[bag[1]]
# bag.append(w)
# flatbags = sorted(flatbags, key=lambda x: x[-1])
# orderedflatbags = []
# finallayers = [[], [], [], [], []]
# while len(flatbags) > 0:
# gotone = False
# for x in flatbags:
# if x[0] in allowed and x[1] in allowed:
# # print(f"{x} est pris")
# gotone = True
# orderedflatbags.append(x)
# flatbags.remove(x)
# # print(f"Choosing {x}")
# for i in range(5):
# if x[0] in layers[i] and x[0] not in finallayers[i]:
# finallayers[i].append(x[0])
# if i < 4 and x[1] in layers[i + 1] and x[1] not in finallayers[i + 1]:
# finallayers[i + 1].append(x[1])
# if x[0] in nextallowed[i]:
# # print(f"[{i}] Removing {x[0]} from {nextallowed[i]}")
# nextallowed[i].remove(x[0])
# if x[1] in nextallowed[i]:
# # print(f"[{i}] Removing {x[1]} from {nextallowed[i]}")
# nextallowed[i].remove(x[1])
# # print(f"[{i}] {nextallowed[i]}")
# if len(nextallowed[i]) > 0 and nextallowed[i][0] not in allowed:
# # print(f"[{i}] Allowing now {nextallowed[i][0]}")
# allowed.append(nextallowed[i][0])
# break
# if not gotone:
# print("BUG")
# print(flatbags)
# print("---", allowed)
# print(nextallowed)
# sys.exit(3)
def node_color(x):
    """Return the color specification of the node labelled ``x``.

    The color is chosen from the global ``colors`` table according to the
    label prefix, followed by `` <<`` or `` >>`` for some categories. A label
    ending with ``*`` always gets the ``TRANSPARENT`` color. Labels matching
    no prefix (including the empty string) get the ``NORMAL`` color.
    """
    if x.endswith("*"):
        return f"{colors['TRANSPARENT']} >>"
    # (label prefix, key in colors, suffix); the first matching prefix wins.
    rules = (
        ("FAIL", "FAIL", " <<"),
        ("+DUT", "+DUT", " <<"),
        ("QUIT", "QUIT", " <<"),
        ("RED", "RED", " <<"),
        ("DIPL", "SUCCESS", " <<"),
        ("EXT", "NEW", " >>"),
        ("BUT", "NORMAL", ""),
        ("DUT", "OLD", " >>"),
    )
    for prefix, color_key, suffix in rules:
        if x.startswith(prefix):
            return f"{colors[color_key]}{suffix}"
    return colors["NORMAL"]
# def printout():
# with open(f"sankeymatic_{orderkey}.txt", "w") as fout:
# def output(*a, **b):
# b["file"] = fout
# print(*a, **b)
# date_actuelle = datetime.now()
# date_formatee = date_actuelle.strftime("%m/%d/%Y %H:%M:%S")
# output(
# f"// SankeyMATIC diagram inputs - Saved: {date_formatee}\n// https://sankeymatic.com/build/\n\n// === Nodes and Flows ===\n\n"
# )
# output("// THEME INFO")
# for c, cc in colors.items():
# output(f"// !{c}:{cc}")
# output()
# allnodes = []
# for y in orderedflatbags:
# output(f"{y[0]} [{y[2]}] {y[1]}")
# allnodes.append(y[0])
# allnodes.append(y[1])
# allnodes = list(set(allnodes))
# nodes = {}
# for x in allnodes:
# color = node_color(x)
# if len(color):
# nodes[x] = color
# for u in sorted(nodes.keys()):
# output(f":{u} {nodes[u]}")
# height = conf_value("height")
# width = conf_value("width")
# output("\n\n// === Settings ===\n")
# output(f"size w {width}")
# output(f" h {height}")
# with open("trailer.txt", "r") as fichier:
# contenu = fichier.read()
# output(contenu)
# for ddd in depts:
# if entries[ddd] == 0:
# continue
# p1 = round(100 * diploma[ddd] / entries[ddd])
# p2 = round(100 * (diploma[ddd] + reor2[ddd]) / entries[ddd])
# p3 = round(100 * (failure[ddd] / entries[ddd]))
# p4 = round(100 * (failure[ddd] + badred[ddd] + reor1[ddd]) / entries[ddd])
# output(f"// Département {ddd}")
# output(f"// {entries[ddd]} Entrées")
# output(f"// {diploma[ddd]} Diplômes")
# output(f"// {reor2[ddd]} DUT")
# output(f"// {p1}-{p2}% de réussite")
# output(f"// {goodred[ddd]} Redoublements")
# output(f"// {reor1[ddd]} départs de la formation")
# output(f"// {badred[ddd]} redoublements autorisés non actés")
# output(f"// {failure[ddd]} échecs")
# output(f"// {p3}-{p4}% d'échecs")
# output(f"// {unknown[ddd]} inconnus")
# for x in strangecases:
# output(f"// {x}")
# output(f'// orders["{orderkey}"] = {finallayers}')
# output(f"// bacs: {bacs}")
# printout()
def textwidth(text, font="Arial", fontsize=14):
    """Return the rendered width of ``text`` in bold ``font`` at ``fontsize``.

    The width is measured with pycairo when it can be imported; otherwise a
    coarse estimate of ``len(text) * fontsize`` is returned.
    """
    try:
        import cairo
    except ImportError:
        return len(text) * fontsize
    # None as target: measurement only, no "undefined.svg" file is created.
    surface = cairo.SVGSurface(None, 1280, 200)
    cr = cairo.Context(surface)
    cr.select_font_face(font, cairo.FONT_SLANT_NORMAL, cairo.FONT_WEIGHT_BOLD)
    cr.set_font_size(fontsize)
    xbearing, ybearing, width, height, xadvance, yadvance = cr.text_extents(text)
    return width
def crossweight(node_position, node_layer, edges):
    """Return the weighted number of edge crossings of a layered layout.

    Two edges are compared only when their start nodes share a layer and their
    end nodes share a layer; they cross when the order of their start nodes is
    the opposite of the order of their end nodes. Each crossing adds the
    product of both weights, and every pair is visited in both orders, so a
    crossing counts twice.

    Args:
        node_position: Mapping node -> position inside its layer.
        node_layer: Mapping node -> layer index.
        edges: Iterable of ``[start, end, weight]``.
    """
    total = 0
    for start_a, end_a, weight_a in edges:
        for start_b, end_b, weight_b in edges:
            if node_layer[start_a] != node_layer[start_b]:
                continue
            if node_layer[end_a] != node_layer[end_b]:
                continue
            start_delta = node_position[start_a] - node_position[start_b]
            end_delta = node_position[end_a] - node_position[end_b]
            if start_delta * end_delta < 0:
                total += weight_a * weight_b
    return total
def genetic_optimize(node_position, node_layer, edges):
    """Search an ordering of the nodes of each layer with few edge crossings.

    A population of layouts is evolved for 300 generations: the 30 best are
    kept, then completed up to 100 with swaps of two nodes, rotations of a
    position range and layer-wise crossovers. The fitness is ``crossweight``.
    Node positions inside a layer are expected to be 0..n-1.

    Args:
        node_position: Mapping node -> position inside its layer (not modified).
        node_layer: Mapping node -> layer index (0 to 4).
        edges: List of ``[start, end, weight]``.

    Returns:
        A list of five lists: the nodes of each layer in their best order.
        The result and its weight are also printed.
    """
    lays = []
    # Layers worth permuting, each repeated once per node so that larger
    # layers are drawn more often.
    randomness_l = []
    for index in range(5):
        lays.append([x for x in node_layer.keys() if node_layer[x] == index])
        if len(lays[index]) > 1:
            randomness_l.extend(index for _ in lays[index])
    w = crossweight(node_position, node_layer, edges)
    if not randomness_l:
        # No layer has two nodes: there is nothing to reorder.
        orders = [sorted(lay, key=lambda x: node_position[x]) for lay in lays]
        print(orders)
        print(w)
        return orders
    # Seed population: 20 copies of the current layout...
    candidates = [[node_position.copy(), w] for _ in range(20)]
    # ...plus 10 layouts with one randomly shuffled layer.
    for _ in range(10):
        shuffled = node_position.copy()
        l_idx = random.choice(randomness_l)
        remaining = lays[l_idx].copy()
        k = 0
        while remaining:
            picked = random.choice(remaining)
            remaining.remove(picked)
            shuffled[picked] = k
            k += 1
        # Score the shuffled layout itself, not the layout it was copied from.
        candidates.append([shuffled, crossweight(shuffled, node_layer, edges)])
    for _ in range(300):
        candidates.sort(key=lambda x: x[1])
        candidates = candidates[:30]
        while len(candidates) < 60:
            # Mutation: swap two distinct nodes of one layer.
            new_position = random.choice(candidates)[0].copy()
            l_idx = random.choice(randomness_l)
            swapa = random.choice(lays[l_idx])
            swapb = random.choice(lays[l_idx])
            while swapa == swapb:
                swapb = random.choice(lays[l_idx])
            new_position[swapa], new_position[swapb] = (
                new_position[swapb],
                new_position[swapa],
            )
            w = crossweight(new_position, node_layer, edges)
            candidates.append([new_position, w])
        while len(candidates) < 90:
            # Mutation: rotate positions startidx..stopidx of one layer, the
            # node at stopidx moving to startidx.
            new_position = random.choice(candidates)[0].copy()
            l_idx = random.choice(randomness_l)
            startidx = random.randrange(len(lays[l_idx]) - 1)
            stopidx = random.randrange(startidx + 1, len(lays[l_idx]))
            for n in lays[l_idx]:
                if startidx <= new_position[n] < stopidx:
                    new_position[n] += 1
                elif new_position[n] == stopidx:
                    new_position[n] = startidx
            w = crossweight(new_position, node_layer, edges)
            candidates.append([new_position, w])
        while len(candidates) < 100:
            # Crossover: take one whole layer from another candidate.
            candidate = random.choice(candidates)[0]
            candidate2 = random.choice(candidates)[0]
            new_position = candidate.copy()
            l_idx = random.choice(randomness_l)
            for n in lays[l_idx]:
                new_position[n] = candidate2[n]
            w = crossweight(new_position, node_layer, edges)
            candidates.append([new_position, w])
    candidates.sort(key=lambda x: x[1])
    best = candidates[0][0]
    orders = [sorted(lay, key=lambda x: best[x]) for lay in lays]
    print(orders)
    print(candidates[0][1])
    return orders
def _ordered_layer_structure(order, layers):
    """Build the per-layer ``{"olayer": [...]}`` records from a saved order.

    For every layer ``i``, the nodes listed in ``order[i]`` that are actually
    present in ``layers[i]`` come first (in the saved sequence), followed by
    the nodes of ``layers[i]`` that the saved order does not mention.
    """
    structure = []
    for i, present in enumerate(layers):
        olayer = [node for node in order[i] if node in present]
        for node in present:
            if node not in olayer:
                olayer.append(node)
        structure.append({"olayer": olayer})
    return structure


def _index_positions(ordered_layers, node_position, node_layer):
    """Record, in place, the rank and the layer index of every node."""
    for layer, nodes in enumerate(ordered_layers):
        for rank, node in enumerate(nodes):
            node_position[node] = rank
            node_layer[node] = layer


def printsvg():
    """Draw the flow diagram held in ``bags`` and save it as ``{orderkey}.svg``.

    Steps:
      1. Build the node/edge graph from the module-level ``bags`` (nodes whose
         name ends with ``*`` are ignored).
      2. Order the nodes inside each of the 5 layers: start from the order
         saved under the ``best-{orderkey}`` configuration key when there is
         one, run ``genetic_optimize`` and save its result back.
      3. Compute the vertical extent of every node so that the densest layer
         fills the available height.
      4. Emit rectangles, labels and gradient-filled links with ``drawsvg``.

    Takes no argument and returns nothing. Exits the process with status 5 if
    a link has no matching entry on its target node.
    """
    padding = 4
    # Presumably the points -> pixels factor (96 px per inch / 72 pt per inch).
    unit_ratio = 96 / 72
    thickness = conf_value("thickness")
    fontsize_name = conf_value("fontsize_name")
    fontsize_count = conf_value("fontsize_count")
    spacing = conf_value("spacing")
    height = conf_value("height")
    hmargin = conf_value("hmargin")
    width = conf_value("width")

    # --- 1. Graph construction -------------------------------------------
    node_structure = {}
    layers = [[], [], [], [], []]
    edges = []
    for layer, layernodes in enumerate(bags):
        for startnode in layernodes:
            if startnode[-1] == "*":
                continue
            for endnode in layernodes[startnode]:
                if endnode[-1] == "*":
                    continue
                weight = layernodes[startnode][endnode]
                if endnode not in node_structure:
                    node_structure[endnode] = {
                        "prev": [[startnode, weight]],
                        "next": [],
                        "layer": layer + 1,
                    }
                    layers[layer + 1].append(endnode)
                else:
                    node_structure[endnode]["prev"].append([startnode, weight])
                if startnode not in node_structure:
                    node_structure[startnode] = {
                        "prev": [],
                        "next": [[endnode, weight]],
                        "layer": layer,
                    }
                    layers[layer].append(startnode)
                else:
                    node_structure[startnode]["next"].append([endnode, weight])
                edges.append([startnode, endnode, weight])

    # --- 2. Node ordering inside each layer ------------------------------
    node_position = {}
    node_layer = {}
    lastorders = read_conf("best-" + orderkey)
    if lastorders != {}:
        # Seed the optimizer with the previously saved order.
        layer_structure = _ordered_layer_structure(lastorders, layers)
        _index_positions(
            [ls["olayer"] for ls in layer_structure], node_position, node_layer
        )
        print(crossweight(node_position, node_layer, edges))
    else:
        # No saved order: start from the order in which nodes were discovered.
        _index_positions(layers, node_position, node_layer)
    # NOTE(review): the reviewed copy of this file had lost its indentation.
    # The optimizer is assumed to run in both cases (the original re-created
    # layer_structure right after it, which is only needed if the branch
    # above may have filled it) -- confirm against the repository.
    orders = genetic_optimize(node_position, node_layer, edges)
    write_conf("best-" + orderkey, orders)
    layer_structure = _ordered_layer_structure(orders, layers)
    _index_positions(
        [ls["olayer"] for ls in layer_structure], node_position, node_layer
    )
    print(crossweight(node_position, node_layer, edges))

    # --- 3. Vertical layout ----------------------------------------------
    density = []
    for ls in layer_structure:
        ls["num"] = len(ls["olayer"])
        ls["inout"] = 0
        for name in ls["olayer"]:
            node = node_structure[name]
            lhi = sum(prev_node[1] for prev_node in node["prev"])
            lho = sum(next_node[1] for next_node in node["next"])
            node["size"] = max(lhi, lho)
            node["in"] = lhi
            node["out"] = lho
            # An inner node should receive exactly what it sends.
            if lhi != lho and lhi * lho != 0:
                print(f"BUG1: {name} {node} {lhi} {lho}")
            ls["inout"] += node["size"]
        # Weight per unit of height once margins and inter-node spacing are
        # removed from the drawing height.
        ls["density"] = ls["inout"] / (
            spacing + height - spacing * ls["num"] - 2 * hmargin
        )
        density.append(ls["density"])
    # The densest layer dictates the scale shared by all layers.
    realdensity = max(density)

    columns = []
    column_x = 0
    for _ in range(5):
        column_x += width / 6
        columns.append(column_x)

    for ls in layer_structure:
        # Height left unused in this layer, spread evenly around its nodes.
        supp_spacing = (
            spacing
            + height
            - 2 * hmargin
            - spacing * ls["num"]
            - ls["inout"] / realdensity
        ) / (ls["num"] + 1)
        cursor = hmargin - spacing
        for name in ls["olayer"]:
            ns = node_structure[name]
            ns["top"] = supp_spacing + spacing + cursor
            ns["bottom"] = ns["top"] + ns["size"] / realdensity
            cursor = ns["bottom"]

    # --- 4. Drawing --------------------------------------------------------
    d = drawsvg.Drawing(width, height, origin=(0, 0), id_prefix=orderkey)
    node_group = drawsvg.Group()
    link_group = drawsvg.Group()
    text_group = drawsvg.Group()
    label_bg_group = drawsvg.Group()
    font_offset = max(fontsize_count, fontsize_name)
    for n in node_structure:
        ns = node_structure[n]
        col = node_color(n).split(" ")[0].split(".")[0]
        ns["color"] = col
        xpos = width / 6 * (ns["layer"] + 1)
        node_group.append(
            drawsvg.Rectangle(
                xpos - thickness,
                ns["top"],
                2 * thickness,
                ns["bottom"] - ns["top"],
                fill=col,
                stroke_width=0.2,
                stroke="black",
            )
        )
        nw = textwidth(n, "Arial", fontsize_name) * unit_ratio
        cw = textwidth(str(ns["size"]), "Arial", fontsize_count) * unit_ratio
        gw = nw + cw + padding
        ggw = gw + 2 * padding
        nxpos = xpos - gw / 2 + cw + padding + nw / 2
        ypos = (ns["top"] + ns["bottom"]) / 2
        cxpos = cw / 2 - gw / 2 + xpos
        rxpos = xpos
        # Nodes without incoming (resp. outgoing) flow get their label pushed
        # to the left (resp. right) of the bar instead of centred on it.
        shift = gw / 2 + padding + thickness
        if ns["in"] == 0:
            nxpos -= shift
            cxpos -= shift
            rxpos -= shift
        if ns["out"] == 0:
            nxpos += shift
            cxpos += shift
            rxpos += shift
        name_text = drawsvg.Text(
            n,
            str(fontsize_name) + "pt",
            nxpos,
            ypos + fontsize_name / 2,
            fill="black",
            text_anchor="middle",
            font_family="Arial",
        )
        count_text = drawsvg.Text(
            str(ns["size"]),
            str(fontsize_count) + "pt",
            cxpos,
            ypos + fontsize_count / 2,
            fill="black",
            text_anchor="middle",
            font_family="Arial",
        )
        text_group.append(name_text)
        text_group.append(count_text)
        label_bg_group.append(
            drawsvg.Rectangle(
                rxpos - gw / 2 - padding,
                ypos - font_offset / 2 - padding,
                ggw,
                font_offset + 2 * padding,
                stroke="black",
                stroke_width=0,
                fill="white",
                fill_opacity=".5",
                rx=padding,
                ry=padding,
            )
        )

    # First pass: sort the links of every node by the vertical position of the
    # node at the other end, and give each incoming link its [y0, y1] span
    # (appended to the link record). This must be finished for all nodes
    # before the second pass, which reads those spans on the target side.
    for n in node_structure:
        ns = node_structure[n]
        ns["prev"].sort(key=lambda x: node_structure[x[0]]["top"])
        ns["next"].sort(key=lambda x: node_structure[x[0]]["top"])
        start = ns["top"]
        for link in ns["prev"]:
            ysize = link[-1]  # last item is still the weight at this point
            link.append(start)
            start += ysize / realdensity
            link.append(start)

    # Second pass: span of each outgoing link, then the ribbon joining it to
    # the matching incoming span on the target node.
    for n in node_structure:
        ns = node_structure[n]
        start = ns["top"]
        for link in ns["next"]:
            ysize = link[-1]
            link.append(start)
            start += ysize / realdensity
            link.append(start)
            targets = node_structure[link[0]]
            target = None
            t = None
            for t in targets["prev"]:
                if t[0] == n:
                    target = t
            if target is None:
                print(f"BUG: {n},{ns},{t}")
                sys.exit(5)
            posxa = columns[ns["layer"]] + thickness
            posxb = columns[targets["layer"]] - thickness
            # Bezier control points at 1/4 and 3/4 of the horizontal gap.
            posxc = (3 * posxa + posxb) / 4
            posxd = (posxa + 3 * posxb) / 4
            grad = drawsvg.LinearGradient(posxa, 0, posxb, 0)
            grad.add_stop(0, ns["color"], opacity=0.5)
            grad.add_stop(1, targets["color"], opacity=0.5)
            p = drawsvg.Path(fill=grad, stroke_width=0)
            p.M(posxa, link[-2])
            p.C(posxc, link[-2], posxd, target[-2], posxb, target[-2])
            p.L(posxb, target[-1])
            p.C(posxd, target[-1], posxc, link[-1], posxa, link[-1])
            p.Z()
            link_group.append(p)

    # Append order is kept from the original: links, node bars, label
    # backgrounds, texts.
    d.append(link_group)
    d.append(node_group)
    d.append(label_bg_group)
    d.append(text_group)
    d.save_svg(orderkey + ".svg")
# Produce the SVG, then list every entry collected in `strangecases`,
# one per line, each prefixed with "// ".
printsvg()
for strange in strangecases:
    print(f"// {strange}")