feat: 添加测试代码

This commit is contained in:
mayge
2025-10-02 04:35:41 -04:00
parent 15d1c042fe
commit f02cfbeec0
8 changed files with 532 additions and 1 deletions

1
.gitignore vendored
View File

@@ -7,7 +7,6 @@ env/
.venv/
.vscode/
.cache/
test/
outputs/
ovf.egg-info/
dist/

7
test/gvf_plot_poles.py Normal file
View File

@@ -0,0 +1,7 @@
from ovf.core.GVFManager import GVFManager
gvf = GVFManager.load("outputs/mtee_gvf.pkl")
# gvf = GVFManager()
# gvf.load_from_datasets("examples/data/capa.json",npoles_cplx=2,max_points=20)
# gvf.save("outputs/capa_gvf.pkl")
gvf.plot_poles("outputs/mtee_poles",degree=3,geometry_1="L1",geometry_2="L2")

View File

@@ -0,0 +1,97 @@
from ovf.core.GVFManager import GVFManager
import numpy as np
import logging
def get_logger():
logger = logging.getLogger("gvf_worst_rms_error")
logger.setLevel(logging.INFO)
if not logger.hasHandlers():
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger
logger = get_logger()
gvf = GVFManager()
gvf.load_from_datasets("examples/data/mlin.json",npoles_cplx=2,max_points=20,parameter_type="s",min_freqs=10e9,max_freqs=200e9)
gvf.save("outputs/mlin_gvf_2pole_s.pkl")
# gvf = GVFManager.load("outputs/mlin_gvf_2pole_s.pkl")
max_rms_error_info = {
"max_rms_error": 0.0,
"max_rms_error_l1": 0.0,
"max_rms_error_lmax": 0.0,
"max_rms_error_geom": None,
"max_rms_index": 0
}
max_rms_error_l1_info = {
"max_rms_error": 0.0,
"max_rms_error_l1": 0.0,
"max_rms_error_lmax": 0.0,
"max_rms_error_geom": None,
"max_rms_index": 0
}
max_rms_error_lmax_info = {
"max_rms_error": 0.0,
"max_rms_error_l1": 0.0,
"max_rms_error_lmax": 0.0,
"max_rms_error_geom": None,
"max_rms_index": 0
}
for ds in gvf.datasets:
rmse = ds.vf_manager.eigenval_rms_error[-1]
rmse_l1 = ds.vf_manager.eigenval_rms_error_l1[-1]
rmse_lmax = ds.vf_manager.eigenval_rms_error_lmax[-1]
# logger.info(f"Dataset: {ds.geometries}, RMS Error: {rmse}, RMS Error L1: {rmse_l1}, RMS Error Lmax: {rmse_lmax}")
logger.info(
f'W={ds.geometries["W"]},L={ds.geometries["L"]},rms={rmse},rms_l1={rmse_l1},rms_lmax={rmse_lmax},id={ds.id}',
)
if rmse > max_rms_error_info["max_rms_error"]:
max_rms_error_info["max_rms_error"] = rmse
max_rms_error_info["max_rms_error_l1"] = rmse_l1
max_rms_error_info["max_rms_error_lmax"] = rmse_lmax
max_rms_error_info["max_rms_error_geom"] = ds.geometries
max_rms_error_info["max_rms_index"] = ds.id
if rmse_l1 > max_rms_error_l1_info["max_rms_error_l1"]:
max_rms_error_l1_info["max_rms_error"] = rmse
max_rms_error_l1_info["max_rms_error_l1"] = rmse_l1
max_rms_error_l1_info["max_rms_error_lmax"] = rmse_lmax
max_rms_error_l1_info["max_rms_error_geom"] = ds.geometries
max_rms_error_l1_info["max_rms_index"] = ds.id
if rmse_lmax > max_rms_error_lmax_info["max_rms_error_lmax"]:
max_rms_error_lmax_info["max_rms_error"] = rmse
max_rms_error_lmax_info["max_rms_error_l1"] = rmse_l1
max_rms_error_lmax_info["max_rms_error_lmax"] = rmse_lmax
max_rms_error_lmax_info["max_rms_error_geom"] = ds.geometries
max_rms_error_lmax_info["max_rms_index"] = ds.id
print(f"Maximum RMS error across datasets: {max_rms_error_info}")
print(f"Geometry with maximum RMS error: {max_rms_error_info['max_rms_error_geom']}")
print(f"Index of dataset with maximum RMS error: {max_rms_error_info['max_rms_index']}")
print(f"Maximum RMS L1 error across datasets: {max_rms_error_l1_info}")
print(f"Geometry with maximum RMS L1 error: {max_rms_error_l1_info['max_rms_error_geom']}")
print(f"Index of dataset with maximum RMS L1 error: {max_rms_error_l1_info['max_rms_index']}")
print(f"Maximum RMS Lmax error across datasets: {max_rms_error_lmax_info}")
print(f"Geometry with maximum RMS Lmax error: {max_rms_error_lmax_info['max_rms_error_geom']}")
print(f"Index of dataset with maximum RMS Lmax error: {max_rms_error_lmax_info['max_rms_index']}")
gvf.plot_vf_responses_with_index("outputs/mlin_gvf_2pole_s_responses",int(max_rms_error_info['max_rms_index']))
gvf.plot_vf_responses_with_index("outputs/mlin_gvf_2pole_s_responses_l1",int(max_rms_error_l1_info['max_rms_index']))
gvf.plot_vf_responses_with_index("outputs/mlin_gvf_2pole_s_responses_lmax",int(max_rms_error_lmax_info['max_rms_index']))
gvf.plot_poles_in_2d("outputs/mlin_gvf_2pole_s_poles.html")
gvf.plot_vf_responses_with_index("outputs/mlin_gvf_2pole_s_responses",1399)

203
test/hungarian_match.py Normal file
View File

@@ -0,0 +1,203 @@
import numpy as np
from scipy.optimize import linear_sum_assignment
def pairwise_mac(Phi: np.ndarray, Psi: np.ndarray) -> np.ndarray:
"""
Pairwise MAC between mode shape sets Phi (d x m) and Psi (d x n).
Columns are mode shapes. Works for real or complex.
Returns an (m x n) array of MAC values in [0,1].
"""
# Normalize columns
Phi_n = Phi / np.linalg.norm(Phi, axis=0, keepdims=True)
Psi_n = Psi / np.linalg.norm(Psi, axis=0, keepdims=True)
G = Phi_n.conj().T @ Psi_n
return np.abs(G) ** 2
def eig_to_freq_damp_from_s(lam: np.ndarray):
"""
Continuous-time eigenvalues s = sigma + j*omega -> (f, zeta).
f in Hz, zeta in [0,1].
"""
sigma = lam.real
omega = np.abs(lam.imag)
f = omega / (2 * np.pi)
zeta = -sigma / np.sqrt(sigma**2 + omega**2 + 1e-18)
return f, zeta
def z_to_s(z: np.ndarray, dt: float) -> np.ndarray:
""" Map discrete-time eigenvalues z to continuous-time s via log(z)/dt. """
return np.log(z) / dt
def build_cost_matrix(
f1, z1, Phi1, f2, z2, Phi2,
w_f=1.0, w_z=0.5, w_mac=1.0, f_scale=None,
gate_df=0.20, gate_z=0.05, gate_mac=0.7,
cost_unassigned=2.0, big_M=1e6
):
"""
Construct a padded square cost matrix for Hungarian assignment with gating and dummies.
Parameters:
- f1, z1: (m,) arrays for set A (reference)
- Phi1: (d, m) mode shapes for set A
- f2, z2: (n,) arrays for set B (to match)
- Phi2: (d, n) mode shapes for set B
- w_f, w_z, w_mac: weights
- f_scale: normalization for frequency difference (default: max of both sets or 1.0)
- gate_df: max allowed normalized freq diff (Hz normalized) for feasible match
- gate_z: max allowed damping diff for feasible match
- gate_mac: min required MAC for feasible match
- cost_unassigned: penalty to leave a mode unmatched (tune to your problem)
- big_M: prohibitive cost for infeasible pairs
Returns:
C_pad: (L, L) padded square cost matrix
sizes: dict with m, n, L
"""
f1 = np.asarray(f1).reshape(-1)
z1 = np.asarray(z1).reshape(-1)
f2 = np.asarray(f2).reshape(-1)
z2 = np.asarray(z2).reshape(-1)
m = f1.size
n = f2.size
if f_scale is None:
f_scale = max(np.max(f1) if m else 1.0, np.max(f2) if n else 1.0, 1.0)
# Pairwise components
DF = np.abs(f1[:, None] - f2[None, :]) / f_scale # (m,n)
DZ = np.abs(z1[:, None] - z2[None, :]) # (m,n)
MAC = pairwise_mac(Phi1, Phi2) if (Phi1.size and Phi2.size) else np.zeros((m, n))
one_minus_MAC = 1.0 - MAC
# Base cost
C = w_f * DF + w_z * DZ + w_mac * one_minus_MAC
# Gating
infeasible = (DF > gate_df) | (DZ > gate_z) | (MAC < gate_mac)
C = np.where(infeasible, big_M, C)
# Pad to square with dummy rows/cols so that unmatched is allowed
L = max(m, n)
if L == 0:
return np.zeros((0, 0)), {"m": m, "n": n, "L": L}
C_pad = np.full((L, L), cost_unassigned, dtype=float)
# Place real costs into top-left block
if m and n:
C_pad[:m, :n] = C
# For the padded parts (dummies), keep cost_unassigned (meaning: matching to dummy costs that penalty)
return C_pad, {"m": m, "n": n, "L": L}
def hungarian_match(
f1, z1, Phi1, f2, z2, Phi2,
w_f=1.0, w_z=0.5, w_mac=1.0, f_scale=None,
gate_df=0.20, gate_z=0.05, gate_mac=0.7,
cost_unassigned=2.0, big_M=1e6,
accept_cost_threshold=None
):
"""
Run Hungarian assignment and return matches and unmatched lists.
Returns:
matches: list of (i, j, cost) with i in [0,m) and j in [0,n)
unmatched_A: list of indices in A not matched to any real B (or rejected by accept threshold)
unmatched_B: list of indices in B not matched to any real A (or rejected by accept threshold)
C_pad: the square padded cost matrix actually optimized
"""
C_pad, sizes = build_cost_matrix(
f1, z1, Phi1, f2, z2, Phi2,
w_f, w_z, w_mac, f_scale,
gate_df, gate_z, gate_mac,
cost_unassigned, big_M
)
m, n, L = sizes["m"], sizes["n"], sizes["L"]
if L == 0:
return [], [], [], C_pad
row_ind, col_ind = linear_sum_assignment(C_pad)
matches = []
unmatched_A = []
unmatched_B = []
# Interpret the assignment on the unpadded indices
for r, c in zip(row_ind, col_ind):
# Case 1: real-to-real
if r < m and c < n:
cost = C_pad[r, c]
if accept_cost_threshold is not None and cost > accept_cost_threshold:
unmatched_A.append(r)
unmatched_B.append(c)
elif cost >= big_M:
# Infeasible pair forced by padding; treat as unmatched
unmatched_A.append(r)
unmatched_B.append(c)
else:
matches.append((r, c, float(cost)))
# Case 2: A matched to a dummy column => A unmatched
elif r < m and c >= n:
unmatched_A.append(r)
# Case 3: dummy row matched to real B => B unmatched
elif r >= m and c < n:
unmatched_B.append(c)
# Case 4: dummy-dummy (should not affect)
else:
pass
# Deduplicate unmatched indices in case of multiple reasons
unmatched_A = sorted(set([i for i in unmatched_A if 0 <= i < m]))
unmatched_B = sorted(set([j for j in unmatched_B if 0 <= j < n]))
return matches, unmatched_A, unmatched_B, C_pad
def demo():
# Synthetic example: 3 reference modes vs 4 estimated modes
rng = np.random.default_rng(42)
d = 3 # DOF of mode shapes
# Reference modes (A)
fA = np.array([1.60, 3.00, 7.50]) # Hz
zA = np.array([0.010, 0.020, 0.005]) # damping ratios
PhiA = np.array([
[1.0, 0.1, 0.2],
[0.2, 1.0, 0.1],
[0.1, 0.2, 1.0],
], dtype=float)
PhiA = PhiA / np.linalg.norm(PhiA, axis=0, keepdims=True)
# Estimated modes (B): 3 near matches + 1 spurious
fB = np.array([1.62, 7.48, 2.95, 10.0])
zB = np.array([0.011, 0.006, 0.022, 0.010])
# Perturb PhiA columns to create similar shapes (plus one unrelated)
Psi_sim = PhiA.copy()
Psi_sim += 0.03 * rng.standard_normal(Psi_sim.shape)
Psi_sim = Psi_sim / np.linalg.norm(Psi_sim, axis=0, keepdims=True)
Psi_spur = rng.standard_normal((d, 1))
Psi_spur = Psi_spur / np.linalg.norm(Psi_spur, axis=0, keepdims=True)
PhiB = np.concatenate([Psi_sim[:, [0, 2, 1]], Psi_spur], axis=1) # reorder to make matching non-trivial
matches, unA, unB, C_pad = hungarian_match(
fA, zA, PhiA, fB, zB, PhiB,
w_f=1.0, w_z=0.5, w_mac=1.0,
f_scale=max(fA.max(), fB.max()),
gate_df=0.25, gate_z=0.05, gate_mac=0.6,
cost_unassigned=1.2, # tune this: lower => more likely to leave spurious unmatched
big_M=1e6,
accept_cost_threshold=0.9
)
print("Cost matrix (padded):\n", np.round(C_pad, 3))
print("\nMatches (A_i -> B_j, cost):")
for i, j, c in matches:
print(f" A[{i}] -> B[{j}], cost={c:.3f}, fA={fA[i]:.2f}, fB={fB[j]:.2f}, zA={zA[i]:.3f}, zB={zB[j]:.3f}")
print("\nUnmatched in A:", unA)
print("Unmatched in B:", unB)
if __name__ == "__main__":
demo()

View File

@@ -0,0 +1,6 @@
import skrf as rf
import numpy as np
network = rf.Network("/tmp/paramer/simulation/8916/8916.s2p")
network.write_touchstone("outputs/mlin_8916.z2p",parameter="Z")

166
test/logger.py Normal file
View File

@@ -0,0 +1,166 @@
# metrics_logger.py (with colorlog console)
import json
import logging
import os
import re
from logging.handlers import RotatingFileHandler
from datetime import datetime, timezone
# --- optional colorlog ---
try:
from colorlog import ColoredFormatter
HAVE_COLORLOG = True
except Exception:
HAVE_COLORLOG = False
# ---------- JSON 格式化器 ----------
class JSONMetricsFormatter(logging.Formatter):
"""将记录格式化为一行 JSONJSONL"""
def format(self, record: logging.LogRecord) -> str:
payload = {
"ts": datetime.now(timezone.utc).isoformat(),
"level": record.levelname,
"event": getattr(record, "event", "metrics"),
"dataset": {"W": getattr(record, "W", None), "L": getattr(record, "L", None)},
"rms": getattr(record, "rms", None),
"rms_l1": getattr(record, "rms_l1", None),
"rms_lmax": getattr(record, "rms_lmax", None),
}
if hasattr(record, "step") and record.step is not None:
payload["step"] = record.step
if hasattr(record, "tag") and record.tag:
payload["tag"] = record.tag
if record.msg and record.msg not in ("", None):
payload["msg"] = record.getMessage()
return json.dumps(payload, ensure_ascii=False)
def _build_human_message(
W: float, L: float, rms: float, rms_l1: float, rms_lmax: float,
step: int | None = None, tag: str | None = None
) -> str:
parts = [
f"Dataset[W={W}, L={L}]",
f"RMS={rms:.12e}",
f"L1={rms_l1:.12e}",
f"Lmax={rms_lmax:.12e}",
]
if step is not None:
parts.append(f"step={step}")
if tag:
parts.append(f"tag={tag}")
return " | ".join(parts)
# ---------- 日志器工厂 ----------
def get_metrics_logger(
name: str = "metrics",
log_dir: str = "./logs",
json_filename: str = "metrics.jsonl",
level: int = logging.INFO,
max_bytes: int = 10 * 1024 * 1024,
backup_count: int = 3,
use_colorlog: bool = True,
) -> logging.Logger:
"""
创建同时输出到控制台(可选彩色)和 JSONL 文件的日志器。
- 控制台彩色colorlog 存在且启用)或普通
- 文件:结构化 JSONL
"""
logger = logging.getLogger(name)
logger.setLevel(level)
logger.propagate = False
if not logger.handlers:
os.makedirs(log_dir, exist_ok=True)
# ---- 控制台 handler ----
ch = logging.StreamHandler()
ch.setLevel(level)
if HAVE_COLORLOG and use_colorlog:
ch.setFormatter(
ColoredFormatter(
fmt="%(asctime)s | %(log_color)s%(levelname)-5s%(reset)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
log_colors={
"DEBUG": "cyan",
"INFO": "green",
"WARNING": "yellow",
"ERROR": "red",
"CRITICAL": "bold_red",
},
secondary_log_colors={}, # 需要给 message 局部再上色时可用
style="%",
)
)
else:
ch.setFormatter(logging.Formatter(
fmt="%(asctime)s | %(levelname)-5s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
))
logger.addHandler(ch)
# ---- JSONL 文件 handler带滚动----
fh = RotatingFileHandler(
filename=os.path.join(log_dir, json_filename),
maxBytes=max_bytes,
backupCount=backup_count,
encoding="utf-8",
)
fh.setLevel(level)
fh.setFormatter(JSONMetricsFormatter())
logger.addHandler(fh)
return logger
# ---------- 便捷记录函数 ----------
def log_metrics(
logger: logging.Logger,
W: float,
L: float,
rms: float,
rms_l1: float,
rms_lmax: float,
*,
step: int | None = None,
tag: str | None = None,
msg: str | None = None,
) -> None:
"""
以结构化方式记录一次度量。
- 终端(可彩色)显示:人类可读的一行
- 文件:结构化 JSONL携带全部字段
"""
human_msg = msg or _build_human_message(W, L, rms, rms_l1, rms_lmax, step, tag)
extra = {
"event": "metrics",
"W": float(W),
"L": float(L),
"rms": float(rms),
"rms_l1": float(rms_l1),
"rms_lmax": float(rms_lmax),
"step": step,
"tag": tag,
}
logger.info(human_msg, extra=extra)
# ---------- 可选:解析旧日志行(你给的样例) ----------
LEGACY_LINE_RE = re.compile(
r"Dataset:\s*\{[^}]*['\"]?W['\"]?\s*:\s*([0-9.+\-eE]+)\s*,\s*['\"]?L['\"]?\s*:\s*([0-9.+\-eE]+)\s*\}\s*,\s*"
r"RMS Error:\s*([0-9.+\-eE]+)\s*,\s*RMS Error L1:\s*([0-9.+\-eE]+)\s*,\s*RMS Error Lmax:\s*([0-9.+\-eE]+)"
)
def parse_legacy_line(line: str):
m = LEGACY_LINE_RE.search(line)
if not m:
return None
W, L, rms, rms_l1, rms_lmax = map(float, m.groups())
return {"W": W, "L": L, "rms": rms, "rms_l1": rms_l1, "rms_lmax": rms_lmax}
def ingest_legacy_line(logger: logging.Logger, line: str, **kwargs):
parsed = parse_legacy_line(line)
if parsed:
log_metrics(logger, **parsed, **kwargs)

5
test/sweep.py Normal file
View File

@@ -0,0 +1,5 @@
from examples.capa import run_capa
from examples.mtee import run_mtee
# run_capa()
run_mtee()

View File

@@ -0,0 +1,48 @@
import skrf as rf
import os
import json
from ovf.core.VFManager import VFManager
from ovf.core.sample import auto_select_multple_ports
from ovf.core.basis.MultiPortOrthonormalBasis import MultiPortOrthonormalBasis
id = 3000
network = rf.Network(f"/tmp/paramer/simulation/3000/3000.s2p")
ports = network.nports
K = 5
full_freqences = network.f
noised_sampled_points = network.y.reshape(-1,ports,ports)
sampled_points = network.y.reshape(-1,ports,ports)
H,freqs = auto_select_multple_ports(noised_sampled_points,full_freqences,max_points=20)
def run_capa():
vf = VFManager(npoles_cplx=2,freqs=freqs,H=H,model=MultiPortOrthonormalBasis,iterations=K,verbose=False)
vf.fit()
vf.plot_metrics(show=False,save_path=f"outputs/{id}")
model_responses = vf.get_model_responses(full_freqences)
vf.plot_model_responses(show=False,save_path=f"outputs/{id}")
# vf.export(f"outputs/{id}")
vf.write(f"outputs/{id}")
def load_model():
vf = VFManager.load(f"outputs/3000")
vf.plot_metrics(show=False,save_path=f"outputs/3001")
model_responses = vf.get_model_responses(full_freqences)
vf.plot_model_responses(show=False,save_path=f"outputs/3001")
# vf.export(f"outputs/{id}")
vf.write(f"outputs/3001")
def load_model1():
vf = VFManager.load(f"outputs/3001")
vf.plot_metrics(show=False,save_path=f"outputs/3002")
model_responses = vf.get_model_responses(full_freqences)
vf.plot_model_responses(show=False,save_path=f"outputs/3002")
# vf.export(f"outputs/{id}")
vf.write(f"outputs/3002")
run_capa()
load_model()
load_model1()