189 lines
6.0 KiB
Python
189 lines
6.0 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
"""Test API
|
|
|
|
Utilisation :
|
|
créer les variables d'environnement: (indiquer les valeurs
|
|
pour le serveur ScoDoc que vous voulez interroger)
|
|
|
|
export SCODOC_URL="https://scodoc.xxx.net/"
|
|
export API_USER="xxx"
|
|
export SCODOC_PASSWD="xxx"
|
|
export CHECK_CERTIFICATE=0 # ou 1 si serveur de production avec certif SSL valide
|
|
|
|
(on peut aussi placer ces valeurs dans un fichier .env du répertoire tests/api).
|
|
"""
|
|
import os
|
|
import requests
|
|
from dotenv import load_dotenv
|
|
import pytest
|
|
|
|
# --- Lecture configuration (variables d'env ou .env)
|
|
try:
|
|
BASEDIR = os.path.abspath(os.path.dirname(__file__))
|
|
except NameError:
|
|
BASEDIR = "/opt/scodoc/tests/api"
|
|
|
|
load_dotenv(os.path.join(BASEDIR, ".env"))
|
|
CHECK_CERTIFICATE = bool(os.environ.get("CHECK_CERTIFICATE", False))
|
|
SCODOC_URL = os.environ["SCODOC_URL"] or "http://localhost:5000"
|
|
API_URL = SCODOC_URL + "/ScoDoc/api"
|
|
API_USER = os.environ.get("API_USER", "test")
|
|
API_PASSWORD = os.environ.get("API_PASSWORD", os.environ.get("API_PASSWD", "test"))
|
|
API_USER_ADMIN = os.environ.get("API_USER_ADMIN", "admin_api")
|
|
API_PASSWORD_ADMIN = os.environ.get("API_PASSWD_ADMIN", "admin_api")
|
|
DEPT_ACRONYM = "TAPI"
|
|
SCO_TEST_API_TIMEOUT = 5
|
|
print(f"SCODOC_URL={SCODOC_URL}")
|
|
print(f"API URL={API_URL}")
|
|
|
|
|
|
class APIError(Exception):
|
|
def __init__(self, message: str = "", payload=None):
|
|
self.message = message
|
|
self.payload = payload or {}
|
|
|
|
|
|
def get_auth_headers(user, password) -> dict:
|
|
"Demande de jeton, dict à utiliser dans les en-têtes de requêtes http"
|
|
ans = requests.post(API_URL + "/tokens", auth=(user, password), timeout=5)
|
|
if ans.status_code != 200:
|
|
raise APIError(f"Echec demande jeton par {user}")
|
|
token = ans.json()["token"]
|
|
return {"Authorization": f"Bearer {token}"}
|
|
|
|
|
|
@pytest.fixture
|
|
def api_headers() -> dict:
|
|
"""Jeton, utilisateur API ordinaire"""
|
|
return get_auth_headers(API_USER, API_PASSWORD)
|
|
|
|
|
|
@pytest.fixture
|
|
def api_admin_headers() -> dict:
|
|
"""Jeton, utilisateur API SuperAdmin"""
|
|
return get_auth_headers(API_USER_ADMIN, API_PASSWORD_ADMIN)
|
|
|
|
|
|
def GET(path: str, headers: dict = None, errmsg=None, dept=None):
|
|
"""Get and returns as JSON
|
|
Special case for non json result (image or pdf):
|
|
return Content-Disposition string (inline or attachment)
|
|
"""
|
|
if dept:
|
|
url = SCODOC_URL + f"/ScoDoc/{dept}/api" + path
|
|
else:
|
|
url = API_URL + path
|
|
reply = requests.get(
|
|
url,
|
|
headers=headers or {},
|
|
verify=CHECK_CERTIFICATE,
|
|
timeout=SCO_TEST_API_TIMEOUT,
|
|
)
|
|
if reply.status_code != 200:
|
|
raise APIError(
|
|
errmsg or f"""erreur status={reply.status_code} !""", reply.json()
|
|
)
|
|
|
|
if reply.headers.get("Content-Type", None) == "application/json":
|
|
return reply.json() # decode la reponse JSON
|
|
elif reply.headers.get("Content-Type", None) in [
|
|
"image/jpg",
|
|
"image/png",
|
|
"application/pdf",
|
|
]:
|
|
retval = {
|
|
"Content-Type": reply.headers.get("Content-Type", None),
|
|
"Content-Disposition": reply.headers.get("Content-Disposition", None),
|
|
}
|
|
return retval
|
|
raise APIError("Unknown returned content {r.headers.get('Content-Type', None} !\n")
|
|
|
|
|
|
def POST_JSON(path: str, data: dict = {}, headers: dict = None, errmsg=None, dept=None):
|
|
"""Post"""
|
|
if dept:
|
|
url = SCODOC_URL + f"/ScoDoc/{dept}/api" + path
|
|
else:
|
|
url = API_URL + path
|
|
r = requests.post(
|
|
url,
|
|
json=data,
|
|
headers=headers or {},
|
|
verify=CHECK_CERTIFICATE,
|
|
timeout=SCO_TEST_API_TIMEOUT,
|
|
)
|
|
if r.status_code != 200:
|
|
raise APIError(errmsg or f"erreur status={r.status_code} !", r.json())
|
|
return r.json() # decode la reponse JSON
|
|
|
|
|
|
def check_fields(data: dict, fields: dict = None):
|
|
"""
|
|
Vérifie que le dictionnaire data contient les bonnes clés
|
|
et les bons types de valeurs.
|
|
|
|
Args:
|
|
data (dict): un dictionnaire (json de retour de l'api)
|
|
fields (dict, optional): Un dictionnaire représentant les clés et les types d'une réponse.
|
|
"""
|
|
assert set(data.keys()) == set(fields.keys())
|
|
for key in data:
|
|
if key in ("moduleimpl_id", "desc", "user_id", "external_data"):
|
|
assert (
|
|
isinstance(data[key], fields[key]) or data[key] is None
|
|
), f"error [{key}:{type(data[key])}, {data[key]}, {fields[key]}]"
|
|
else:
|
|
assert isinstance(
|
|
data[key], fields[key]
|
|
), f"error [{key}:{type(data[key])}, {data[key]}, {fields[key]}]"
|
|
|
|
|
|
def check_failure_get(path: str, headers: dict, err: str = None):
|
|
"""
|
|
Vérifie que la requête GET renvoie bien un 404
|
|
|
|
Args:
|
|
path (str): la route de l'api
|
|
headers (dict): le token d'auth de l'api
|
|
err (str, optional): L'erreur qui est sensée être fournie par l'api.
|
|
|
|
Raises:
|
|
APIError: Une erreur car la requête a fonctionné (mauvais comportement)
|
|
"""
|
|
|
|
try:
|
|
GET(path=path, headers=headers)
|
|
# ^ Renvoi un 404
|
|
except APIError as api_err:
|
|
if err is not None:
|
|
assert api_err.payload["message"] == err
|
|
else:
|
|
raise APIError("Le GET n'aurait pas du fonctionner")
|
|
|
|
|
|
def check_failure_post(path: str, headers: dict, data: dict, err: str = None):
|
|
"""
|
|
Vérifie que la requête POST renvoie bien un 404
|
|
|
|
Args:
|
|
path (str): la route de l'api
|
|
headers (dict): le token d'auth
|
|
data (dict): un dictionnaire (json) à envoyer
|
|
err (str, optional): L'erreur qui est sensée être fournie par l'api.
|
|
|
|
Raises:
|
|
APIError: Une erreur car la requête a fonctionné (mauvais comportement)
|
|
"""
|
|
|
|
try:
|
|
data = POST_JSON(path=path, headers=headers, data=data)
|
|
# ^ Renvoie un 404
|
|
except APIError as api_err:
|
|
if err is not None:
|
|
assert (
|
|
api_err.payload["message"] == err
|
|
), f"received: {api_err.payload['message']}"
|
|
else:
|
|
raise APIError("Le GET n'aurait pas du fonctionner")
|