"""AutoSco: ScoDoc API usage """ import os import requests from flask import current_app import config class APIError(Exception): "Error using ScoDoc API" def __init__(self, message: str = "", payload=None, status_code=None): self.message = message self.payload = payload or {} self.status_code = status_code def __str__(self): return f"APIError: {self.message} payload={self.payload} status_code={self.status_code}" class ScoDocAuthError(APIError): "Error getting ScoDoc token" def get_auth_headers(user: str, password: str, conf: config.Config) -> dict: "Demande de jeton, dict à utiliser dans les en-têtes de requêtes http" ans = requests.post( conf["API_URL"] + "/tokens", auth=(user, password), timeout=conf["API_TIMEOUT"], ) if ans.status_code != 200: raise ScoDocAuthError(f"Echec demande jeton par {user}", status_code=ans.status_code) token = ans.json()["token"] return {"Authorization": f"Bearer {token}"} class APIAccessor: "Gestion bas niveau des accès à l'API ScoDoc" def __init__(self, conf: config.Config, dept_acronym: str | None = None): self.config = conf self.dept_acronym = dept_acronym "si spécifié, utilisera API départementale" user = self.config["SCODOC_LOGIN"] password = self.config["SCODOC_PASSWORD"] self.headers = get_auth_headers(user, password, self.config) def get(self, path: str, headers: dict = None, errmsg=None, dept=None, raw=False): """Get. If raw, returns a requests.Response Else returns decoded json or, if other content, requests.Response Special case for non json result (image or pdf): return Content-Disposition string (inline or attachment) If raw, return """ dept = dept or self.dept_acronym if dept: url = self.config["SCODOC_URL"] + f"/ScoDoc/{dept}/api" + path else: url = self.config["API_URL"] + path reply = requests.get( url, headers=self.headers if headers is None else headers, verify=self.config["SCODOC_CHECK_CERTIFICATE"], timeout=self.config["API_TIMEOUT"], ) if reply.status_code != 200: print("url", url) print("reply", reply.text) try: payload = reply.json() except requests.exceptions.JSONDecodeError: payload = reply.text raise APIError( errmsg or f"""erreur get {url} !""", payload, status_code=reply.status_code, ) if (not raw) and reply.headers.get("Content-Type", None) == "application/json": return reply.json() # decode la reponse JSON return reply def post( self, path: str, data: dict = None, headers: dict = None, errmsg=None, dept=None, raw=False, ): """Post Decode réponse en json, sauf si raw. """ data = data or {} dept = dept or self.dept_acronym if dept: url = self.config["SCODOC_URL"] + f"/ScoDoc/{dept}/api" + path else: url = self.config["API_URL"] + path r = requests.post( url, json=data, headers=self.headers if headers is None else headers, verify=self.config["SCODOC_CHECK_CERTIFICATE"], timeout=self.config["API_TIMEOUT"], ) if r.status_code != 200: try: payload = r.json() except requests.exceptions.JSONDecodeError: payload = r.text raise APIError( errmsg or f"erreur url={url} status={r.status_code} !", payload=payload, status_code=r.status_code, ) if (not raw) and reply.headers.get("Content-Type", None) == "application/json": return r.json() # decode la reponse JSON return r def get(path, conf: config.Config = None): """Connect to ScoDoc and get. Utilise département configuré dans la config. """ conf = conf or (current_app.config if current_app else config.RunningConfig()) apicnx = APIAccessor(conf, dept_acronym=conf["SCODOC_DEPT_ACRONYM"]) return apicnx.get(path)