MonScoDocEssai/app/entreprises/app_relations_entreprises.py

549 lines
18 KiB
Python

# -*- mode: python -*-
# -*- coding: utf-8 -*-
##############################################################################
#
# Gestion scolarite IUT
#
# Copyright (c) 1999 - 2022 Emmanuel Viennet. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
#
##############################################################################
import os
from config import Config
import re
import requests
import glob
from flask import flash
from flask_login import current_user
from app.entreprises.models import (
Entreprise,
EntrepriseCorrespondant,
EntrepriseOffre,
EntrepriseOffreDepartement,
EntreprisePreferences,
EntrepriseSite,
EntrepriseLog,
)
from app import email, db
from app.scodoc import sco_preferences
from app.scodoc import sco_excel
from app.models import Departement
from app.scodoc.sco_permissions import Permission
ENTREPRISES_KEYS = [
"siret",
"nom_entreprise",
"adresse",
"ville",
"code_postal",
"pays",
]
SITES_KEYS = [
[
"siret_entreprise",
"id",
"nom_site",
"adresse",
"ville",
"code_postal",
"pays",
],
[
"civilite",
"nom",
"prenom",
"telephone",
"mail",
"poste",
"service",
"origine",
"notes",
],
]
CORRESPONDANTS_KEYS = [
"id",
"civilite",
"nom",
"prenom",
"telephone",
"mail",
"poste",
"service",
"origine",
"notes",
"nom_site",
]
def get_depts():
"""
Retourne une liste contenant les l'id des départements des roles de l'utilisateur courant
"""
depts = []
for role in current_user.user_roles:
dept_id = get_dept_id_by_acronym(role.dept)
if dept_id is not None:
depts.append(dept_id)
return depts
def get_dept_id_by_acronym(acronym: str):
"""
Retourne l'id d'un departement a l'aide de son acronym
"""
dept = Departement.query.filter_by(acronym=acronym).first()
if dept is not None:
return dept.id
return None
def get_dept_acronym_by_id(id: int):
dept = Departement.query.filter_by(id=id).first()
if dept is not None:
return dept.acronym
return None
def check_offre_depts(depts: list, offre_depts: list):
"""
Retourne vrai si l'utilisateur a le droit de visibilité sur l'offre
"""
if current_user.has_permission(Permission.RelationsEntreprisesChange, None):
return True
for offre_dept in offre_depts:
if offre_dept.dept_id in depts:
return True
return False
def get_offre_files_and_depts(offre: EntrepriseOffre, depts: list):
"""
Retourne l'offre, les fichiers attachés a l'offre et les département liés
"""
offre_depts = EntrepriseOffreDepartement.query.filter_by(offre_id=offre.id).all()
correspondant = EntrepriseCorrespondant.query.filter_by(
id=offre.correspondant_id
).first()
if not offre_depts or check_offre_depts(depts, offre_depts):
files = []
path = os.path.join(
Config.SCODOC_VAR_DIR,
"entreprises",
f"{offre.entreprise_id}",
f"{offre.id}",
)
if os.path.exists(path):
for dir in glob.glob(
f"{path}/[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]"
):
for _file in glob.glob(f"{dir}/*"):
file = [os.path.basename(dir), os.path.basename(_file)]
files.append(file)
return [offre, files, offre_depts, correspondant]
return None
def send_email_notifications_entreprise(subject: str, entreprise: Entreprise):
txt = [
"Une entreprise est en attente de validation",
"Entreprise:",
f"\tnom: {entreprise.nom}",
f"\tsiret: {entreprise.siret}",
f"\tadresse: {entreprise.adresse}",
f"\tcode postal: {entreprise.codepostal}",
f"\tville: {entreprise.ville}",
f"\tpays: {entreprise.pays}",
]
txt = "\n".join(txt)
email.send_email(
subject,
sco_preferences.get_preference("email_from_addr"),
[EntreprisePreferences.get_email_notifications],
txt,
)
return txt
def get_excel_book_are(export: bool = False):
"""
Retourne un Excel avec les 3 feuilles "Entreprises", "Sites" et "Correspondants"
"""
entreprises_titles = ENTREPRISES_KEYS[:]
sites_titles = SITES_KEYS[:]
correspondants_titles = CORRESPONDANTS_KEYS[:]
wb = sco_excel.ScoExcelBook()
ws1 = wb.create_sheet("Entreprises")
ws1.append_row(
[
ws1.make_cell(it, style)
for (it, style) in zip(
entreprises_titles,
[sco_excel.excel_make_style(bold=True)] * len(entreprises_titles),
)
]
)
ws2 = wb.create_sheet("Sites")
ws2.append_row(
[
ws2.make_cell(it, style)
for (it, style) in zip(
sites_titles[0],
[sco_excel.excel_make_style(bold=True)] * len(sites_titles[0]),
)
]
+ [
ws2.make_cell(it, style)
for (it, style) in zip(
sites_titles[1],
[sco_excel.excel_make_style(bold=True, color=sco_excel.COLORS.RED)]
* len(sites_titles[1]),
)
]
)
ws3 = wb.create_sheet("Correspondants")
ws3.append_row(
[
ws3.make_cell(it, style)
for (it, style) in zip(
correspondants_titles,
[sco_excel.excel_make_style(bold=True)] * len(correspondants_titles),
)
]
)
if export:
entreprises = Entreprise.query.filter_by(visible=True).all()
sites = (
db.session.query(EntrepriseSite)
.join(Entreprise, EntrepriseSite.entreprise_id == Entreprise.id)
.filter_by(visible=True)
.all()
)
correspondants = (
db.session.query(EntrepriseCorrespondant)
.join(Entreprise, EntrepriseCorrespondant.entreprise_id == Entreprise.id)
.filter_by(visible=True)
.all()
)
entreprises_lines = [
[entreprise.to_dict().get(k, "") for k in ENTREPRISES_KEYS]
for entreprise in entreprises
]
sites_lines = [
[site.to_dict().get(k, "") for k in SITES_KEYS[0]] for site in sites
]
correspondants_lines = [
[correspondant.to_dict().get(k, "") for k in CORRESPONDANTS_KEYS]
for correspondant in correspondants
]
for line in entreprises_lines:
cells = []
for it in line:
cells.append(ws1.make_cell(it))
ws1.append_row(cells)
for line in sites_lines:
cells = []
for it in line:
cells.append(ws2.make_cell(it))
ws2.append_row(cells)
for line in correspondants_lines:
cells = []
for it in line:
cells.append(ws3.make_cell(it))
ws3.append_row(cells)
return wb
def check_entreprises_import(m):
"""
Verifie la feuille Excel "Entreprises" de l'importation données
"""
entreprises_import = []
siret_list = []
ligne = 1
if m[0] != ENTREPRISES_KEYS:
flash(
f'Veuillez utilisez la feuille excel à remplir (Feuille "Entreprises", ligne {ligne})'
)
return False
for entreprise_data in m[1:]:
ligne += 1
entreprise_data[0] = entreprise_data[0].strip().replace(" ", "")
siret = entreprise_data[0]
if check_entreprise_import(entreprise_data) and siret not in siret_list:
siret_list.append(siret)
entreprise = Entreprise.query.filter_by(siret=siret, visible=True).first()
if entreprise is None:
entreprise_import = Entreprise(
siret=siret,
nom=entreprise_data[1].strip(),
adresse=entreprise_data[2].strip(),
ville=entreprise_data[3].strip(),
codepostal=entreprise_data[4].strip(),
pays=entreprise_data[5].strip()
if entreprise_data[5].strip()
else "FRANCE",
visible=True,
)
entreprises_import.append(entreprise_import)
else:
entreprise.nom = entreprise_data[1].strip()
entreprise.adresse = entreprise_data[2].strip()
entreprise.ville = entreprise_data[3].strip()
entreprise.codepostal = entreprise_data[4].strip()
entreprise.pays = (
entreprise_data[5].strip()
if entreprise_data[5].strip()
else "FRANCE"
)
else:
flash(
f'Erreur lors de l\'importation (Feuille "Entreprises", ligne {ligne})'
)
return False
if len(entreprises_import) > 0:
log = EntrepriseLog(
authenticated_user=current_user.user_name,
text=f"Importation de {len(entreprises_import)} entreprise(s)",
)
db.session.add(log)
return entreprises_import
def check_entreprise_import(entreprise_data: list):
"""
Verifie les données d'une ligne Excel (entreprise)
"""
for data in entreprise_data[:-1]: # champs obligatoires
if data.strip() == "":
return False
siret = entreprise_data[0]
if EntreprisePreferences.get_check_siret():
if re.match("^\d{14}$", siret) is None:
return False
try:
req = requests.get(
f"https://entreprise.data.gouv.fr/api/sirene/v1/siret/{siret}"
)
if req.status_code != 200:
return False
except requests.ConnectionError:
return False
return True
def check_sites_import(m):
"""
Verifie la feuille Excel "Sites" de l'importation données
"""
sites_import = []
correspondants_import = []
ligne = 1
if m[0] != sum(SITES_KEYS, []):
flash(
f'Veuillez utilisez la feuille excel à remplir (Feuille "Sites", ligne {ligne})'
)
return False, False
for site_data in m[1:]:
ligne += 1
site_data[0] = site_data[0].strip().replace(" ", "")
if check_site_import(site_data):
correspondant_data = site_data[len(SITES_KEYS[0]) :]
entreprise = Entreprise.query.filter_by(
siret=site_data[0], visible=True
).first()
if site_data[1].strip() == "":
site_import = EntrepriseSite(
entreprise_id=entreprise.id,
nom=site_data[2].strip(),
adresse=site_data[3].strip(),
codepostal=site_data[4].strip(),
ville=site_data[5].strip(),
pays=site_data[6].strip(),
)
if correspondant_data == [""] * len(SITES_KEYS[1]):
sites_import.append(site_import)
else:
correspondant_import = EntrepriseCorrespondant(
entreprise_id=entreprise.id,
civilite=correspondant_data[0],
nom=correspondant_data[1],
prenom=correspondant_data[2],
telephone=correspondant_data[3],
mail=correspondant_data[4],
poste=correspondant_data[5],
service=correspondant_data[6],
origine=correspondant_data[7],
notes=correspondant_data[8],
)
sites_import.append(site_import)
correspondants_import.append([site_import, correspondant_import])
else:
site_id = site_data[1].strip()
site = EntrepriseSite.query.filter_by(id=site_id).first()
site.nom = site_data[2].strip()
site.adresse = site_data[3].strip()
site.codepostal = site_data[4].strip()
site.ville = site_data[5].strip()
site.pays = site_data[6].strip()
if correspondant_data != [""] * len(SITES_KEYS[1]):
correspondant_import = EntrepriseCorrespondant(
entreprise_id=entreprise.id,
site_id=site.id,
civilite=correspondant_data[0],
nom=correspondant_data[1],
prenom=correspondant_data[2],
telephone=correspondant_data[3],
mail=correspondant_data[4],
poste=correspondant_data[5],
service=correspondant_data[6],
origine=correspondant_data[7],
notes=correspondant_data[8],
)
correspondants_import.append([None, correspondant_import])
else:
flash(f'Erreur lors de l\'importation (Feuille "Sites", ligne {ligne})')
return False, False
if len(sites_import) > 0:
log = EntrepriseLog(
authenticated_user=current_user.user_name,
text=f"Importation de {len(sites_import)} site(s)",
)
db.session.add(log)
if len(correspondants_import) > 0:
log = EntrepriseLog(
authenticated_user=current_user.user_name,
text=f"Importation de {len(correspondants_import)} correspondant(s)",
)
db.session.add(log)
return sites_import, correspondants_import
def check_site_import(row_data: list):
"""
Verifie les données d'une ligne Excel (sites)
"""
site_data = row_data[: -len(SITES_KEYS[1])]
correspondant_data = row_data[len(SITES_KEYS[0]) :]
for data in [site_data[0]] + site_data[2:]: # champs obligatoires
if data.strip() == "":
return False
if correspondant_data != [""] * len(SITES_KEYS[1]):
if check_correspondant_import(correspondant_data, site_data) is False:
return False
entreprise = Entreprise.query.filter_by(siret=site_data[0], visible=True).first()
if entreprise is None:
return False
site = EntrepriseSite.query.filter_by(nom=site_data[2]).first()
if site_data[1] == "" and site is not None:
return False
return True
def check_correspondants_import(m):
ligne = 1
if m[0] != CORRESPONDANTS_KEYS:
flash(
f'Veuillez utilisez la feuille excel à remplir (Feuille "Correspondants", ligne {ligne})'
)
return False
for correspondant_data in m[1:]:
ligne += 1
if correspondant_data[0] == "":
flash(
f'Erreur lors de l\'importation (Feuille "Correspondants", ligne {ligne})'
)
return False
correspondant = EntrepriseCorrespondant.query.filter_by(
id=correspondant_data[0]
).first()
if correspondant is not None and check_correspondant_import(
correspondant_data[1:-1]
):
correspondant.civilite = correspondant_data[1].strip()
correspondant.nom = correspondant_data[2].strip()
correspondant.prenom = correspondant_data[3].strip()
correspondant.telephone = correspondant_data[4].strip()
correspondant.mail = correspondant_data[5].strip()
correspondant.poste = correspondant_data[6].strip()
correspondant.service = correspondant_data[7].strip()
correspondant.origine = correspondant_data[8].strip()
correspondant.notes = correspondant_data[9].strip()
else:
flash(
f'Erreur lors de l\'importation (Feuille "Correspondants", ligne {ligne})'
)
return False
return True
def check_correspondant_import(correspondant_data: list, site_data: list = None):
"""
Verifie les données d'une ligne Excel (correspondant)
"""
# champs obligatoires
if (
correspondant_data[0].strip() == ""
or correspondant_data[1].strip() == ""
or correspondant_data[2].strip() == ""
):
return False
# civilite entre H ou F
if correspondant_data[0].strip() not in ["H", "F"]:
return False
if (
correspondant_data[3].strip() == "" and correspondant_data[4].strip() == ""
): # 1 moyen de contact
return False
if site_data is not None:
# entreprise_id existant
entreprise = Entreprise.query.filter_by(
siret=site_data[0], visible=True
).first()
if entreprise is None:
return False
# correspondant possède le meme nom et prénom dans la meme entreprise
correspondant = EntrepriseCorrespondant.query.filter_by(
nom=correspondant_data[1].strip(),
prenom=correspondant_data[2].strip(),
entreprise_id=entreprise.id,
).first()
if correspondant is not None:
return False
return True