forked from ScoDoc/ScoDoc
Emmanuel Viennet
7fc3108886
- réglage de l'adresse origine From au niveau global et systématisation de son utilisation. - ajout de logs, réglage du log par défaut. - modernisation de code.
613 lines
20 KiB
Python
613 lines
20 KiB
Python
# -*- mode: python -*-
|
|
# -*- coding: utf-8 -*-
|
|
|
|
##############################################################################
|
|
#
|
|
# Gestion scolarite IUT
|
|
#
|
|
# Copyright (c) 1999 - 2023 Emmanuel Viennet. All rights reserved.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
#
|
|
#
|
|
##############################################################################
|
|
|
|
import os
|
|
from config import Config
|
|
import re
|
|
import requests
|
|
import glob
|
|
from datetime import date
|
|
|
|
from flask import flash
|
|
from flask_login import current_user
|
|
|
|
from app.entreprises.models import (
|
|
Entreprise,
|
|
EntrepriseCorrespondant,
|
|
EntrepriseOffre,
|
|
EntrepriseOffreDepartement,
|
|
EntreprisePreferences,
|
|
EntrepriseSite,
|
|
EntrepriseHistorique,
|
|
)
|
|
from app import email, db
|
|
from app.scodoc import sco_preferences
|
|
from app.scodoc import sco_excel
|
|
from app.models import Departement
|
|
from app.scodoc.sco_permissions import Permission
|
|
|
|
|
|
# Column titles, in order, of the "Entreprises" Excel sheet.
# The first row of an imported sheet must be equal to this list.
ENTREPRISES_KEYS = [
    "siret", "nom_entreprise",
    "adresse", "ville", "code_postal", "pays"
]
|
|
# Column titles of the "Sites" Excel sheet, split in two groups:
#   - SITES_KEYS[0]: columns describing the site itself;
#   - SITES_KEYS[1]: columns describing an optional correspondant of the site.
# The first row of an imported sheet must be the concatenation of both groups.
_SITES_SITE_KEYS = [
    "siret_entreprise", "id_site", "nom_site",
    "adresse", "ville", "code_postal", "pays"
]
_SITES_CORRESPONDANT_KEYS = [
    "civilite", "nom", "prenom",
    "telephone", "mail",
    "poste", "service", "origine", "notes"
]
SITES_KEYS = [_SITES_SITE_KEYS, _SITES_CORRESPONDANT_KEYS]
|
|
# Column titles, in order, of the "Correspondants" Excel sheet.
# The first row of an imported sheet must be equal to this list.
CORRESPONDANTS_KEYS = [
    "id",
    "civilite", "nom", "prenom",
    "telephone", "mail",
    "poste", "service", "origine", "notes",
    "nom_site"
]
|
|
|
|
|
|
def get_depts():
    """
    Return the list of department ids attached to the current user's roles.

    Each role's ``dept`` acronym is resolved with get_dept_id_by_acronym();
    roles whose acronym matches no department are skipped. One entry is
    produced per matching role (duplicates are not removed).
    """
    resolved_ids = (
        get_dept_id_by_acronym(role.dept) for role in current_user.user_roles
    )
    return [dept_id for dept_id in resolved_ids if dept_id is not None]
|
|
|
|
|
|
def get_dept_id_by_acronym(acronym: str):
    """
    Return the id of the department whose acronym is ``acronym``,
    or None when no such department exists.
    """
    departement = Departement.query.filter_by(acronym=acronym).first()
    return None if departement is None else departement.id
|
|
|
|
|
|
def get_dept_acronym_by_id(id: int):
    """
    Return the acronym of the department whose id is ``id``,
    or None when no such department exists.
    """
    departement = Departement.query.filter_by(id=id).first()
    return None if departement is None else departement.acronym
|
|
|
|
|
|
def check_offre_depts(depts: list, offre_depts: list):
    """
    Tell whether the current user is allowed to see an offer.

    depts: department ids of the current user.
    offre_depts: objects with a ``dept_id`` attribute, one per department
        the offer is linked to.

    Returns True when the user holds the RelationsEntreprisesChange
    permission, or when at least one department of the offer is among
    ``depts``; False otherwise.
    """
    if current_user.has_permission(Permission.RelationsEntreprisesChange, None):
        return True
    return any(offre_dept.dept_id in depts for offre_dept in offre_depts)
|
|
|
|
|
|
def get_offre_files_and_depts(offre: EntrepriseOffre, depts: list):
    """
    Gather an offer together with its attached files, its departments and
    its correspondant.

    Returns ``[offre, files, offre_depts, correspondant]`` where ``files`` is
    a list of ``[directory_name, file_name]`` pairs, or None when the offer
    is linked to departments and check_offre_depts() denies access.
    """
    offre_depts = EntrepriseOffreDepartement.query.filter_by(offre_id=offre.id).all()
    correspondant = EntrepriseCorrespondant.query.filter_by(
        id=offre.correspondant_id
    ).first()

    # An offer linked to no department is visible to everybody.
    if offre_depts and not check_offre_depts(depts, offre_depts):
        return None

    path = os.path.join(
        Config.SCODOC_VAR_DIR,
        "entreprises",
        f"{offre.entreprise_id}",
        f"{offre.id}",
    )
    files = []
    if os.path.exists(path):
        # Only sub-directories named like YYYY-MM-DD-hh-mm-ss are scanned.
        files = [
            [os.path.basename(dated_dir), os.path.basename(file_path)]
            for dated_dir in glob.glob(
                f"{path}/[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]"
            )
            for file_path in glob.glob(f"{dated_dir}/*")
        ]
    return [offre, files, offre_depts, correspondant]
|
|
|
|
|
|
def get_offres_non_expirees_with_files(offres):
    """
    Return the non-expired offers visible to the current user.

    An offer is kept when its ``expired`` flag is false and it either has no
    expiration date or that date has not passed yet. The expiration date
    itself is inclusive: an offer expiring today is still listed, so it does
    not vanish for a day before being strictly past its expiration date.

    Each item of the returned list is the
    ``[offre, files, offre_depts, correspondant]`` list built by
    get_offre_files_and_depts(); offers for which it returns None are skipped.
    """
    depts = get_depts()
    today = date.today()
    offres_with_files = []
    for offre in offres:
        if offre.expired:
            continue
        if offre.expiration_date is not None and today > offre.expiration_date:
            continue
        offre_with_files = get_offre_files_and_depts(offre, depts)
        if offre_with_files is not None:
            offres_with_files.append(offre_with_files)
    return offres_with_files
|
|
|
|
|
|
def get_offres_expirees_with_files(offres):
    """
    Return the expired offers visible to the current user.

    An offer is kept when its ``expired`` flag is set, or when it has an
    expiration date strictly before today.

    Each item of the returned list is the
    ``[offre, files, offre_depts, correspondant]`` list built by
    get_offre_files_and_depts(); offers for which it returns None are skipped.
    """

    def _is_expired(offre):
        if offre.expired:
            return True
        limit = offre.expiration_date
        return limit is not None and date.today() > limit

    depts = get_depts()
    candidates = (
        get_offre_files_and_depts(offre, depts)
        for offre in offres
        if _is_expired(offre)
    )
    return [entry for entry in candidates if entry is not None]
|
|
|
|
|
|
def send_email_notifications_entreprise(subject: str, entreprise: Entreprise):
    """
    Send the "company awaiting validation" notification e-mail.

    subject: subject of the e-mail.
    entreprise: company described in the message body.

    The message is sent from email.get_from_addr() to the address returned
    by EntreprisePreferences.get_email_notifications().

    Returns the plain-text body that was sent.
    """
    txt = "\n".join(
        [
            "Une entreprise est en attente de validation",
            "Entreprise:",
            f"\tnom: {entreprise.nom}",
            f"\tsiret: {entreprise.siret}",
            f"\tadresse: {entreprise.adresse}",
            f"\tcode postal: {entreprise.codepostal}",
            f"\tville: {entreprise.ville}",
            f"\tpays: {entreprise.pays}",
        ]
    )
    email.send_email(
        subject,
        email.get_from_addr(),
        # The getter must be *called* (as get_check_siret() is elsewhere in
        # this module): passing the method object itself would put a
        # callable, not an address, in the recipients list.
        [EntreprisePreferences.get_email_notifications()],
        txt,
    )
    return txt
|
|
|
|
|
|
def get_excel_book_are(export: bool = False):
    """
    Build the Excel workbook with the three sheets "Entreprises", "Sites"
    and "Correspondants".

    Every sheet starts with a bold title row; in the "Sites" sheet the
    titles of the correspondant columns (SITES_KEYS[1]) are also red.
    When ``export`` is True, one row is appended per visible company, per
    site of a visible company and per correspondant of such a site. Site
    rows only carry the SITES_KEYS[0] columns.

    Returns the sco_excel.ScoExcelBook instance.
    """

    def _title_cells(sheet, titles, style):
        # All titles of one group share the same style object.
        return [sheet.make_cell(title, style) for title in titles]

    def _rows_of(items, keys):
        # Missing keys are exported as empty strings.
        return [[item.to_dict().get(key, "") for key in keys] for item in items]

    def _append_rows(sheet, rows):
        for row in rows:
            sheet.append_row([sheet.make_cell(value) for value in row])

    wb = sco_excel.ScoExcelBook()

    ws_entreprises = wb.create_sheet("Entreprises")
    ws_entreprises.append_row(
        _title_cells(
            ws_entreprises, ENTREPRISES_KEYS, sco_excel.excel_make_style(bold=True)
        )
    )

    ws_sites = wb.create_sheet("Sites")
    ws_sites.append_row(
        _title_cells(ws_sites, SITES_KEYS[0], sco_excel.excel_make_style(bold=True))
        + _title_cells(
            ws_sites,
            SITES_KEYS[1],
            sco_excel.excel_make_style(bold=True, color=sco_excel.COLORS.RED),
        )
    )

    ws_correspondants = wb.create_sheet("Correspondants")
    ws_correspondants.append_row(
        _title_cells(
            ws_correspondants,
            CORRESPONDANTS_KEYS,
            sco_excel.excel_make_style(bold=True),
        )
    )

    if not export:
        return wb

    entreprises = Entreprise.query.filter_by(visible=True).all()
    sites = (
        db.session.query(EntrepriseSite)
        .join(Entreprise, EntrepriseSite.entreprise_id == Entreprise.id)
        .filter_by(visible=True)
        .all()
    )
    correspondants = (
        db.session.query(EntrepriseCorrespondant)
        .join(EntrepriseSite, EntrepriseCorrespondant.site_id == EntrepriseSite.id)
        .join(Entreprise, EntrepriseSite.entreprise_id == Entreprise.id)
        .filter_by(visible=True)
        .all()
    )

    # All rows are computed before any of them is written to a sheet.
    entreprises_lines = _rows_of(entreprises, ENTREPRISES_KEYS)
    sites_lines = _rows_of(sites, SITES_KEYS[0])
    correspondants_lines = _rows_of(correspondants, CORRESPONDANTS_KEYS)

    _append_rows(ws_entreprises, entreprises_lines)
    _append_rows(ws_sites, sites_lines)
    _append_rows(ws_correspondants, correspondants_lines)
    return wb
|
|
|
|
|
|
def check_entreprises_import(m):
    """
    Check the "Entreprises" sheet of an Excel import.

    m: the sheet as a list of rows, the first row being the column titles.

    Rows whose siret matches a visible company update that company in
    place; the other rows produce new (not yet added to the session)
    Entreprise objects. A row that fails check_entreprise_import(), or
    whose siret already appeared earlier in the sheet, aborts the import.

    Returns False (after flashing a message) on error, else the list of
    new Entreprise objects. Nothing is committed here; a history entry is
    added to the session when at least one company is created.
    """
    ligne = 1
    if m[0] != ENTREPRISES_KEYS:
        flash(
            f'Veuillez utilisez la feuille excel à remplir (Feuille "Entreprises", ligne {ligne})'
        )
        return False

    entreprises_import = []
    sirets_vus = set()
    for ligne, entreprise_data in enumerate(list_to_dict(m), start=2):
        siret = entreprise_data["siret"].replace(" ", "")
        entreprise_data["siret"] = siret

        if not check_entreprise_import(entreprise_data) or siret in sirets_vus:
            flash(
                f'Erreur lors de l\'importation (Feuille "Entreprises", ligne {ligne})'
            )
            return False
        sirets_vus.add(siret)

        # An empty country defaults to FRANCE.
        pays = entreprise_data["pays"] or "FRANCE"
        entreprise = Entreprise.query.filter_by(siret=siret, visible=True).first()
        if entreprise is not None:
            entreprise.nom = entreprise_data["nom_entreprise"]
            entreprise.adresse = entreprise_data["adresse"]
            entreprise.ville = entreprise_data["ville"]
            entreprise.codepostal = entreprise_data["code_postal"]
            entreprise.pays = pays
            continue

        entreprises_import.append(
            Entreprise(
                siret=siret,
                nom=entreprise_data["nom_entreprise"],
                adresse=entreprise_data["adresse"],
                ville=entreprise_data["ville"],
                codepostal=entreprise_data["code_postal"],
                pays=pays,
                visible=True,
            )
        )

    if entreprises_import:
        db.session.add(
            EntrepriseHistorique(
                authenticated_user=current_user.user_name,
                text=f"Importation de {len(entreprises_import)} entreprise(s)",
            )
        )

    return entreprises_import
|
|
|
|
|
|
def check_entreprise_import(entreprise_data):
    """
    Check the data of one Excel row describing a company.

    entreprise_data: dict mapping column titles to (stripped) cell values.

    Returns False when a mandatory field is empty. When siret checking is
    enabled (EntreprisePreferences.get_check_siret()), also returns False
    when the siret is not made of exactly 14 digits, or when the SIRENE API
    does not answer with HTTP 200 (including any network failure or
    timeout). Returns True otherwise.
    """
    champs_obligatoires = ("siret", "nom_entreprise", "adresse", "ville", "code_postal")
    if any(entreprise_data.get(champ) == "" for champ in champs_obligatoires):
        return False

    if not EntreprisePreferences.get_check_siret():
        return True

    siret = entreprise_data["siret"]
    # Raw string: "\d" in a plain string literal is an invalid escape sequence.
    if re.fullmatch(r"\d{14}", siret) is None:
        return False

    try:
        # A timeout is mandatory: without it an unresponsive API would hang
        # the whole import. Timeouts are not all ConnectionError subclasses,
        # hence the broader RequestException.
        req = requests.get(
            f"https://entreprise.data.gouv.fr/api/sirene/v1/siret/{siret}",
            timeout=10,
        )
    except requests.RequestException:
        return False
    return req.status_code == 200
|
|
|
|
|
|
def _correspondant_from_site_data(site_data, **extra):
    """Build an unsaved EntrepriseCorrespondant from the correspondant
    columns (SITES_KEYS[1]) of a "Sites" row, plus optional extra fields."""
    champs = {champ: site_data[champ] for champ in SITES_KEYS[1]}
    return EntrepriseCorrespondant(**extra, **champs)


def check_sites_import(m):
    """
    Check the "Sites" sheet of an Excel import.

    m: the sheet as a list of rows, the first row being the column titles.

    For each row accepted by check_site_import():
      - empty ``id_site``: a new EntrepriseSite is built for the company
        identified by ``siret_entreprise``;
      - otherwise the existing site with that id is updated in place.
    When ``civilite`` is filled in, a new EntrepriseCorrespondant is built
    for the site as well.

    Returns ``(False, False)`` (after flashing a message) on error,
    including when ``id_site`` refers to no existing site. Otherwise returns
    ``(sites_import, correspondants_import)`` where ``sites_import`` lists
    the new sites and ``correspondants_import`` lists
    ``[site_or_None, correspondant]`` pairs: the first item is the new site
    the correspondant belongs to, or None when the correspondant already
    carries the ``site_id`` of an existing site.
    Nothing is committed here; history entries are added to the session.
    """
    ligne = 1
    if m[0] != sum(SITES_KEYS, []):
        flash(
            f'Veuillez utilisez la feuille excel à remplir (Feuille "Sites", ligne {ligne})'
        )
        return False, False

    sites_import = []
    correspondants_import = []
    for ligne, site_data in enumerate(list_to_dict(m), start=2):
        site_data["siret_entreprise"] = site_data["siret_entreprise"].replace(" ", "")
        if not check_site_import(site_data):
            flash(f'Erreur lors de l\'importation (Feuille "Sites", ligne {ligne})')
            return False, False

        avec_correspondant = site_data["civilite"] != ""

        if site_data["id_site"] == "":
            # New site: check_site_import() guarantees the company exists.
            entreprise = Entreprise.query.filter_by(
                siret=site_data["siret_entreprise"], visible=True
            ).first()
            site_import = EntrepriseSite(
                entreprise_id=entreprise.id,
                nom=site_data["nom_site"],
                adresse=site_data["adresse"],
                codepostal=site_data["code_postal"],
                ville=site_data["ville"],
                pays=site_data["pays"],
            )
            sites_import.append(site_import)
            if avec_correspondant:
                correspondants_import.append(
                    [site_import, _correspondant_from_site_data(site_data)]
                )
            continue

        # Existing site: the id comes from the spreadsheet and may be wrong,
        # so report the row instead of failing on a None site.
        site = EntrepriseSite.query.filter_by(id=site_data["id_site"]).first()
        if site is None:
            flash(f'Erreur lors de l\'importation (Feuille "Sites", ligne {ligne})')
            return False, False
        site.nom = site_data["nom_site"]
        site.adresse = site_data["adresse"]
        site.codepostal = site_data["code_postal"]
        site.ville = site_data["ville"]
        site.pays = site_data["pays"]
        if avec_correspondant:
            correspondants_import.append(
                [None, _correspondant_from_site_data(site_data, site_id=site.id)]
            )

    if sites_import:
        db.session.add(
            EntrepriseHistorique(
                authenticated_user=current_user.user_name,
                text=f"Importation de {len(sites_import)} site(s)",
            )
        )

    if correspondants_import:
        db.session.add(
            EntrepriseHistorique(
                authenticated_user=current_user.user_name,
                text=f"Importation de {len(correspondants_import)} correspondant(s)",
            )
        )

    return sites_import, correspondants_import
|
|
|
|
|
|
def check_site_import(site_data):
    """
    Check the data of one Excel row describing a site.

    site_data: dict mapping column titles to (stripped) cell values.

    Returns False when a mandatory field is empty, when the correspondant
    columns are filled in (non-empty ``civilite``) but rejected by
    check_correspondant_import(), when no visible company has the given
    siret, or when a new site (empty ``id_site``) uses the name of an
    existing site. Returns True otherwise.
    """
    champs_obligatoires = (
        "siret_entreprise",
        "nom_site",
        "adresse",
        "ville",
        "code_postal",
        "pays",
    )
    if any(site_data.get(champ) == "" for champ in champs_obligatoires):
        return False

    if site_data["civilite"] != "" and not check_correspondant_import(site_data):
        return False

    entreprise = Entreprise.query.filter_by(
        siret=site_data["siret_entreprise"], visible=True
    ).first()
    if entreprise is None:
        return False

    # The name lookup is not restricted to the company of the row.
    homonyme = EntrepriseSite.query.filter_by(nom=site_data["nom_site"]).first()
    return not (site_data["id_site"] == "" and homonyme is not None)
|
|
|
|
|
|
def check_correspondants_import(m):
    """
    Check the "Correspondants" sheet of an Excel import and apply it.

    m: the sheet as a list of rows, the first row being the column titles.

    Every row must carry the ``id`` of an existing correspondant and pass
    check_correspondant_import(); the correspondant is then updated in
    place (the ``nom_site`` column is not written back). Nothing is
    committed here.

    Returns True on success, False (after flashing a message) on the first
    invalid row.
    """
    champs_modifiables = (
        "civilite",
        "nom",
        "prenom",
        "telephone",
        "mail",
        "poste",
        "service",
        "origine",
        "notes",
    )
    ligne = 1
    if m[0] != CORRESPONDANTS_KEYS:
        flash(
            f'Veuillez utilisez la feuille excel à remplir (Feuille "Correspondants", ligne {ligne})'
        )
        return False

    for ligne, correspondant_data in enumerate(list_to_dict(m), start=2):
        correspondant = None
        if correspondant_data["id"] != "":
            correspondant = EntrepriseCorrespondant.query.filter_by(
                id=correspondant_data["id"]
            ).first()

        # Rows without id, with an unknown id or with invalid data are all
        # reported the same way.
        if correspondant is None or not check_correspondant_import(
            correspondant_data
        ):
            flash(
                f'Erreur lors de l\'importation (Feuille "Correspondants", ligne {ligne})'
            )
            return False

        for champ in champs_modifiables:
            setattr(correspondant, champ, correspondant_data[champ])
    return True
|
|
|
|
|
|
def check_correspondant_import(correspondant_data):
    """
    Check the data of one Excel row describing a correspondant.

    correspondant_data: dict mapping column titles to (stripped) cell
    values; it is either a "Correspondants" row or a "Sites" row (the
    latter has the ``siret_entreprise`` and ``id_site`` keys).

    Returns False when ``civilite``, ``nom`` or ``prenom`` is empty, when
    ``civilite`` is not H or F (case-insensitive), or when both
    ``telephone`` and ``mail`` are empty. For "Sites" rows, also returns
    False when no visible company has the given siret, or when the site
    ``id_site`` already has a correspondant with the same ``nom`` and
    ``prenom``. Returns True otherwise.
    """
    if any(
        correspondant_data.get(champ) == "" for champ in ("civilite", "nom", "prenom")
    ):
        return False

    if correspondant_data["civilite"].upper() not in ["H", "F"]:
        return False

    # At least one way to contact the correspondant is required.
    sans_telephone = correspondant_data["telephone"] == ""
    sans_mail = correspondant_data["mail"] == ""
    if sans_telephone and sans_mail:
        return False

    # The remaining checks only apply to rows of the "Sites" sheet.
    if "siret_entreprise" not in correspondant_data:
        return True

    entreprise = Entreprise.query.filter_by(
        siret=correspondant_data["siret_entreprise"], visible=True
    ).first()
    if entreprise is None:
        return False

    if correspondant_data["id_site"] == "":
        return True

    homonyme = EntrepriseCorrespondant.query.filter_by(
        nom=correspondant_data["nom"],
        prenom=correspondant_data["prenom"],
        site_id=correspondant_data["id_site"],
    ).first()
    return homonyme is None
|
|
|
|
|
|
def list_to_dict(m):
    """
    Turn a matrix (list of rows) into a list of dicts.

    The first row of ``m`` gives the keys; every following row becomes one
    dict whose values are the cells of the row with surrounding whitespace
    stripped. Cells beyond the shorter of the title row and the data row
    are ignored.
    """
    return [
        {title: cell.strip() for title, cell in zip(m[0], row)} for row in m[1:]
    ]
|