From f02cfbeec0ade9ebb8776277cd19da3a0f5d7efb Mon Sep 17 00:00:00 2001 From: mayge Date: Thu, 2 Oct 2025 04:35:41 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 - test/gvf_plot_poles.py | 7 + test/gvf_worst_rms_error.py | 97 ++++++++++++++ test/hungarian_match.py | 203 +++++++++++++++++++++++++++++ test/import_mlin_export_y.py | 6 + test/logger.py | 166 +++++++++++++++++++++++ test/sweep.py | 5 + test/test_save_and_reload_model.py | 48 +++++++ 8 files changed, 532 insertions(+), 1 deletion(-) create mode 100644 test/gvf_plot_poles.py create mode 100644 test/gvf_worst_rms_error.py create mode 100644 test/hungarian_match.py create mode 100644 test/import_mlin_export_y.py create mode 100644 test/logger.py create mode 100644 test/sweep.py create mode 100644 test/test_save_and_reload_model.py diff --git a/.gitignore b/.gitignore index 678a18a..3d4dd95 100644 --- a/.gitignore +++ b/.gitignore @@ -7,7 +7,6 @@ env/ .venv/ .vscode/ .cache/ -test/ outputs/ ovf.egg-info/ dist/ diff --git a/test/gvf_plot_poles.py b/test/gvf_plot_poles.py new file mode 100644 index 0000000..f934909 --- /dev/null +++ b/test/gvf_plot_poles.py @@ -0,0 +1,7 @@ +from ovf.core.GVFManager import GVFManager + +gvf = GVFManager.load("outputs/mtee_gvf.pkl") +# gvf = GVFManager() +# gvf.load_from_datasets("examples/data/capa.json",npoles_cplx=2,max_points=20) +# gvf.save("outputs/capa_gvf.pkl") +gvf.plot_poles("outputs/mtee_poles",degree=3,geometry_1="L1",geometry_2="L2") \ No newline at end of file diff --git a/test/gvf_worst_rms_error.py b/test/gvf_worst_rms_error.py new file mode 100644 index 0000000..a40a7f4 --- /dev/null +++ b/test/gvf_worst_rms_error.py @@ -0,0 +1,97 @@ +from ovf.core.GVFManager import GVFManager +import numpy as np +import logging + +def get_logger(): + logger = logging.getLogger("gvf_worst_rms_error") + logger.setLevel(logging.INFO) + if not logger.hasHandlers(): + ch = logging.StreamHandler() + ch.setLevel(logging.INFO) + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + ch.setFormatter(formatter) + logger.addHandler(ch) + return logger + +logger = get_logger() + +gvf = GVFManager() +gvf.load_from_datasets("examples/data/mlin.json",npoles_cplx=2,max_points=20,parameter_type="s",min_freqs=10e9,max_freqs=200e9) +gvf.save("outputs/mlin_gvf_2pole_s.pkl") + +# gvf = GVFManager.load("outputs/mlin_gvf_2pole_s.pkl") + +max_rms_error_info = { + "max_rms_error": 0.0, + "max_rms_error_l1": 0.0, + "max_rms_error_lmax": 0.0, + "max_rms_error_geom": None, + "max_rms_index": 0 +} + +max_rms_error_l1_info = { + "max_rms_error": 0.0, + "max_rms_error_l1": 0.0, + "max_rms_error_lmax": 0.0, + "max_rms_error_geom": None, + "max_rms_index": 0 +} + +max_rms_error_lmax_info = { + "max_rms_error": 0.0, + "max_rms_error_l1": 0.0, + "max_rms_error_lmax": 0.0, + "max_rms_error_geom": None, + "max_rms_index": 0 +} + + +for ds in gvf.datasets: + rmse = ds.vf_manager.eigenval_rms_error[-1] + rmse_l1 = ds.vf_manager.eigenval_rms_error_l1[-1] + rmse_lmax = ds.vf_manager.eigenval_rms_error_lmax[-1] + # logger.info(f"Dataset: {ds.geometries}, RMS Error: {rmse}, RMS Error L1: {rmse_l1}, RMS Error Lmax: {rmse_lmax}") + logger.info( + f'W={ds.geometries["W"]},L={ds.geometries["L"]},rms={rmse},rms_l1={rmse_l1},rms_lmax={rmse_lmax},id={ds.id}', + ) + + if rmse > max_rms_error_info["max_rms_error"]: + max_rms_error_info["max_rms_error"] = rmse + max_rms_error_info["max_rms_error_l1"] = rmse_l1 + max_rms_error_info["max_rms_error_lmax"] = rmse_lmax + max_rms_error_info["max_rms_error_geom"] = ds.geometries + max_rms_error_info["max_rms_index"] = ds.id + + if rmse_l1 > max_rms_error_l1_info["max_rms_error_l1"]: + max_rms_error_l1_info["max_rms_error"] = rmse + max_rms_error_l1_info["max_rms_error_l1"] = rmse_l1 + max_rms_error_l1_info["max_rms_error_lmax"] = rmse_lmax + max_rms_error_l1_info["max_rms_error_geom"] = ds.geometries + max_rms_error_l1_info["max_rms_index"] = ds.id + + if rmse_lmax > max_rms_error_lmax_info["max_rms_error_lmax"]: + max_rms_error_lmax_info["max_rms_error"] = rmse + max_rms_error_lmax_info["max_rms_error_l1"] = rmse_l1 + max_rms_error_lmax_info["max_rms_error_lmax"] = rmse_lmax + max_rms_error_lmax_info["max_rms_error_geom"] = ds.geometries + max_rms_error_lmax_info["max_rms_index"] = ds.id + +print(f"Maximum RMS error across datasets: {max_rms_error_info}") +print(f"Geometry with maximum RMS error: {max_rms_error_info['max_rms_error_geom']}") +print(f"Index of dataset with maximum RMS error: {max_rms_error_info['max_rms_index']}") + +print(f"Maximum RMS L1 error across datasets: {max_rms_error_l1_info}") +print(f"Geometry with maximum RMS L1 error: {max_rms_error_l1_info['max_rms_error_geom']}") +print(f"Index of dataset with maximum RMS L1 error: {max_rms_error_l1_info['max_rms_index']}") + +print(f"Maximum RMS Lmax error across datasets: {max_rms_error_lmax_info}") +print(f"Geometry with maximum RMS Lmax error: {max_rms_error_lmax_info['max_rms_error_geom']}") +print(f"Index of dataset with maximum RMS Lmax error: {max_rms_error_lmax_info['max_rms_index']}") + +gvf.plot_vf_responses_with_index("outputs/mlin_gvf_2pole_s_responses",int(max_rms_error_info['max_rms_index'])) +gvf.plot_vf_responses_with_index("outputs/mlin_gvf_2pole_s_responses_l1",int(max_rms_error_l1_info['max_rms_index'])) +gvf.plot_vf_responses_with_index("outputs/mlin_gvf_2pole_s_responses_lmax",int(max_rms_error_lmax_info['max_rms_index'])) + +gvf.plot_poles_in_2d("outputs/mlin_gvf_2pole_s_poles.html") +gvf.plot_vf_responses_with_index("outputs/mlin_gvf_2pole_s_responses",1399) + diff --git a/test/hungarian_match.py b/test/hungarian_match.py new file mode 100644 index 0000000..4d48478 --- /dev/null +++ b/test/hungarian_match.py @@ -0,0 +1,203 @@ +import numpy as np +from scipy.optimize import linear_sum_assignment + +def pairwise_mac(Phi: np.ndarray, Psi: np.ndarray) -> np.ndarray: + """ + Pairwise MAC between mode shape sets Phi (d x m) and Psi (d x n). + Columns are mode shapes. Works for real or complex. + Returns an (m x n) array of MAC values in [0,1]. + """ + # Normalize columns + Phi_n = Phi / np.linalg.norm(Phi, axis=0, keepdims=True) + Psi_n = Psi / np.linalg.norm(Psi, axis=0, keepdims=True) + G = Phi_n.conj().T @ Psi_n + return np.abs(G) ** 2 + +def eig_to_freq_damp_from_s(lam: np.ndarray): + """ + Continuous-time eigenvalues s = sigma + j*omega -> (f, zeta). + f in Hz, zeta in [0,1]. + """ + sigma = lam.real + omega = np.abs(lam.imag) + f = omega / (2 * np.pi) + zeta = -sigma / np.sqrt(sigma**2 + omega**2 + 1e-18) + return f, zeta + +def z_to_s(z: np.ndarray, dt: float) -> np.ndarray: + """ Map discrete-time eigenvalues z to continuous-time s via log(z)/dt. """ + return np.log(z) / dt + +def build_cost_matrix( + f1, z1, Phi1, f2, z2, Phi2, + w_f=1.0, w_z=0.5, w_mac=1.0, f_scale=None, + gate_df=0.20, gate_z=0.05, gate_mac=0.7, + cost_unassigned=2.0, big_M=1e6 +): + """ + Construct a padded square cost matrix for Hungarian assignment with gating and dummies. + + Parameters: + - f1, z1: (m,) arrays for set A (reference) + - Phi1: (d, m) mode shapes for set A + - f2, z2: (n,) arrays for set B (to match) + - Phi2: (d, n) mode shapes for set B + - w_f, w_z, w_mac: weights + - f_scale: normalization for frequency difference (default: max of both sets or 1.0) + - gate_df: max allowed normalized freq diff (Hz normalized) for feasible match + - gate_z: max allowed damping diff for feasible match + - gate_mac: min required MAC for feasible match + - cost_unassigned: penalty to leave a mode unmatched (tune to your problem) + - big_M: prohibitive cost for infeasible pairs + + Returns: + C_pad: (L, L) padded square cost matrix + sizes: dict with m, n, L + """ + f1 = np.asarray(f1).reshape(-1) + z1 = np.asarray(z1).reshape(-1) + f2 = np.asarray(f2).reshape(-1) + z2 = np.asarray(z2).reshape(-1) + + m = f1.size + n = f2.size + + if f_scale is None: + f_scale = max(np.max(f1) if m else 1.0, np.max(f2) if n else 1.0, 1.0) + + # Pairwise components + DF = np.abs(f1[:, None] - f2[None, :]) / f_scale # (m,n) + DZ = np.abs(z1[:, None] - z2[None, :]) # (m,n) + MAC = pairwise_mac(Phi1, Phi2) if (Phi1.size and Phi2.size) else np.zeros((m, n)) + one_minus_MAC = 1.0 - MAC + + # Base cost + C = w_f * DF + w_z * DZ + w_mac * one_minus_MAC + + # Gating + infeasible = (DF > gate_df) | (DZ > gate_z) | (MAC < gate_mac) + C = np.where(infeasible, big_M, C) + + # Pad to square with dummy rows/cols so that unmatched is allowed + L = max(m, n) + if L == 0: + return np.zeros((0, 0)), {"m": m, "n": n, "L": L} + + C_pad = np.full((L, L), cost_unassigned, dtype=float) + + # Place real costs into top-left block + if m and n: + C_pad[:m, :n] = C + + # For the padded parts (dummies), keep cost_unassigned (meaning: matching to dummy costs that penalty) + + return C_pad, {"m": m, "n": n, "L": L} + +def hungarian_match( + f1, z1, Phi1, f2, z2, Phi2, + w_f=1.0, w_z=0.5, w_mac=1.0, f_scale=None, + gate_df=0.20, gate_z=0.05, gate_mac=0.7, + cost_unassigned=2.0, big_M=1e6, + accept_cost_threshold=None +): + """ + Run Hungarian assignment and return matches and unmatched lists. + + Returns: + matches: list of (i, j, cost) with i in [0,m) and j in [0,n) + unmatched_A: list of indices in A not matched to any real B (or rejected by accept threshold) + unmatched_B: list of indices in B not matched to any real A (or rejected by accept threshold) + C_pad: the square padded cost matrix actually optimized + """ + C_pad, sizes = build_cost_matrix( + f1, z1, Phi1, f2, z2, Phi2, + w_f, w_z, w_mac, f_scale, + gate_df, gate_z, gate_mac, + cost_unassigned, big_M + ) + m, n, L = sizes["m"], sizes["n"], sizes["L"] + if L == 0: + return [], [], [], C_pad + + row_ind, col_ind = linear_sum_assignment(C_pad) + + matches = [] + unmatched_A = [] + unmatched_B = [] + + # Interpret the assignment on the unpadded indices + for r, c in zip(row_ind, col_ind): + # Case 1: real-to-real + if r < m and c < n: + cost = C_pad[r, c] + if accept_cost_threshold is not None and cost > accept_cost_threshold: + unmatched_A.append(r) + unmatched_B.append(c) + elif cost >= big_M: + # Infeasible pair forced by padding; treat as unmatched + unmatched_A.append(r) + unmatched_B.append(c) + else: + matches.append((r, c, float(cost))) + # Case 2: A matched to a dummy column => A unmatched + elif r < m and c >= n: + unmatched_A.append(r) + # Case 3: dummy row matched to real B => B unmatched + elif r >= m and c < n: + unmatched_B.append(c) + # Case 4: dummy-dummy (should not affect) + else: + pass + + # Deduplicate unmatched indices in case of multiple reasons + unmatched_A = sorted(set([i for i in unmatched_A if 0 <= i < m])) + unmatched_B = sorted(set([j for j in unmatched_B if 0 <= j < n])) + + return matches, unmatched_A, unmatched_B, C_pad + +def demo(): + # Synthetic example: 3 reference modes vs 4 estimated modes + rng = np.random.default_rng(42) + d = 3 # DOF of mode shapes + + # Reference modes (A) + fA = np.array([1.60, 3.00, 7.50]) # Hz + zA = np.array([0.010, 0.020, 0.005]) # damping ratios + PhiA = np.array([ + [1.0, 0.1, 0.2], + [0.2, 1.0, 0.1], + [0.1, 0.2, 1.0], + ], dtype=float) + PhiA = PhiA / np.linalg.norm(PhiA, axis=0, keepdims=True) + + # Estimated modes (B): 3 near matches + 1 spurious + fB = np.array([1.62, 7.48, 2.95, 10.0]) + zB = np.array([0.011, 0.006, 0.022, 0.010]) + # Perturb PhiA columns to create similar shapes (plus one unrelated) + Psi_sim = PhiA.copy() + Psi_sim += 0.03 * rng.standard_normal(Psi_sim.shape) + Psi_sim = Psi_sim / np.linalg.norm(Psi_sim, axis=0, keepdims=True) + Psi_spur = rng.standard_normal((d, 1)) + Psi_spur = Psi_spur / np.linalg.norm(Psi_spur, axis=0, keepdims=True) + PhiB = np.concatenate([Psi_sim[:, [0, 2, 1]], Psi_spur], axis=1) # reorder to make matching non-trivial + + matches, unA, unB, C_pad = hungarian_match( + fA, zA, PhiA, fB, zB, PhiB, + w_f=1.0, w_z=0.5, w_mac=1.0, + f_scale=max(fA.max(), fB.max()), + gate_df=0.25, gate_z=0.05, gate_mac=0.6, + cost_unassigned=1.2, # tune this: lower => more likely to leave spurious unmatched + big_M=1e6, + accept_cost_threshold=0.9 + ) + + print("Cost matrix (padded):\n", np.round(C_pad, 3)) + print("\nMatches (A_i -> B_j, cost):") + for i, j, c in matches: + print(f" A[{i}] -> B[{j}], cost={c:.3f}, fA={fA[i]:.2f}, fB={fB[j]:.2f}, zA={zA[i]:.3f}, zB={zB[j]:.3f}") + + print("\nUnmatched in A:", unA) + print("Unmatched in B:", unB) + +if __name__ == "__main__": + demo() \ No newline at end of file diff --git a/test/import_mlin_export_y.py b/test/import_mlin_export_y.py new file mode 100644 index 0000000..f3834b5 --- /dev/null +++ b/test/import_mlin_export_y.py @@ -0,0 +1,6 @@ +import skrf as rf +import numpy as np + +network = rf.Network("/tmp/paramer/simulation/8916/8916.s2p") + +network.write_touchstone("outputs/mlin_8916.z2p",parameter="Z") \ No newline at end of file diff --git a/test/logger.py b/test/logger.py new file mode 100644 index 0000000..6fa5805 --- /dev/null +++ b/test/logger.py @@ -0,0 +1,166 @@ +# metrics_logger.py (with colorlog console) +import json +import logging +import os +import re +from logging.handlers import RotatingFileHandler +from datetime import datetime, timezone + +# --- optional colorlog --- +try: + from colorlog import ColoredFormatter + HAVE_COLORLOG = True +except Exception: + HAVE_COLORLOG = False + + +# ---------- JSON 格式化器 ---------- +class JSONMetricsFormatter(logging.Formatter): + """将记录格式化为一行 JSON(JSONL)""" + def format(self, record: logging.LogRecord) -> str: + payload = { + "ts": datetime.now(timezone.utc).isoformat(), + "level": record.levelname, + "event": getattr(record, "event", "metrics"), + "dataset": {"W": getattr(record, "W", None), "L": getattr(record, "L", None)}, + "rms": getattr(record, "rms", None), + "rms_l1": getattr(record, "rms_l1", None), + "rms_lmax": getattr(record, "rms_lmax", None), + } + if hasattr(record, "step") and record.step is not None: + payload["step"] = record.step + if hasattr(record, "tag") and record.tag: + payload["tag"] = record.tag + if record.msg and record.msg not in ("", None): + payload["msg"] = record.getMessage() + return json.dumps(payload, ensure_ascii=False) + + +def _build_human_message( + W: float, L: float, rms: float, rms_l1: float, rms_lmax: float, + step: int | None = None, tag: str | None = None +) -> str: + parts = [ + f"Dataset[W={W}, L={L}]", + f"RMS={rms:.12e}", + f"L1={rms_l1:.12e}", + f"Lmax={rms_lmax:.12e}", + ] + if step is not None: + parts.append(f"step={step}") + if tag: + parts.append(f"tag={tag}") + return " | ".join(parts) + + +# ---------- 日志器工厂 ---------- +def get_metrics_logger( + name: str = "metrics", + log_dir: str = "./logs", + json_filename: str = "metrics.jsonl", + level: int = logging.INFO, + max_bytes: int = 10 * 1024 * 1024, + backup_count: int = 3, + use_colorlog: bool = True, +) -> logging.Logger: + """ + 创建同时输出到控制台(可选彩色)和 JSONL 文件的日志器。 + - 控制台:彩色(colorlog 存在且启用)或普通 + - 文件:结构化 JSONL + """ + logger = logging.getLogger(name) + logger.setLevel(level) + logger.propagate = False + + if not logger.handlers: + os.makedirs(log_dir, exist_ok=True) + + # ---- 控制台 handler ---- + ch = logging.StreamHandler() + ch.setLevel(level) + if HAVE_COLORLOG and use_colorlog: + ch.setFormatter( + ColoredFormatter( + fmt="%(asctime)s | %(log_color)s%(levelname)-5s%(reset)s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + log_colors={ + "DEBUG": "cyan", + "INFO": "green", + "WARNING": "yellow", + "ERROR": "red", + "CRITICAL": "bold_red", + }, + secondary_log_colors={}, # 需要给 message 局部再上色时可用 + style="%", + ) + ) + else: + ch.setFormatter(logging.Formatter( + fmt="%(asctime)s | %(levelname)-5s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + )) + logger.addHandler(ch) + + # ---- JSONL 文件 handler(带滚动)---- + fh = RotatingFileHandler( + filename=os.path.join(log_dir, json_filename), + maxBytes=max_bytes, + backupCount=backup_count, + encoding="utf-8", + ) + fh.setLevel(level) + fh.setFormatter(JSONMetricsFormatter()) + logger.addHandler(fh) + + return logger + + +# ---------- 便捷记录函数 ---------- +def log_metrics( + logger: logging.Logger, + W: float, + L: float, + rms: float, + rms_l1: float, + rms_lmax: float, + *, + step: int | None = None, + tag: str | None = None, + msg: str | None = None, +) -> None: + """ + 以结构化方式记录一次度量。 + - 终端(可彩色)显示:人类可读的一行 + - 文件:结构化 JSONL(携带全部字段) + """ + human_msg = msg or _build_human_message(W, L, rms, rms_l1, rms_lmax, step, tag) + extra = { + "event": "metrics", + "W": float(W), + "L": float(L), + "rms": float(rms), + "rms_l1": float(rms_l1), + "rms_lmax": float(rms_lmax), + "step": step, + "tag": tag, + } + logger.info(human_msg, extra=extra) + + +# ---------- 可选:解析旧日志行(你给的样例) ---------- +LEGACY_LINE_RE = re.compile( + r"Dataset:\s*\{[^}]*['\"]?W['\"]?\s*:\s*([0-9.+\-eE]+)\s*,\s*['\"]?L['\"]?\s*:\s*([0-9.+\-eE]+)\s*\}\s*,\s*" + r"RMS Error:\s*([0-9.+\-eE]+)\s*,\s*RMS Error L1:\s*([0-9.+\-eE]+)\s*,\s*RMS Error Lmax:\s*([0-9.+\-eE]+)" +) + +def parse_legacy_line(line: str): + m = LEGACY_LINE_RE.search(line) + if not m: + return None + W, L, rms, rms_l1, rms_lmax = map(float, m.groups()) + return {"W": W, "L": L, "rms": rms, "rms_l1": rms_l1, "rms_lmax": rms_lmax} + +def ingest_legacy_line(logger: logging.Logger, line: str, **kwargs): + parsed = parse_legacy_line(line) + if parsed: + log_metrics(logger, **parsed, **kwargs) diff --git a/test/sweep.py b/test/sweep.py new file mode 100644 index 0000000..cc3a481 --- /dev/null +++ b/test/sweep.py @@ -0,0 +1,5 @@ +from examples.capa import run_capa +from examples.mtee import run_mtee + +# run_capa() +run_mtee() \ No newline at end of file diff --git a/test/test_save_and_reload_model.py b/test/test_save_and_reload_model.py new file mode 100644 index 0000000..6e79ceb --- /dev/null +++ b/test/test_save_and_reload_model.py @@ -0,0 +1,48 @@ +import skrf as rf +import os +import json +from ovf.core.VFManager import VFManager +from ovf.core.sample import auto_select_multple_ports +from ovf.core.basis.MultiPortOrthonormalBasis import MultiPortOrthonormalBasis + +id = 3000 +network = rf.Network(f"/tmp/paramer/simulation/3000/3000.s2p") +ports = network.nports +K = 5 + +full_freqences = network.f +noised_sampled_points = network.y.reshape(-1,ports,ports) +sampled_points = network.y.reshape(-1,ports,ports) + +H,freqs = auto_select_multple_ports(noised_sampled_points,full_freqences,max_points=20) + +def run_capa(): + vf = VFManager(npoles_cplx=2,freqs=freqs,H=H,model=MultiPortOrthonormalBasis,iterations=K,verbose=False) + vf.fit() + vf.plot_metrics(show=False,save_path=f"outputs/{id}") + model_responses = vf.get_model_responses(full_freqences) + vf.plot_model_responses(show=False,save_path=f"outputs/{id}") + # vf.export(f"outputs/{id}") + vf.write(f"outputs/{id}") + +def load_model(): + vf = VFManager.load(f"outputs/3000") + + vf.plot_metrics(show=False,save_path=f"outputs/3001") + model_responses = vf.get_model_responses(full_freqences) + vf.plot_model_responses(show=False,save_path=f"outputs/3001") + # vf.export(f"outputs/{id}") + vf.write(f"outputs/3001") + +def load_model1(): + vf = VFManager.load(f"outputs/3001") + + vf.plot_metrics(show=False,save_path=f"outputs/3002") + model_responses = vf.get_model_responses(full_freqences) + vf.plot_model_responses(show=False,save_path=f"outputs/3002") + # vf.export(f"outputs/{id}") + vf.write(f"outputs/3002") + +run_capa() +load_model() +load_model1() \ No newline at end of file