280 lines
11 KiB
Python
280 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
|
import os, re, math, unicodedata
|
|
import numpy as np
|
|
import pandas as pd
|
|
import matplotlib.pyplot as plt
|
|
import matplotlib.dates as mdates
|
|
from matplotlib.gridspec import GridSpec
|
|
from matplotlib import rcParams
|
|
|
|
# --- SVG: keep real text (not outlined glyphs) in vector exports ---
rcParams["svg.fonttype"] = "none"

# ========= Paths =========
# Input workbooks and output directory.
CHEMIN_SOURCES = r"data_valider_Pierre_toutes_dates.xlsx"
CHEMIN_DEBIT = r"débit Orniac et Figeac.xlsx"
DOSSIER_EXPORT = r"out"

# Create the export directory up front so figure saving cannot fail on it.
os.makedirs(DOSSIER_EXPORT, exist_ok=True)
|
|
|
|
# ========= Parameters to plot =========
# One figure is exported per active entry.  "variable" is the spreadsheet
# column to look up (matched loosely by trouve_colonne), "ylabel" is the axis
# label and also the stem of the exported file name.  Uncomment an entry to
# enable it.
parametres = [
    {"variable": "c25°C ", "ylabel": "Conductivité (μS/cm)"},
    # {"variable": "pH", "ylabel": "pH"},
    # {"variable": "T°C", "ylabel": "Température (°C)"},
    # {"variable": "O2 mg/L", "ylabel": "O2 (mg/L)"},
    # {"variable": "Alcalinité (mg/L)", "ylabel": "HCO3- (mg/L)"},
    # {"variable": "Ca2+","ylabel": "Ca2+ (mg/L)"},
    # {"variable": "Fluorure", "ylabel": "Fl- (mg/L)"},
    # {"variable": "Cl-", "ylabel": "Cl- (mg/L)"},
    # {"variable": "Mg2+", "ylabel": "Mg2+ (mg/L)"},
    # {"variable": "K+", "ylabel": "K+ (mg/L)"},
    # {"variable": "Na+", "ylabel": "Na+ (mg/L)"},
    # {"variable": "Fe2+", "ylabel": "Fe2+ (mg/L)"},
    # {"variable": "SO42-", "ylabel": "SO42- (mg/L)"},
    # {"variable": "SiO2", "ylabel": "SiO2 (mg/L)"},
    # {"variable": "PO43-", "ylabel": "PO43- (mg/L)"},
    # {"variable": "NH4+", "ylabel": "NH4+ (mg/L)"},
    # {"variable": "NO3-", "ylabel": "NO3- (mg/L)"},
    # {"variable": "NO2-", "ylabel": "NO2- (mg/L)"},
    # {"variable": "DOC", "ylabel": "DOC (mg/L)"},
    # {"variable": "δ13C VPDB", "ylabel": "δ13C (‰ VPDB)"},
    # {"variable": "D", "ylabel": "δ2H (‰ VSMOW)"},
    # {"variable": "18O", "ylabel": "δ18O (‰ VSMOW)"},
]
|
|
|
|
# ========= Sampling dates =========
# Written day-first (DD/MM/YYYY), hence dayfirst=True.
DATES_MESURES = pd.to_datetime(
    [
        # 2023
        "24/03/2023",
        "20/04/2023",
        "25/05/2023",
        "26/06/2023",
        "01/08/2023",
        "29/08/2023",
        "03/10/2023",
        "25/10/2023",
        "16/11/2023",
        "20/12/2023",
        # 2024
        "29/01/2024",
        "03/04/2024",
        "29/04/2024",
        "29/05/2024",
        "25/06/2024",
        "30/07/2024",
        "28/08/2024",
    ],
    dayfirst=True,
)
|
|
|
|
# ========= Order of the sources =========
# Display order of the sampled sources in the figure.
ORDRE_SOURCES = [
    "Bullac",
    "Corn",
    "Bual",
    "La Diège",
    "Ayrissac",
    "Pito",
    "Ressel",
    "Marchepied",
    "Anglades",
    "Liauzu",
    "Pescalerie",
    "Sagne",
]

# The first six go to the left subplot column, the remaining ones to the right.
SOURCES_COL_G = ORDRE_SOURCES[:6]
SOURCES_COL_D = ORDRE_SOURCES[6:]
|
|
|
|
# ======================= OUTILS =======================
|
|
def normalise_txt(s: str) -> str:
    """Reduce *s* to a lowercase ASCII alphanumeric key for loose comparisons.

    ``None`` yields ``""``; any other value is first passed through ``str()``.
    The text is NFKD-decomposed, combining marks (accents) are dropped, the
    result is lowercased, and every character outside ``a-z0-9`` is removed.
    """
    if s is None:
        return ""
    decomposed = unicodedata.normalize("NFKD", str(s))
    # Keep base characters only: "é" decomposes to "e" + combining accent.
    base_chars = [ch for ch in decomposed if not unicodedata.combining(ch)]
    lowered = "".join(base_chars).lower()
    return re.sub(r"[^a-z0-9]+", "", lowered)
|
|
|
|
def trouve_colonne(df: pd.DataFrame, souhait: str) -> str | None:
    """Return the column of *df* whose name matches the label *souhait*.

    Names are compared through ``normalise_txt`` (case, accents, spaces and
    punctuation are ignored).  An exact match on the normalised name wins;
    otherwise the first column whose normalised name contains the target, or
    is contained in it, is returned.

    Args:
        df: table whose columns are searched.
        souhait: wanted column label, in any spelling.

    Returns:
        The original column label from ``df.columns``, or ``None`` when
        nothing matches or when *souhait* has no alphanumeric content.
    """
    cible = normalise_txt(souhait)
    if not cible:
        # Nothing meaningful to look for.
        return None

    mapping = {}
    for col in df.columns:
        cle = normalise_txt(col)
        # A name made only of symbols (e.g. "°", "%") normalises to "".
        # Since "" is a substring of every string, keeping it would make the
        # fallback below return that unrelated column for any request.
        if cle:
            mapping[cle] = col

    if cible in mapping:
        return mapping[cible]

    # Fallback: substring match in either direction, in column order.
    for cle, col in mapping.items():
        if cible in cle or cle in cible:
            return col
    return None
|
|
|
|
def trouve_colonne_date(df: pd.DataFrame) -> str | None:
|
|
# privilégie "Date et heure" ou "Date"
|
|
cands = [c for c in df.columns if re.search(r"date", str(c), flags=re.I)]
|
|
if not cands:
|
|
return None
|
|
for pref in ["Date et heure", "Date", "DATE"]:
|
|
if pref in df.columns:
|
|
return pref
|
|
return cands[0]
|
|
|
|
def sanitize_filename(name: str) -> str:
    """Turn *name* into a file name made of word characters, "-", "." and "_".

    Each run of other characters becomes one underscore, surrounding
    whitespace is trimmed, spaces become underscores, and consecutive
    underscores are collapsed into a single one.
    """
    cleaned = re.sub(r"[^\w\-. ]+", "_", name, flags=re.UNICODE)
    # Splitting on the literal space and re-joining maps every space to "_".
    underscored = "_".join(cleaned.strip().split(" "))
    return re.sub(r"_+", "_", underscored)
|
|
|
|
def _y_in_data_coords(transform, ax) -> bool:
    """Return True when *transform* feeds its y input through ``ax.transData``.

    Artists such as ``axvline`` markers store y in axes-fraction units (0..1);
    their values are not data and must not drive the y limits.
    """
    _, y_is_data = transform.contains_branch_seperately(ax.transData)
    return bool(y_is_data)


def _finite_range(values):
    """Return ``(min, max)`` over the finite entries of *values*, else ``None``."""
    arr = np.asarray(values, dtype=float).ravel()
    arr = arr[np.isfinite(arr)]
    if arr.size == 0:
        return None
    return float(arr.min()), float(arr.max())


def harmonize_y_limits(axes, padding=0.06):
    """Give every axes in *axes* the same y limits, derived from plotted data.

    The common range is taken from the finite y values of the lines and of
    the collection offsets (scatter points) of all axes.  Only artists whose
    y coordinate is expressed in data units are considered, so vertical
    marker lines drawn with ``axvline`` (y stored as 0..1) are ignored.

    Args:
        axes: iterable of Matplotlib axes.
        padding: fraction of the data range added below and above it.

    Returns:
        None.  The axes are modified in place; nothing happens when no
        finite data value is found.
    """
    axes = list(axes)  # iterated twice below; also accepts a generator
    ranges = []
    for ax in axes:
        for line in ax.get_lines():
            if _y_in_data_coords(line.get_transform(), ax):
                ranges.append(_finite_range(line.get_ydata()))
        for coll in getattr(ax, "collections", []):
            # Best effort: a collection that cannot be read is skipped
            # rather than aborting the whole harmonisation.
            try:
                if not _y_in_data_coords(coll.get_offset_transform(), ax):
                    continue
                offsets = np.asarray(coll.get_offsets(), dtype=float)
                if offsets.ndim == 2 and offsets.shape[1] >= 2:
                    ranges.append(_finite_range(offsets[:, 1]))
            except (AttributeError, TypeError, ValueError):
                continue

    ranges = [r for r in ranges if r is not None]
    if not ranges:
        return

    ymin = min(lo for lo, _ in ranges)
    ymax = max(hi for _, hi in ranges)
    if ymax == ymin:
        # Flat data: open a small window around the single value.
        eps = abs(ymax) * 0.05 + 1e-9
        ymin -= eps
        ymax += eps
    else:
        pad = (ymax - ymin) * padding
        ymin -= pad
        ymax += pad

    for ax in axes:
        ax.set_ylim(ymin, ymax)
|
|
|
|
def nearest_points_for_dates(dfQ: pd.DataFrame, dates: pd.Series, tolerance="2D"):
    """Look up, for each entry of *dates*, the closest-in-time Q_Orniac value.

    Args:
        dfQ: table with a ``date`` column and a ``Q_Orniac`` column.
        dates: date-like values to look up (converted with ``pd.to_datetime``).
        tolerance: largest accepted time gap, as a ``pd.Timedelta`` string.

    Returns:
        A DataFrame with columns ``date`` (the requested dates, sorted) and
        ``Q_Orniac``; dates with no value within *tolerance* are left out.
    """
    debit = dfQ[["date", "Q_Orniac"]].dropna()
    debit = debit.sort_values("date").copy()

    wanted = pd.DataFrame({"date": pd.to_datetime(dates)}).sort_values("date")

    # merge_asof needs both sides sorted on the key.
    matched = pd.merge_asof(
        wanted,
        debit,
        on="date",
        direction="nearest",
        tolerance=pd.Timedelta(tolerance),
    )
    return matched.dropna()
|
|
|
|
# ======================= LECTURES =======================
|
|
def lecture_sources(path_excel: str) -> pd.DataFrame:
    """Load the sample table from the Excel workbook *path_excel*.

    Sheets are scanned in workbook order and the first one exposing a name
    column ("Nom"), a "type" column and a date column is used; when no sheet
    qualifies, the first sheet is used as a best-effort fallback.  The three
    key columns are renamed to ``Nom``, ``type`` and ``Date``; ``Date`` is
    parsed day-first and rows whose date cannot be parsed are dropped.

    Args:
        path_excel: path of the workbook to read.

    Returns:
        A new DataFrame with the standardised key columns plus every other
        column of the selected sheet.
    """
    selected = None
    first_sheet = None
    # The context manager releases the workbook handle even if a sheet fails
    # to parse (the bare ExcelFile object was previously never closed).
    with pd.ExcelFile(path_excel) as xls:
        for sheet in xls.sheet_names:
            tmp = xls.parse(sheet)
            if first_sheet is None:
                first_sheet = tmp  # kept for the fallback, avoids a re-parse
            has_key_columns = (
                trouve_colonne(tmp, "Nom") is not None
                and trouve_colonne(tmp, "type") is not None
                and trouve_colonne_date(tmp) is not None
            )
            if has_key_columns:
                selected = tmp
                break

    if selected is None:
        selected = first_sheet
    df = selected.copy()

    # Standardise the key columns; the defaults make the rename a no-op when
    # a column could not be located.
    col_nom = trouve_colonne(df, "Nom")
    if col_nom is None:
        col_nom = "Nom"
    col_type = trouve_colonne(df, "type")
    if col_type is None:
        col_type = "type"
    col_date = trouve_colonne_date(df)
    if col_date is None:
        col_date = df.columns[0]

    df = df.rename(columns={col_nom: "Nom", col_type: "type", col_date: "Date"})
    df["Date"] = pd.to_datetime(df["Date"], dayfirst=True, errors="coerce")
    return df.dropna(subset=["Date"])
|
|
|
|
def lecture_debit(path_excel: str) -> pd.DataFrame:
    """Load the discharge series from the first sheet of *path_excel*.

    The date column and the Orniac discharge column are located by name and
    renamed to ``date`` and ``Q_Orniac``; a Figeac discharge column, when
    present, is renamed to ``Q_Figeac``.  Dates are parsed day-first, rows
    whose date cannot be parsed are dropped, and the result is sorted by date.

    Args:
        path_excel: path of the workbook to read.

    Returns:
        DataFrame with columns ``date``, ``Q_Orniac`` and optionally
        ``Q_Figeac``, sorted by ``date``.

    Raises:
        RuntimeError: if no date column or no Orniac discharge column is found.
    """
    df = pd.read_excel(path_excel, sheet_name=0)

    col_date = trouve_colonne_date(df)
    if col_date is None:
        # Fail with an explicit message instead of a later KeyError on "date".
        raise RuntimeError("Colonne de date introuvable dans le fichier des débits.")

    col_QO = (trouve_colonne(df, "Valeur à Orniac (en m³/s)") or
              trouve_colonne(df, "Q Orniac") or
              trouve_colonne(df, "Orniac"))
    if col_QO is None:
        raise RuntimeError("Colonne de débit d'Orniac introuvable dans le fichier des débits.")

    col_QF = (trouve_colonne(df, "Valeur à Figeac (en m³/s)") or
              trouve_colonne(df, "Q Figeac") or
              trouve_colonne(df, "Figeac"))

    colonnes = [col_date, col_QO]
    renames = {col_date: "date", col_QO: "Q_Orniac"}
    # Figeac is optional; ignore it if the loose matching landed on a column
    # that is already selected.
    if col_QF is not None and col_QF not in renames:
        colonnes.append(col_QF)
        renames[col_QF] = "Q_Figeac"

    df = df[colonnes].rename(columns=renames)
    # dayfirst=True keeps text dates such as "01/08/2023" consistent with the
    # day-first convention used for the sample table and the sampling dates.
    df["date"] = pd.to_datetime(df["date"], dayfirst=True, errors="coerce")
    return df.dropna(subset=["date"]).sort_values("date")
|
|
|
|
# ======================= TRACÉ =======================
|
|
def _format_axe_temps(ax):
    """Put one x tick every two months, labelled MM/YY, on *ax*."""
    ax.xaxis.set_major_locator(mdates.MonthLocator(interval=2))
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%m/%y"))


def _trace_debit(ax, dfQ, pts_mes):
    """Draw the Orniac discharge curve, sampling-day points and markers on *ax*."""
    ax.plot(dfQ["date"], dfQ["Q_Orniac"], lw=1)
    if not pts_mes.empty:
        ax.scatter(pts_mes["date"], pts_mes["Q_Orniac"], s=18, color="red", zorder=3)
    ax.set_ylabel("Débit (m³/s)")
    ax.set_title("Débit à Orniac")
    for d in DATES_MESURES:
        ax.axvline(d, color="red", ls="--", lw=0.8, alpha=0.6)
    ax.grid(True, which="both", axis="both", alpha=0.2)
    _format_axe_temps(ax)


def _trace_source(ax, data, col_var, src_name, ylabel):
    """Draw the time series of column *col_var* for the source *src_name* on *ax*."""
    df_s = data.loc[data["Nom"] == src_name, ["Date", col_var]].copy()
    df_s = df_s.rename(columns={col_var: "val"})
    df_s = df_s.dropna(subset=["Date", "val"]).sort_values("Date")

    if df_s.empty:
        ax.text(0.5, 0.5, "Aucune donnée", ha="center", va="center",
                transform=ax.transAxes)
    else:
        ax.plot(df_s["Date"], df_s["val"], marker="o", lw=1)
        # Label the curve at its last point (df_s is non-empty here).
        ax.text(df_s["Date"].iloc[-1], df_s["val"].iloc[-1], f" {src_name}",
                va="center", fontsize=8)
        ax.set_ylabel(ylabel)

    ax.set_title(f"{ylabel} - Source {src_name}")
    for d in DATES_MESURES:
        ax.axvline(d, color="red", ls="--", lw=0.6, alpha=0.4)
    ax.grid(True, alpha=0.15)
    _format_axe_temps(ax)


def trace_parametre(df_src: pd.DataFrame, dfQ: pd.DataFrame, param: dict):
    """Build and export the figure of one parameter.

    The figure has a top row with the Orniac discharge (drawn twice, once per
    column) and, below it, one panel per source: SOURCES_COL_G in the left
    column and SOURCES_COL_D in the right one.  All source panels share the
    same y limits.  The PNG is written to DOSSIER_EXPORT under a name derived
    from the parameter's ``ylabel``.

    Args:
        df_src: sample table with ``Nom``, ``type`` and ``Date`` columns plus
            the parameter columns.
        dfQ: discharge table with ``date`` and ``Q_Orniac`` columns.
        param: dict with keys ``"variable"`` (column to look up) and
            ``"ylabel"`` (axis label).

    Returns:
        None.  When the parameter column cannot be found, a warning is printed
        and no figure is produced.
    """
    var = param["variable"]
    ylabel = param["ylabel"]

    # Locate the column holding the parameter.
    col_var = trouve_colonne(df_src, var)
    if col_var is None:
        print(f"[ATTENTION] Colonne introuvable pour '{var}'. Figure ignorée.")
        return

    # Keep rows typed as "source" and belonging to the configured sources.
    est_source = df_src["type"].astype(str).str.lower().str.contains("source")
    data = df_src[est_source].copy()
    # Tolerate stray whitespace around names coming from the spreadsheet.
    data["Nom"] = data["Nom"].astype(str).str.strip()
    data = data[data["Nom"].isin(ORDRE_SOURCES)]

    # One grid row per source of the longest column, plus the discharge row.
    n_rows_sources = max(len(SOURCES_COL_G), len(SOURCES_COL_D))

    fig = plt.figure(figsize=(11, 16), constrained_layout=False)
    try:
        gs = GridSpec(nrows=1 + n_rows_sources, ncols=2, figure=fig,
                      height_ratios=[1.25] + [1] * n_rows_sources,
                      hspace=0.35, wspace=0.28)

        # ----- Discharge (two identical panels) -----
        axQ_L = fig.add_subplot(gs[0, 0])
        axQ_R = fig.add_subplot(gs[0, 1], sharex=axQ_L, sharey=axQ_L)
        pts_mes = nearest_points_for_dates(dfQ, DATES_MESURES, tolerance="2D")
        for axQ in (axQ_L, axQ_R):
            _trace_debit(axQ, dfQ, pts_mes)

        # ----- Sources (left column, then right column) -----
        axes_sources = []
        for col, noms in enumerate((SOURCES_COL_G, SOURCES_COL_D)):
            for row, src in enumerate(noms, start=1):
                ax = fig.add_subplot(gs[row, col], sharex=axQ_L)
                _trace_source(ax, data, col_var, src, ylabel)
                axes_sources.append(ax)

        # ----- Same y limits on every source panel -----
        harmonize_y_limits(axes_sources, padding=0.06)

        # ----- Export -----
        nom_fichier = sanitize_filename(f"{ylabel}_sources_2col_debit_double.png")
        os.makedirs(DOSSIER_EXPORT, exist_ok=True)
        chemin = os.path.join(DOSSIER_EXPORT, nom_fichier)
        fig.savefig(chemin, dpi=300, bbox_inches="tight")
    finally:
        # Always release the figure, even if plotting or saving failed.
        plt.close(fig)
    print(f"[OK] Export : {chemin}")
|
|
|
|
# ======================= MAIN =======================
|
|
def main():
    """Read both workbooks, then export one figure per configured parameter."""
    print("[INFO] Lecture des données…")
    sources = lecture_sources(CHEMIN_SOURCES)
    debits = lecture_debit(CHEMIN_DEBIT)

    for parametre in parametres:
        trace_parametre(sources, debits, parametre)


if __name__ == "__main__":
    main()