From ec5e2aaddc683aaa57de2c30de9f9698f0fb3e67 Mon Sep 17 00:00:00 2001 From: mayge Date: Fri, 19 Sep 2025 04:38:05 -0400 Subject: [PATCH] feat: OVF formula 70 --- main.py | 336 +++++++++++++++++++++++++------------------------------- 1 file changed, 152 insertions(+), 184 deletions(-) diff --git a/main.py b/main.py index f7f8cd1..5e1b3c3 100644 --- a/main.py +++ b/main.py @@ -7,9 +7,10 @@ import sympy as sp from scipy.linalg import null_space from plotly.subplots import make_subplots import plotly.graph_objs +from core.freqency import auto_select -network = rf.Network("/tmp/paramer/simulation/4000/4000.s2p") +network = rf.Network("/tmp/paramer/simulation/3500/3500.s2p") freqs = network.f s = freqs * 2j * np.pi vf = rf.VectorFitting(network) @@ -116,7 +117,7 @@ def formula_67(s,y): N_t = basis @ n_target_vec # print("H = N_t / D_t",np.abs(N_t / D_t)) -class formula_70: +class formula_70_psi: """ VF-(70) with final pole-residue model. After fit(): @@ -128,20 +129,119 @@ class formula_70: # -------- internals -------- - def __init__(self): - self.cond = [] - self.rel = [] + def __init__(self, s, P, H, beta_min, beta_max ,alpha_scale=0.01, n_iter=20, d0=1.0, include_const=True, include_linear=False, verbose=True): + self.s = s + self.P = P + self.H = H + self.beta_min = beta_min + self.beta_max = beta_max + self.alpha_scale = alpha_scale + self.z0 = self._generate_starting_poles(P, beta_min=beta_min, beta_max=beta_max,alpha_scale=alpha_scale) + self.z0 = np.array(self.z0, dtype=np.complex128) + self.n_iter = n_iter + self.d0 = d0 + self.include_const = include_const + self.include_linear = include_linear + self.verbose = verbose + + self.rel_iteration = [] + self.cond_iteration = [] + self.conf_poles_recognize = [] + self.rms_error_iteration = [] + self.rms_error_poles_recongnize = [] + + @staticmethod + def _generate_starting_poles(P, beta_min:float, beta_max:float, alpha_scale=0.01): + """ + 仅生成复共轭对: p = -alpha + j beta, p*。 + n_pairs: 复对数量 (总极点数 = 2*n_pairs) + beta_min,beta_max: 想要覆盖的虚部范围 (单位: rad/s) + alpha_scale: alpha = alpha_scale * beta (文中 {α_p}=0.01{β_p}) + 返回: list[complex] (正虚部先, 后跟共轭) + """ + betas = 2*np.pi*np.linspace(beta_min, beta_max, P) + poles = [] + for b in betas: + alpha = alpha_scale * b + p = -alpha + 1j * b + poles += [p, np.conj(p)] + return poles + + def _generate_laguerre_basis(self, s: np.ndarray, z: np.ndarray): + poles = z + poles = sorted(poles, key=lambda p: np.real(p)) + basis = np.zeros((len(poles)+1,len(s)),dtype=complex) + + product = np.ones(len(s),dtype=complex) + + basis[0] = np.ones(len(s),dtype=complex) # φ_0 = 1 + i = 0 + while i < len(poles): + if np.real(poles[i]) >= 0: + raise ValueError(f"极点必须在左半平面: {poles[i]}") + + # 复对首 (正虚部) + if np.iscomplex(poles[i]) and np.imag(poles[i]) > 0: + if i + 1 >= len(poles): + raise ValueError("复极点缺少共轭") + pn = poles[i] + pc = poles[i + 1] + if not np.isclose(pc, np.conj(pn)): + pc, pn = pn,pc + if not np.isclose(pc, np.conj(pn)): + raise ValueError("复极点未按 (p, p*) 顺序排列 (正虚部在前)") + poles[i], poles[i+1] = pc, pn # swap + sigma = -np.real(pn) # >0 + scale = np.sqrt(2 * sigma) + r = np.abs(pn) + denom = (s - pn) * (s - pc) + + # 两个基函数 + phi_p = scale * (s - r) / denom * product + phi_pc = scale * (s + r) / denom * product + + # product 先乘 (s + p^*)/(s - p),再乘 (s + p)/(s - p^*) + product = product * (s + pc) / (s - pn) + product = product * (s + pn) / (s - pc) + + basis[i + 1] = phi_p + basis[i + 2] = phi_pc + i += 2 + continue + + # 复对次 (负虚部) —— 应该被首元素处理,出现表示顺序错误 + if np.iscomplex(pn) and np.imag(pn) < 0: + raise ValueError("检测到负虚部复极点但其共轭尚未处理,请将正虚部成员放在前面。") + + # 实极点 + sigma = -np.real(pn) + if sigma <= 0: + raise ValueError("实极点实部应为负 (稳定)。") + scale = np.sqrt(2 * sigma) + phi = scale / (s - pn) * product + # 更新乘积 + product = product * (s + pn) / (s - pn) + i += 1 + basis[i + 1] = phi + return basis + + + def _orthonormal_psi(self,s,z): + s = np.asarray(s, np.complex128).reshape(-1) + z = np.asarray(z, np.complex128).reshape(-1) + return self._generate_laguerre_basis(s,z).T + @staticmethod def _psi(s, z): s = np.asarray(s, np.complex128).reshape(-1) z = np.asarray(z, np.complex128).reshape(-1) return 1.0 / (s[:, None] + z[None, :]) - @staticmethod - def _lhp(z): - z = np.asarray(z, np.complex128).reshape(-1).copy() - z[z.real > 0] = -np.conj(z[z.real > 0]) - return z + # @staticmethod + # def _lhp(z): + # z = np.asarray(z, np.complex128).reshape(-1).copy() + # z[z.real > 0] = -np.conj(z[z.real > 0]) + # return z def _build_70(self, s, H_list, z_ref, d0): Hs = [np.asarray(h, np.complex128).reshape(-1) for h in H_list] @@ -159,18 +259,22 @@ class formula_70: rhs.append(-np.imag(d0 * H)) A = np.vstack(rows) b = np.concatenate(rhs) - return A, b, M, P + cond_poles_recognize = np.linalg.cond(A) + rms_error_poles_recongnize = np.sqrt(np.mean(np.abs(A @ np.zeros(A.shape[1]) - b)**2)) + return A, b, M, P, cond_poles_recognize, rms_error_poles_recongnize def _step_70(self, s, H_list, z_ref, d0=1.0, scale=True): - A, b, M, P = self._build_70(s, H_list, z_ref, d0) + A, b, M, P, cond_poles_recognize, rms_error_poles_recongnize = self._build_70(s, H_list, z_ref, d0) if scale: coln = np.maximum(np.linalg.norm(A, axis=0), 1e-12) x, *_ = np.linalg.lstsq(A / coln, b, rcond=None) x = x / coln - cond = np.linalg.cond(A) + cond_iteration = np.linalg.cond(A) + rms_error_iteration = np.sqrt(np.mean(np.abs(A @ x - b)**2)) else: x, *_ = np.linalg.lstsq(A, b, rcond=None) - cond = np.linalg.cond(A) + cond_iteration = np.linalg.cond(A) + rms_error_iteration = np.sqrt(np.mean(np.abs(A @ x - b)**2)) c = x[:P] res_ratio = np.empty((P, M), np.complex128) @@ -181,47 +285,51 @@ class formula_70: Sigma = np.diag(-np.asarray(z_ref, np.complex128)) T = Sigma - (np.ones((P, 1), np.complex128) @ (c.reshape(1, -1) / d0)) z_new = -np.linalg.eigvals(T) - z_new = self._lhp(z_new) + # z_new = self._lhp(z_new) # cond = np.linalg.cond(T) - return z_new, c, cond, res_ratio + return z_new, c, cond_iteration, rms_error_iteration, cond_poles_recognize, rms_error_poles_recongnize, res_ratio # -------- public API -------- - def fit(self, s, H, z0, n_iter=20, d0=1.0, include_const=True, include_linear=False, verbose=True): + def fit(self): """ s : (K,) complex samples (j*2π*f_sel) H : (K,) complex or (K,M) or list of M vectors z0: initial poles (P,) complex (LHP + conjugate pairs recommended) """ # normalize responses -> list - if isinstance(H, (list, tuple)): - H_list = [np.asarray(h, np.complex128).reshape(-1) for h in H] + if isinstance(self.H, (list, tuple)): + H_list = [np.asarray(h, np.complex128).reshape(-1) for h in self.H] else: - H_arr = np.asarray(H, np.complex128) + H_arr = np.asarray(self.H, np.complex128) if H_arr.ndim == 1: H_list = [H_arr] - elif H_arr.ndim == 2 and H_arr.shape[0] == len(s): + elif H_arr.ndim == 2 and H_arr.shape[0] == len(self.s): H_list = [H_arr[:, i].copy() for i in range(H_arr.shape[1])] else: raise ValueError("H must be (K,), list of (K,), or (K,M) with M responses.") M = len(H_list) - z = self._lhp(np.asarray(z0, np.complex128)) + # z = self._lhp(np.asarray(self.z0, np.complex128)) + z = self.z0 # SK/VF relocations - for it in range(n_iter): - z_next, c_last, cond, _ = self._step_70(s, H_list, z, d0=d0, scale=True) + for it in range(self.n_iter): + z_next, c_last, cond_iteration, rms_error_iteration, cond_poles_recognize, rms_error_poles_recongnize, _ = self._step_70(self.s, H_list, z, d0=self.d0, scale=True) rel = np.linalg.norm(z_next) / max(1.0, np.linalg.norm(z)) - if verbose: - print(f"[VF-70] iter {it+1:02d}/{n_iter:02d} Δz_rel={rel} cond(z)={cond}") - self.cond.append(cond) - self.rel.append(rel) + if self.verbose: + print(f"[VF-70] iter {it+1:02d}/{self.n_iter:02d} Δz_rel={rel}") + self.cond_iteration.append(cond_iteration) + self.rms_error_iteration.append(rms_error_iteration) + self.conf_poles_recognize.append(cond_poles_recognize) + self.rms_error_poles_recongnize.append(rms_error_poles_recongnize) + self.rel_iteration.append(rel) z = z_next # ---- Finalize: refit residues with fixed poles z (pole–residue model) ---- - Phi = self._psi(s, z) # K x P + Phi = self._psi(self.s, z) # K x P # Build design matrix for extras extras = [] - if include_const: extras.append(np.ones(len(s), np.complex128)) - if include_linear: extras.append(s.astype(np.complex128)) + if self.include_const: extras.append(np.ones(len(self.s), np.complex128)) + if self.include_linear: extras.append(self.s.astype(np.complex128)) if extras: X_base = np.column_stack([Phi] + extras) # K x (P + E) else: @@ -276,12 +384,18 @@ class formula_70: # # plt.show() # plt.savefig(f"img_rel_cond.png") def plot_rel_and_cond(self): - fig = make_subplots(rows=2, cols=1, subplot_titles=('Relative Change', 'Condition Number')) - fig.add_trace(plotly.graph_objs.Scatter(y=self.rel, mode='lines+markers', name='rel'), row=1, col=1) - fig.add_trace(plotly.graph_objs.Scatter(y=self.cond, mode='lines+markers', name='cond'), row=2, col=1) + fig = make_subplots(rows=3, cols=2) + fig.add_trace(plotly.graph_objs.Scatter(y=self.rel_iteration, mode='lines+markers', name='rel'), row=1, col=1) + fig.add_trace(plotly.graph_objs.Scatter(y=self.cond_iteration, mode='lines+markers', name='cond_iteration'), row=2, col=1) + fig.add_trace(plotly.graph_objs.Scatter(y=self.rms_error_iteration, mode='lines+markers', name='rms_error_iteration'), row=3, col=1) + fig.add_trace(plotly.graph_objs.Scatter(y=self.conf_poles_recognize, mode='lines+markers', name='cond_poles_recognize'), row=2, col=2) + fig.add_trace(plotly.graph_objs.Scatter(y=self.rms_error_poles_recongnize, mode='lines+markers', name='rms_error_poles_recognize'), row=3, col=2) fig.update_xaxes(title_text='Iteration', row=1, col=1) fig.update_yaxes(title_text='Relative Change', row=1, col=1) - fig.update_yaxes(title_text='Condition Number', row=2, col=1) + fig.update_yaxes(title_text='Condition Number (Iteration)', row=2, col=1) + fig.update_yaxes(title_text='RMS Error (Iteration)', row=3, col=1) + fig.update_yaxes(title_text='Condition Number (Pole Recognition)', row=2, col=2) + fig.update_yaxes(title_text='RMS Error (Pole Recognition)', row=3, col=2) fig.update_layout(height=800, width=800, title_text='Relative Change and Condition Number per Iteration') fig.write_image("img_rel_cond.png") # fig.show() @@ -289,164 +403,18 @@ class formula_70: def evaluate_on_freq(self, freq_eval, m=None): return self.evaluate(1j * 2*np.pi * np.asarray(freq_eval, float), m=m) - # Optional: save/load the final rational model - def save(self, path): - if not hasattr(self, "poles"): - raise RuntimeError("Nothing to save; call fit(...) first.") - np.savez(path, poles=self.poles, res=self.res, h=self.h, g=self.g) - - @classmethod - def load(cls, path): - d = np.load(path, allow_pickle=False) - obj = cls() - obj.poles = d["poles"]; obj.res = d["res"]; obj.h = d["h"]; obj.g = d["g"] - return obj -def auto_select(H, freq, - n_baseline=64, # log-spaced backbone points - peak_prominence=0.05, # fraction of |H| dB dynamic range for peak detection - peak_window=5, # take ±peak_window samples around each peak - topgrad_q=0.98, # keep top 2% largest slope/phase-change points - max_points=25, # final cap on selected samples (None = no cap) - ensure_ends=True): - """ - Select several significant sample points for vector fitting. - - Strategy: - 1) Always keep endpoints (optional). - 2) Add a log-spaced baseline over the band. - 3) Detect resonance peaks in |H| (on a log scale) and keep small windows around them. - 4) Add points with the largest magnitude slope and phase-change (w.r.t log-f). - 5) De-duplicate, sort, and optionally thin to 'max_points' with priority - to endpoints and detected peaks. - - Parameters - ---------- - H : (N,) complex array - Frequency response samples. - freq : (N,) float array - Frequency axis [Hz], strictly increasing. - n_baseline : int - Count of log-spaced baseline samples across the band. - peak_prominence : float - Peak prominence threshold as a fraction of the dynamic range in log|H|. - 0.05 ≈ keep peaks ≥ 5% of the range. - peak_window : int - Number of neighbor indices to include on each side of every detected peak. - topgrad_q : float in (0,1) - Quantile for selecting strong slope/phase points. - 0.98 ⇒ keep the top 2% largest derivatives. - max_points : int or None - If not None, cap the total number of selected indices to this value. - ensure_ends : bool - Always include the first and last samples. - - Returns - ------- - H_sel : (K,) complex array - freq_sel : (K,) float array - """ - H = np.asarray(H).reshape(-1) - f = np.asarray(freq).reshape(-1) - if H.size != f.size: - raise ValueError("H and freq must have the same length.") - N = f.size - if N < 4: - return H.copy(), f.copy() - - eps = 1e-16 - mag = np.abs(H) - logmag = np.log10(mag + eps) - phase = np.unwrap(np.angle(H)) - - # log-frequency axis (scale-invariant derivatives) - # keep it linear if any non-positive freq sneaks in - if np.all(f > 0): - lf = np.log(f) - else: - lf = f.copy() - - dlf = np.gradient(lf) - d_logmag = np.gradient(logmag) / (dlf + 1e-16) - d_phase = np.gradient(phase) / (dlf + 1e-16) - - idx = set() - if ensure_ends: - idx.update([0, N-1]) - - # 1) log-spaced baseline - if n_baseline > 0: - # map a log grid to nearest indices - grid = np.linspace(lf.min(), lf.max(), n_baseline) - base_idx = np.clip(np.searchsorted(lf, grid), 0, N-1) - idx.update(np.unique(base_idx).tolist()) - - # 2) peaks in |H| - try: - from scipy.signal import find_peaks - dyn = logmag.max() - logmag.min() - prom = peak_prominence * (dyn + 1e-12) - peaks, _ = find_peaks(logmag, prominence=prom) - except Exception: - # simple fallback: strict local maxima - peaks = np.where((mag[1:-1] > mag[:-2]) & (mag[1:-1] > mag[2:]))[0] + 1 - - for p in peaks: - lo = max(0, p - peak_window) - hi = min(N, p + peak_window + 1) - idx.update(range(lo, hi)) - - # 3) strongest slope / phase-change points - thr_slope = np.quantile(np.abs(d_logmag), topgrad_q) - thr_phase = np.quantile(np.abs(d_phase), topgrad_q) - idx.update(np.where(np.abs(d_logmag) >= thr_slope)[0].tolist()) - idx.update(np.where(np.abs(d_phase) >= thr_phase)[0].tolist()) - - # 4) finalize set - sel = np.array(sorted(idx), dtype=int) - - # 5) optional thinning with priority to endpoints and peaks - if max_points is not None and sel.size > max_points: - priority = np.zeros(sel.size, dtype=int) - if ensure_ends: - priority[(sel == 0) | (sel == N-1)] = 3 - if peaks.size: - priority[np.isin(sel, peaks)] = np.maximum(priority[np.isin(sel, peaks)], 2) - - keep = [] - budget = max_points - # keep highest-priority first - for lev in (3, 2, 1, 0): - cand = sel[priority == lev] - if cand.size == 0: - continue - if cand.size <= budget: - keep.extend(cand.tolist()) - budget -= cand.size - else: - step = max(1, int(np.ceil(cand.size / budget))) - keep.extend(cand[::step][:budget].tolist()) - budget = 0 - if budget == 0: - break - sel = np.array(sorted(set(keep)), dtype=int) - - return H[sel], f[sel] - if __name__ == "__main__": # formula_67(s,y) H11 = np.array([y[i,0,0] for i in range(len(y))]) H11_slice,freqs_slice = auto_select(H11,freqs,max_points=20) s_slice = freqs_slice * 2j * np.pi - P_pairs = 1 - z0 = generate_starting_poles(P_pairs, beta_min=1e8, beta_max=freqs_slice[-1]) - z0 = np.array(z0, dtype=np.complex128) + P_pairs = 2 - f70 = formula_70() K = 10 - model = f70.fit(s_slice, H11_slice, z0, n_iter=K, d0=1.0, verbose=True) - model.save("vf70_model.npz") + f70 = formula_70_psi(s_slice,P_pairs, H11_slice, beta_min=1e8, beta_max=freqs_slice[-1],alpha_scale=0.01, n_iter=K, d0=1.0, verbose=True) + model = f70.fit() model.plot_rel_and_cond() Hfit_dense = model.evaluate_on_freq(freqs)