suite import de données

This commit is contained in:
Arthur ZHU 2022-06-02 21:49:37 +02:00
parent 9e751722e6
commit 74cd1400d8
5 changed files with 246 additions and 143 deletions

View File

@ -47,6 +47,51 @@ from app.models import Departement
from app.scodoc.sco_permissions import Permission from app.scodoc.sco_permissions import Permission
ENTREPRISES_KEYS = [
"siret",
"nom_entreprise",
"adresse",
"ville",
"code_postal",
"pays",
]
SITES_KEYS = [
[
"siret_entreprise",
"id",
"nom_site",
"adresse",
"ville",
"code_postal",
"pays",
],
[
"civilite",
"nom",
"prenom",
"telephone",
"mail",
"poste",
"service",
"origine",
"notes",
],
]
CORRESPONDANTS_KEYS = [
"id",
"civilite",
"nom",
"prenom",
"telephone",
"mail",
"poste",
"service",
"origine",
"notes",
"nom_site",
]
def get_depts(): def get_depts():
""" """
Retourne une liste contenant les l'id des départements des roles de l'utilisateur courant Retourne une liste contenant les l'id des départements des roles de l'utilisateur courant
@ -59,7 +104,7 @@ def get_depts():
return depts return depts
def get_dept_id_by_acronym(acronym): def get_dept_id_by_acronym(acronym: str):
""" """
Retourne l'id d'un departement a l'aide de son acronym Retourne l'id d'un departement a l'aide de son acronym
""" """
@ -69,14 +114,14 @@ def get_dept_id_by_acronym(acronym):
return None return None
def get_dept_acronym_by_id(id): def get_dept_acronym_by_id(id: int):
dept = Departement.query.filter_by(id=id).first() dept = Departement.query.filter_by(id=id).first()
if dept is not None: if dept is not None:
return dept.acronym return dept.acronym
return None return None
def check_offre_depts(depts, offre_depts): def check_offre_depts(depts: list, offre_depts: list):
""" """
Retourne vrai si l'utilisateur a le droit de visibilité sur l'offre Retourne vrai si l'utilisateur a le droit de visibilité sur l'offre
""" """
@ -115,7 +160,7 @@ def get_offre_files_and_depts(offre: EntrepriseOffre, depts: list):
return None return None
def send_email_notifications_entreprise(subject, entreprise: Entreprise): def send_email_notifications_entreprise(subject: str, entreprise: Entreprise):
txt = [ txt = [
"Une entreprise est en attente de validation", "Une entreprise est en attente de validation",
"Entreprise:", "Entreprise:",
@ -136,6 +181,92 @@ def send_email_notifications_entreprise(subject, entreprise: Entreprise):
return txt return txt
def get_excel_book_are(export: bool = False):
entreprises_titles = ENTREPRISES_KEYS[:]
sites_titles = SITES_KEYS[:]
correspondants_titles = CORRESPONDANTS_KEYS[:]
wb = sco_excel.ScoExcelBook()
ws1 = wb.create_sheet("Entreprises")
ws1.append_row(
[
ws1.make_cell(it, style)
for (it, style) in zip(
entreprises_titles,
[sco_excel.excel_make_style(bold=True)] * len(entreprises_titles),
)
]
)
ws2 = wb.create_sheet("Sites")
ws2.append_row(
[
ws2.make_cell(it, style)
for (it, style) in zip(
sites_titles[0],
[sco_excel.excel_make_style(bold=True)] * len(sites_titles[0]),
)
]
+ [
ws2.make_cell(it, style)
for (it, style) in zip(
sites_titles[1],
[sco_excel.excel_make_style(bold=True, color=sco_excel.COLORS.RED)]
* len(sites_titles[1]),
)
]
)
ws3 = wb.create_sheet("Correspondants")
ws3.append_row(
[
ws3.make_cell(it, style)
for (it, style) in zip(
correspondants_titles,
[sco_excel.excel_make_style(bold=True)] * len(correspondants_titles),
)
]
)
if export:
entreprises = Entreprise.query.filter_by(visible=True).all()
sites = (
db.session.query(EntrepriseSite)
.join(Entreprise, EntrepriseSite.entreprise_id == Entreprise.id)
.filter_by(visible=True)
.all()
)
correspondants = (
db.session.query(EntrepriseCorrespondant)
.join(Entreprise, EntrepriseCorrespondant.entreprise_id == Entreprise.id)
.filter_by(visible=True)
.all()
)
entreprises_lines = [
[entreprise.to_dict().get(k, "") for k in ENTREPRISES_KEYS]
for entreprise in entreprises
]
sites_lines = [
[site.to_dict().get(k, "") for k in SITES_KEYS[0]] for site in sites
]
correspondants_lines = [
[correspondant.to_dict().get(k, "") for k in CORRESPONDANTS_KEYS]
for correspondant in correspondants
]
for line in entreprises_lines:
cells = []
for it in line:
cells.append(ws1.make_cell(it))
ws1.append_row(cells)
for line in sites_lines:
cells = []
for it in line:
cells.append(ws2.make_cell(it))
ws2.append_row(cells)
for line in correspondants_lines:
cells = []
for it in line:
cells.append(ws3.make_cell(it))
ws3.append_row(cells)
return wb
def verif_correspondant_data(correspondant_data): def verif_correspondant_data(correspondant_data):
""" """
Verifie les données d'une ligne Excel (correspondant) Verifie les données d'une ligne Excel (correspondant)
@ -213,132 +344,3 @@ def verif_entreprise_data(entreprise_data):
if entreprise is not None: if entreprise is not None:
return False return False
return True return True
def get_excel_book_are(export=False):
entreprises_keys = [
"siret",
"nom_entreprise",
"adresse",
"ville",
"code_postal",
"pays",
]
sites_keys = [
[
"siret_entreprise",
"id",
"nom_site",
"adresse",
"ville",
"code_postal",
"pays",
],
[
"civilite",
"nom",
"prenom",
"telephone",
"mail",
"poste",
"service",
"origine",
"notes",
],
]
correspondants_keys = [
"id",
"civilite",
"nom",
"prenom",
"telephone",
"mail",
"poste",
"service",
"origine",
"notes",
"nom_site",
]
entreprises_titles = entreprises_keys[:]
sites_titles = sites_keys[:]
correspondants_titles = correspondants_keys[:]
wb = sco_excel.ScoExcelBook()
ws1 = wb.create_sheet("Entreprises")
ws1.append_row(
[
ws1.make_cell(it, style)
for (it, style) in zip(
entreprises_titles,
[sco_excel.excel_make_style(bold=True)] * len(entreprises_titles),
)
]
)
ws2 = wb.create_sheet("Sites")
ws2.append_row(
[
ws2.make_cell(it, style)
for (it, style) in zip(
sites_titles[0],
[sco_excel.excel_make_style(bold=True)] * len(sites_titles[0]),
)
]
+ [
ws2.make_cell(it, style)
for (it, style) in zip(
sites_titles[1],
[sco_excel.excel_make_style(bold=True, color=sco_excel.COLORS.RED)]
* len(sites_titles[1]),
)
]
)
ws3 = wb.create_sheet("Correspondants")
ws3.append_row(
[
ws3.make_cell(it, style)
for (it, style) in zip(
correspondants_titles,
[sco_excel.excel_make_style(bold=True)] * len(correspondants_titles),
)
]
)
if export:
entreprises = Entreprise.query.filter_by(visible=True).all()
sites = (
db.session.query(EntrepriseSite)
.join(Entreprise, EntrepriseSite.entreprise_id == Entreprise.id)
.filter_by(visible=True)
.all()
)
correspondants = (
db.session.query(EntrepriseCorrespondant)
.join(Entreprise, EntrepriseCorrespondant.entreprise_id == Entreprise.id)
.filter_by(visible=True)
.all()
)
entreprises_lines = [
[entreprise.to_dict().get(k, "") for k in entreprises_keys]
for entreprise in entreprises
]
sites_lines = [
[site.to_dict().get(k, "") for k in sites_keys[0]] for site in sites
]
correspondants_lines = [
[correspondant.to_dict().get(k, "") for k in correspondants_keys]
for correspondant in correspondants
]
for line in entreprises_lines:
cells = []
for it in line:
cells.append(ws1.make_cell(it))
ws1.append_row(cells)
for line in sites_lines:
cells = []
for it in line:
cells.append(ws2.make_cell(it))
ws2.append_row(cells)
for line in correspondants_lines:
cells = []
for it in line:
cells.append(ws3.make_cell(it))
ws3.append_row(cells)
return wb

View File

@ -272,7 +272,7 @@ class OffreCreationForm(FlaskForm):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.correspondant.choices = [ self.correspondant.choices = [("", "")] + [
(correspondant.id, f"{correspondant.nom} {correspondant.prenom}") (correspondant.id, f"{correspondant.nom} {correspondant.prenom}")
for correspondant in EntrepriseCorrespondant.query.filter_by( for correspondant in EntrepriseCorrespondant.query.filter_by(
entreprise_id=self.hidden_entreprise_id.data entreprise_id=self.hidden_entreprise_id.data
@ -318,7 +318,7 @@ class OffreModificationForm(FlaskForm):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.correspondant.choices = [ self.correspondant.choices = [("", "")] + [
(correspondant.id, f"{correspondant.nom} {correspondant.prenom}") (correspondant.id, f"{correspondant.nom} {correspondant.prenom}")
for correspondant in EntrepriseCorrespondant.query.filter_by( for correspondant in EntrepriseCorrespondant.query.filter_by(
entreprise_id=self.hidden_entreprise_id.data entreprise_id=self.hidden_entreprise_id.data

View File

@ -58,7 +58,7 @@ from app.scodoc import sco_etud, sco_excel
import app.scodoc.sco_utils as scu import app.scodoc.sco_utils as scu
from app import db from app import db
from sqlalchemy import text from sqlalchemy import text, sql
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
@ -665,7 +665,9 @@ def add_offre(id):
missions=form.missions.data.strip(), missions=form.missions.data.strip(),
duree=form.duree.data.strip(), duree=form.duree.data.strip(),
expiration_date=form.expiration_date.data, expiration_date=form.expiration_date.data,
correspondant_id=form.correspondant.data, correspondant_id=form.correspondant.data
if form.correspondant.data != ""
else None,
) )
db.session.add(offre) db.session.add(offre)
db.session.commit() db.session.commit()
@ -728,7 +730,10 @@ def edit_offre(id):
offre.missions = form.missions.data.strip() offre.missions = form.missions.data.strip()
offre.duree = form.duree.data.strip() offre.duree = form.duree.data.strip()
offre.expiration_date = form.expiration_date.data offre.expiration_date = form.expiration_date.data
offre.correspondant_id = form.correspondant.data if form.correspondant.data == "":
offre.correspondant_id = sql.null()
else:
offre.correspondant_id = form.correspondant.data
if offre_depts_list != form.depts.data: if offre_depts_list != form.depts.data:
for dept in form.depts.data: for dept in form.depts.data:
if dept not in offre_depts_list: if dept not in offre_depts_list:
@ -1415,13 +1420,45 @@ def import_donnees():
Config.SCODOC_VAR_DIR, "tmp", secure_filename(file.filename) Config.SCODOC_VAR_DIR, "tmp", secure_filename(file.filename)
) )
file.save(file_path) file.save(file_path)
data = sco_excel.excel_file_to_list(file_path) diag, lm = sco_excel.excel_file_to_list_are(file_path)
os.remove(file_path) os.remove(file_path)
print(data) if (
len(lm) < 3
or lm[0][0] != are.ENTREPRISES_KEYS
or lm[1][0] != are.SITES_KEYS[0] + are.SITES_KEYS[1]
or lm[2][0] != are.CORRESPONDANTS_KEYS
):
flash("Veuillez utilisez la feuille excel à remplir")
return redirect(url_for("entreprises.import_donnees"))
# en cours
entreprises_import = []
siret_list = []
ligne = 1
for entreprise_data in lm[0][1:]:
ligne += 1
if (
are.verif_entreprise_data(entreprise_data)
and entreprise_data[0].replace(" ", "") not in siret_list
):
siret_list.append(entreprise_data[0].replace(" ", ""))
entreprise = Entreprise(
siret=entreprise_data[0].replace(" ", ""),
nom=entreprise_data[1].strip(),
adresse=entreprise_data[2].strip(),
ville=entreprise_data[3].strip(),
codepostal=entreprise_data[4].strip(),
pays=entreprise_data[5].strip(),
visible=True,
)
entreprises_import.append(entreprise)
else:
flash(f"Erreur lors de l'importation")
return redirect(url_for("entreprises.import_donnees"))
return redirect(url_for("entreprises.import_donnees")) return redirect(url_for("entreprises.import_donnees"))
return render_template( return render_template(
"entreprises/import_entreprises.html", "entreprises/import_donnees.html",
title="Importation entreprises", title="Importation données",
form=form, form=form,
) )

View File

@ -613,6 +613,17 @@ def excel_file_to_list(filename):
) )
def excel_file_to_list_are(filename):
try:
return _excel_to_list_are(filename)
except:
raise ScoValueError(
"""Le fichier xlsx attendu n'est pas lisible !
Peut-être avez-vous fourni un fichier au mauvais format (txt, xls, ...)
"""
)
def _excel_to_list(filelike): def _excel_to_list(filelike):
"""returns list of list """returns list of list
convert_to_string is a conversion function applied to all non-string values (ie numbers) convert_to_string is a conversion function applied to all non-string values (ie numbers)
@ -673,6 +684,59 @@ def _excel_to_list(filelike):
return diag, m return diag, m
def _excel_to_list_are(filelike):
"""returns list of list
convert_to_string is a conversion function applied to all non-string values (ie numbers)
"""
try:
wb = load_workbook(filename=filelike, read_only=True, data_only=True)
except:
log("Excel_to_list: failure to import document")
with open("/tmp/last_scodoc_import_failure" + scu.XLSX_SUFFIX, "wb") as f:
f.write(filelike)
raise ScoValueError(
"Fichier illisible: assurez-vous qu'il s'agit bien d'un document Excel !"
)
diag = [] # liste de chaines pour former message d'erreur
if len(wb.get_sheet_names()) < 1:
diag.append("Aucune feuille trouvée dans le classeur !")
return diag, None
lm = []
for sheet_name in wb.get_sheet_names():
# fill matrix
ws = wb.get_sheet_by_name(sheet_name)
sheet_name = sheet_name.encode(scu.SCO_ENCODING, "backslashreplace")
values = {}
for row in ws.iter_rows():
for cell in row:
if cell.value is not None:
values[(cell.row - 1, cell.column - 1)] = str(cell.value)
if not values:
diag.append(
"Aucune valeur trouvée dans la feuille %s !"
% sheet_name.decode(scu.SCO_ENCODING)
)
return diag, None
indexes = list(values.keys())
# search numbers of rows and cols
rows = [x[0] for x in indexes]
cols = [x[1] for x in indexes]
nbcols = max(cols) + 1
nbrows = max(rows) + 1
m = []
for _ in range(nbrows):
m.append([""] * nbcols)
for row_idx, col_idx in indexes:
v = values[(row_idx, col_idx)]
m[row_idx][col_idx] = v
diag.append(
'Feuille "%s", %d lignes' % (sheet_name.decode(scu.SCO_ENCODING), len(m))
)
lm.append(m)
return diag, lm
def excel_feuille_listeappel( def excel_feuille_listeappel(
sem, sem,
groupname, groupname,

View File

@ -7,7 +7,7 @@
{% endblock %} {% endblock %}
{% block app_content %} {% block app_content %}
<h1>Importation entreprises</h1> <h1>{{ title }}</h1>
<br> <br>
<div> <div>
<a href="{{ url_for('entreprises.get_import_donnees_file_sample') }}">Obtenir la feuille excel à remplir</a> <a href="{{ url_for('entreprises.get_import_donnees_file_sample') }}">Obtenir la feuille excel à remplir</a>