ScoDoc/tests/api/setup_test_api.py

252 lines
7.7 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
"""Test API
Utilisation :
créer les variables d'environnement: (indiquer les valeurs
pour le serveur ScoDoc que vous voulez interroger)
export SCODOC_URL="https://scodoc.xxx.net/"
export API_USER="xxx"
export SCODOC_PASSWD="xxx"
export CHECK_CERTIFICATE=0 # ou 1 si serveur de production avec certif SSL valide
(on peut aussi placer ces valeurs dans un fichier .env du répertoire tests/api).
"""
import os
import requests
2024-01-22 09:57:41 +01:00
try:
from dotenv import load_dotenv
except ModuleNotFoundError:
print("\nWarning: dotenv not installed, ignoring .env")
print("You may install it using:\npip install python-dotenv\n")
load_dotenv = None
try:
import pytest
except ModuleNotFoundError:
print("pytest not installed\n")
pytest = None
2022-08-10 10:49:58 +02:00
# --- Lecture configuration (variables d'env ou .env)
try:
BASEDIR = os.path.abspath(os.path.dirname(__file__))
except NameError:
BASEDIR = "/opt/scodoc/tests/api"
2024-01-22 09:57:41 +01:00
if load_dotenv:
load_dotenv(os.path.join(BASEDIR, ".env"))
CHECK_CERTIFICATE = bool(os.environ.get("CHECK_CERTIFICATE", False))
2024-01-22 09:57:41 +01:00
SCODOC_URL = os.environ.get("SCODOC_URL") or "http://localhost:5000"
API_URL = SCODOC_URL + "/ScoDoc/api"
API_USER = os.environ.get("API_USER", "test")
API_PASSWORD = os.environ.get("API_PASSWORD", os.environ.get("API_PASSWD", "test"))
API_USER_ADMIN = os.environ.get("API_USER_ADMIN", "admin_api")
API_PASSWORD_ADMIN = os.environ.get("API_PASSWD_ADMIN", "admin_api")
DEPT_ACRONYM = "TAPI"
API_DEPT_URL = f"{SCODOC_URL}/ScoDoc/{DEPT_ACRONYM}/api"
SCO_TEST_API_TIMEOUT = 5
print(f"SCODOC_URL={SCODOC_URL}")
print(f"API URL={API_URL}")
2024-01-22 09:57:41 +01:00
print(f"API_USER={API_USER}")
2022-08-08 10:34:13 +02:00
class APIError(Exception):
2024-08-14 10:47:48 +02:00
def __init__(self, message: str = "", payload=None, status_code=None):
2022-08-17 18:15:48 +02:00
self.message = message
self.payload = payload or {}
2024-08-14 10:47:48 +02:00
self.status_code = status_code
2022-08-08 10:34:13 +02:00
def __str__(self):
2024-08-14 10:47:48 +02:00
return f"APIError: {self.message} payload={self.payload} status_code={self.status_code}"
2022-08-08 10:34:13 +02:00
def get_auth_headers(user, password) -> dict:
"Demande de jeton, dict à utiliser dans les en-têtes de requêtes http"
ans = requests.post(API_URL + "/tokens", auth=(user, password), timeout=5)
2022-08-08 10:34:13 +02:00
if ans.status_code != 200:
2024-08-14 10:47:48 +02:00
raise APIError(f"Echec demande jeton par {user}", status_code=ans.status_code)
2022-08-08 10:34:13 +02:00
token = ans.json()["token"]
return {"Authorization": f"Bearer {token}"}
2024-01-22 09:57:41 +01:00
if pytest:
2024-01-22 09:57:41 +01:00
@pytest.fixture
def api_headers() -> dict:
"""Jeton, utilisateur API ordinaire"""
return get_auth_headers(API_USER, API_PASSWORD)
2024-01-22 09:57:41 +01:00
@pytest.fixture
def api_admin_headers() -> dict:
"""Jeton, utilisateur API SuperAdmin"""
return get_auth_headers(API_USER_ADMIN, API_PASSWORD_ADMIN)
class _DefaultHeaders:
headers = {}
def set_headers(headers: dict):
"""Set default headers"""
print(f"set_headers: {headers}")
_DefaultHeaders.headers = headers
def GET(
path: str, headers: dict = None, errmsg=None, dept: str | None = None, raw=False
):
"""Get and optionaly returns as JSON
2023-03-09 14:24:12 +01:00
Special case for non json result (image or pdf):
return Content-Disposition string (inline or attachment)
If raw, return a requests.Response
"""
if dept:
url = SCODOC_URL + f"/ScoDoc/{dept}/api" + path
else:
url = API_URL + path
reply = requests.get(
url,
headers=_DefaultHeaders.headers if headers is None else headers,
verify=CHECK_CERTIFICATE,
timeout=SCO_TEST_API_TIMEOUT,
)
2023-03-09 14:24:12 +01:00
if reply.status_code != 200:
print("url", url)
print("reply", reply.text)
2023-03-09 14:24:12 +01:00
raise APIError(
2024-08-14 10:47:48 +02:00
errmsg or f"""erreur get {url} !""",
reply.json(),
status_code=reply.status_code,
2023-03-09 14:24:12 +01:00
)
if raw:
return reply
2023-03-09 14:24:12 +01:00
if reply.headers.get("Content-Type", None) == "application/json":
return reply.json() # decode la reponse JSON
if reply.headers.get("Content-Type", None) in [
"application/pdf",
"application/vnd.ms-excel",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"image/gif",
"image/jpeg",
"image/png",
"image/webp",
]:
retval = {
2023-03-09 14:24:12 +01:00
"Content-Type": reply.headers.get("Content-Type", None),
"Content-Disposition": reply.headers.get("Content-Disposition", None),
}
return retval
2024-08-14 10:47:48 +02:00
raise APIError(
f"Unknown returned content {reply.headers.get('Content-Type', None)} !\n",
2024-08-14 10:47:48 +02:00
status_code=reply.status_code,
)
def POST(
path: str,
data: dict = None,
headers: dict = None,
errmsg=None,
dept: str | None = None,
raw=False,
):
"""Post
Decode réponse en json, sauf si raw.
"""
data = data or {}
if dept:
url = SCODOC_URL + f"/ScoDoc/{dept}/api" + path
else:
url = API_URL + path
r = requests.post(
url,
json=data,
headers=_DefaultHeaders.headers if headers is None else headers,
verify=CHECK_CERTIFICATE,
timeout=SCO_TEST_API_TIMEOUT,
)
if r.status_code != 200:
try:
payload = r.json()
except requests.exceptions.JSONDecodeError:
payload = r.text
raise APIError(
2024-08-14 10:47:48 +02:00
errmsg or f"erreur url={url} status={r.status_code} !",
payload=payload,
status_code=r.status_code,
)
return r if raw else r.json() # decode la reponse JSON
2023-08-25 17:58:57 +02:00
def check_fields(data: dict, fields: dict = None):
"""
Vérifie que le dictionnaire data contient les bonnes clés
et les bons types de valeurs.
Args:
data (dict): un dictionnaire (json de retour de l'api)
fields (dict, optional): Un dictionnaire représentant les clés et les types d'une réponse.
"""
assert set(data.keys()) == set(fields.keys())
for key in data:
2023-09-27 23:14:45 +02:00
if key in ("moduleimpl_id", "desc", "external_data"):
2023-08-25 17:58:57 +02:00
assert (
isinstance(data[key], fields[key]) or data[key] is None
), f"error [{key}:{type(data[key])}, {data[key]}, {fields[key]}]"
else:
assert isinstance(
data[key], fields[key]
), f"error [{key}:{type(data[key])}, {data[key]}, {fields[key]}]"
def check_failure_get(path: str, headers: dict, err: str = None):
"""
Vérifie que la requête GET renvoie bien un 404
Args:
path (str): la route de l'api
headers (dict): le token d'auth de l'api
err (str, optional): L'erreur qui est sensée être fournie par l'api.
Raises:
APIError: Une erreur car la requête a fonctionné (mauvais comportement)
"""
try:
GET(path=path, headers=headers, dept=DEPT_ACRONYM)
2023-08-25 17:58:57 +02:00
# ^ Renvoi un 404
except APIError as api_err:
if err is not None:
assert api_err.payload["message"] == err
else:
raise APIError("Le GET n'aurait pas du fonctionner")
def check_failure_post(path: str, headers: dict, data: dict, err: str = None):
"""
Vérifie que la requête POST renvoie bien un 404
Args:
path (str): la route de l'api
headers (dict): le token d'auth
data (dict): un dictionnaire (json) à envoyer
err (str, optional): L'erreur qui est sensée être fournie par l'api.
Raises:
APIError: Une erreur car la requête a fonctionné (mauvais comportement)
"""
try:
data = POST(path=path, headers=headers, data=data, dept=DEPT_ACRONYM)
2023-08-25 17:58:57 +02:00
# ^ Renvoie un 404
except APIError as api_err:
if err is not None:
assert (
api_err.payload["message"] == err
), f"received: {api_err.payload['message']}"
else:
raise APIError("Le GET n'aurait pas du fonctionner")