MonScoDocEssai/tests/api/setup_test_api.py

119 lines
3.7 KiB
Python

# -*- coding: utf-8 -*-
"""Test API
Utilisation :
créer les variables d'environnement: (indiquer les valeurs
pour le serveur ScoDoc que vous voulez interroger)
export SCODOC_URL="https://scodoc.xxx.net/"
export API_USER="xxx"
export SCODOC_PASSWD="xxx"
export CHECK_CERTIFICATE=0 # ou 1 si serveur de production avec certif SSL valide
(on peut aussi placer ces valeurs dans un fichier .env du répertoire tests/api).
"""
import os
import requests
from dotenv import load_dotenv
import pytest
from app.scodoc import sco_utils as scu
# --- Lecture configuration (variables d'env ou .env)
try:
BASEDIR = os.path.abspath(os.path.dirname(__file__))
except NameError:
BASEDIR = "/opt/scodoc/tests/api"
load_dotenv(os.path.join(BASEDIR, ".env"))
CHECK_CERTIFICATE = bool(os.environ.get("CHECK_CERTIFICATE", False))
SCODOC_URL = os.environ["SCODOC_URL"] or "http://localhost:5000"
API_URL = SCODOC_URL + "/ScoDoc/api"
API_USER = os.environ.get("API_USER", "test")
API_PASSWORD = os.environ.get("API_PASSWORD", os.environ.get("API_PASSWD", "test"))
API_USER_ADMIN = os.environ.get("API_USER_ADMIN", "admin_api")
API_PASSWORD_ADMIN = os.environ.get("API_PASSWD_ADMIN", "admin_api")
DEPT_ACRONYM = "TAPI"
print(f"SCODOC_URL={SCODOC_URL}")
print(f"API URL={API_URL}")
class APIError(Exception):
def __init__(self, message: str = "", payload=None):
self.message = message
self.payload = payload or {}
def get_auth_headers(user, password) -> dict:
"Demande de jeton, dict à utiliser dans les en-têtes de requêtes http"
ans = requests.post(API_URL + "/tokens", auth=(user, password), timeout=5)
if ans.status_code != 200:
raise APIError(f"Echec demande jeton par {user}")
token = ans.json()["token"]
return {"Authorization": f"Bearer {token}"}
@pytest.fixture
def api_headers() -> dict:
"""Jeton, utilisateur API ordinaire"""
return get_auth_headers(API_USER, API_PASSWORD)
@pytest.fixture
def api_admin_headers() -> dict:
"""Jeton, utilisateur API SuperAdmin"""
return get_auth_headers(API_USER_ADMIN, API_PASSWORD_ADMIN)
def GET(path: str, headers: dict = None, errmsg=None, dept=None):
"""Get and returns as JSON
Special case for non json result (image or pdf):
return Content-Disposition string (inline or attachment)
"""
if dept:
url = SCODOC_URL + f"/ScoDoc/{dept}/api" + path
else:
url = API_URL + path
reply = requests.get(
url,
headers=headers or {},
verify=CHECK_CERTIFICATE,
timeout=scu.SCO_TEST_API_TIMEOUT,
)
if reply.status_code != 200:
raise APIError(
errmsg or f"""erreur status={reply.status_code} !""", reply.json()
)
if reply.headers.get("Content-Type", None) == "application/json":
return reply.json() # decode la reponse JSON
elif reply.headers.get("Content-Type", None) in [
"image/jpg",
"image/png",
"application/pdf",
]:
retval = {
"Content-Type": reply.headers.get("Content-Type", None),
"Content-Disposition": reply.headers.get("Content-Disposition", None),
}
return retval
raise APIError("Unknown returned content {r.headers.get('Content-Type', None} !\n")
def POST_JSON(path: str, data: dict = {}, headers: dict = None, errmsg=None, dept=None):
"""Post"""
if dept:
url = SCODOC_URL + f"/ScoDoc/{dept}/api" + path
else:
url = API_URL + path
r = requests.post(
url,
json=data,
headers=headers or {},
verify=CHECK_CERTIFICATE,
timeout=10,
)
if r.status_code != 200:
raise APIError(errmsg or f"erreur status={r.status_code} !", r.json())
return r.json() # decode la reponse JSON