From 995171d9666feab3eb78c252ee48a7ca65943739 Mon Sep 17 00:00:00 2001 From: mayge Date: Thu, 18 Sep 2025 06:44:34 -0400 Subject: [PATCH] feat: plot condition number --- main.py | 380 +++++++++++++++++++++++++------------------------------- 1 file changed, 172 insertions(+), 208 deletions(-) diff --git a/main.py b/main.py index 9bdb8e0..f7f8cd1 100644 --- a/main.py +++ b/main.py @@ -5,6 +5,8 @@ import skrf as rf import matplotlib.pyplot as plt import sympy as sp from scipy.linalg import null_space +from plotly.subplots import make_subplots +import plotly.graph_objs network = rf.Network("/tmp/paramer/simulation/4000/4000.s2p") @@ -115,223 +117,190 @@ def formula_67(s,y): # print("H = N_t / D_t",np.abs(N_t / D_t)) class formula_70: + """ + VF-(70) with final pole-residue model. + After fit(): + self.poles : (P,) complex + self.res : (P, M) complex # residues per response (columns) + self.h : (M,) complex # optional constants + self.g : (M,) complex # optional proportional terms + """ - # ---------- helpers ---------- - def _psi(self,s, z_prev): - """Psi[k,p] = 1/(s_k + z_prev[p]) (shape: Kf x P)""" - s = np.asarray(s, dtype=np.complex128).reshape(-1) - z_prev = np.asarray(z_prev, dtype=np.complex128).reshape(-1) - return 1.0 / (s[:, None] + z_prev[None, :]) + # -------- internals -------- - def _enforce_lhp_conjugate(self, poles): - """Reflect any RHP poles across the imag axis and pair conjugates loosely.""" - z = np.array(poles, dtype=np.complex128) - for i in range(len(z)): - if z[i].real > 0: - z[i] = -np.conj(z[i]) + def __init__(self): + self.cond = [] + self.rel = [] + @staticmethod + def _psi(s, z): + s = np.asarray(s, np.complex128).reshape(-1) + z = np.asarray(z, np.complex128).reshape(-1) + return 1.0 / (s[:, None] + z[None, :]) + + @staticmethod + def _lhp(z): + z = np.asarray(z, np.complex128).reshape(-1).copy() + z[z.real > 0] = -np.conj(z[z.real > 0]) return z - # ---------- build Ax=b for (70) ----------1 - def build_vf70_system(self,s, H_list, z_prev, d0=1.0): - """ - s: (Kf,) complex sample points (e.g. j*2πf) - H_list: list of (Kf,) complex responses, one per MIMO element you want to fit - z_prev: (P,) complex poles from previous iteration - d0: fixed scalar, usually 1.0 - - Returns: A (2*Kf*M, (M+1)*P), b (2*Kf*M,), plus Psi for convenience - Unknown vector x = [c (P), r^(1) (P), ..., r^(M) (P)] - """ - H_list = [np.asarray(h, dtype=np.complex128).reshape(-1) for h in H_list] - Kf = len(s) - P = len(z_prev) - M = len(H_list) - - Psi = self._psi(s, z_prev) # Kf x P - zerosP = np.zeros((Kf, P)) # convenience - - rows = [] - rhs = [] - - for m, H in enumerate(H_list): - Hp = H[:, None] # Kf x 1 - - # Left block (shared c): H * Psi - L_re = np.real(Hp * Psi) # Kf x P - L_im = np.imag(Hp * Psi) # Kf x P - - # Right block for this response: -Psi - R_re = -np.real(Psi) # Kf x P - R_im = -np.imag(Psi) # Kf x P - - # Assemble columns: [c | r^(1) | r^(2) | ... | r^(M)] - # Real rows - cols_re = [L_re] - for j in range(M): - cols_re.append(R_re if j == m else zerosP) - rows.append(np.hstack(cols_re)) - - # Imag rows - cols_im = [L_im] - for j in range(M): - cols_im.append(R_im if j == m else zerosP) - rows.append(np.hstack(cols_im)) - - # RHS (move d0*H to the right) + def _build_70(self, s, H_list, z_ref, d0): + Hs = [np.asarray(h, np.complex128).reshape(-1) for h in H_list] + K, P, M = len(s), len(z_ref), len(Hs) + Psi = self._psi(s, z_ref) + Z = np.zeros((K, P)) + rows, rhs = [], [] + for m, H in enumerate(Hs): + Hp = H[:, None] + L_re = np.real(Hp * Psi); L_im = np.imag(Hp * Psi) # acts on c + R_re = -np.real(Psi); R_im = -np.imag(Psi) # acts on r^(m) + rows.append(np.hstack([L_re] + [R_re if j == m else Z for j in range(M)])) rhs.append(-np.real(d0 * H)) + rows.append(np.hstack([L_im] + [R_im if j == m else Z for j in range(M)])) rhs.append(-np.imag(d0 * H)) + A = np.vstack(rows) + b = np.concatenate(rhs) + return A, b, M, P - A = np.vstack(rows) # (2*Kf*M) x ((M+1)*P) - b = np.concatenate(rhs) # (2*Kf*M,) - return A, b, Psi - - # ---------- one relocation step ---------- - def vf70_step_once(self,s, H_list, z_prev, d0=1.0, column_scale=True): - """ - Returns: z_new (P,), c (P,), residues (list of M arrays length P) - """ - P = len(z_prev) - M = len(H_list) - - A, b, Psi = self.build_vf70_system(s, H_list, z_prev, d0=d0) - - # optional simple column scaling for conditioning - if column_scale: - col_norm = np.maximum(np.linalg.norm(A, axis=0), 1e-12) - x, *_ = np.linalg.lstsq(A / col_norm, b, rcond=None) - x = x / col_norm + def _step_70(self, s, H_list, z_ref, d0=1.0, scale=True): + A, b, M, P = self._build_70(s, H_list, z_ref, d0) + if scale: + coln = np.maximum(np.linalg.norm(A, axis=0), 1e-12) + x, *_ = np.linalg.lstsq(A / coln, b, rcond=None) + x = x / coln + cond = np.linalg.cond(A) else: x, *_ = np.linalg.lstsq(A, b, rcond=None) + cond = np.linalg.cond(A) - # unpack unknowns c = x[:P] - residues = [] + res_ratio = np.empty((P, M), np.complex128) off = P - for _ in range(M): - residues.append(x[off:off+P]) - off += P - - # pole relocation (zeros of d0 + Psi @ c) - Sigma = np.diag(-np.asarray(z_prev, dtype=np.complex128)) - T = Sigma - (np.ones((P, 1), dtype=np.complex128) @ (c.reshape(1, -1) / d0)) + for m in range(M): + res_ratio[:, m] = x[off:off+P]; off += P + # relocate poles with test matrix + Sigma = np.diag(-np.asarray(z_ref, np.complex128)) + T = Sigma - (np.ones((P, 1), np.complex128) @ (c.reshape(1, -1) / d0)) z_new = -np.linalg.eigvals(T) - z_new = self._enforce_lhp_conjugate(z_new) + z_new = self._lhp(z_new) + # cond = np.linalg.cond(T) + return z_new, c, cond, res_ratio - return z_new, c, residues + # -------- public API -------- + def fit(self, s, H, z0, n_iter=20, d0=1.0, include_const=True, include_linear=False, verbose=True): + """ + s : (K,) complex samples (j*2π*f_sel) + H : (K,) complex or (K,M) or list of M vectors + z0: initial poles (P,) complex (LHP + conjugate pairs recommended) + """ + # normalize responses -> list + if isinstance(H, (list, tuple)): + H_list = [np.asarray(h, np.complex128).reshape(-1) for h in H] + else: + H_arr = np.asarray(H, np.complex128) + if H_arr.ndim == 1: + H_list = [H_arr] + elif H_arr.ndim == 2 and H_arr.shape[0] == len(s): + H_list = [H_arr[:, i].copy() for i in range(H_arr.shape[1])] + else: + raise ValueError("H must be (K,), list of (K,), or (K,M) with M responses.") + M = len(H_list) - # ---------- iterate K times (K == n_iter) ---------- - def vf70_iterate(self,s, H_list, z0, n_iter=15, d0=1.0, verbose=True): - z = np.array(z0, dtype=np.complex128) - c = None - residues = None + z = self._lhp(np.asarray(z0, np.complex128)) + # SK/VF relocations for it in range(n_iter): - z_next, c, residues = self.vf70_step_once(s, H_list, z, d0=d0) + z_next, c_last, cond, _ = self._step_70(s, H_list, z, d0=d0, scale=True) + rel = np.linalg.norm(z_next) / max(1.0, np.linalg.norm(z)) if verbose: - rel = np.linalg.norm(z_next) / max(1.0, np.linalg.norm(z)) - print(f"[VF-70] iter {it+1:02d}/{n_iter:02d} Δz_rel = {rel}") + print(f"[VF-70] iter {it+1:02d}/{n_iter:02d} Δz_rel={rel} cond(z)={cond}") + self.cond.append(cond) + self.rel.append(rel) z = z_next - return z, c, residues - def eval_Hfit_from_70(self,s, z_prev, c, residues_m, d0=1.0): - """ - Evaluate H_fit for one response m using (70): - H_fit = (Psi @ residues_m) / (d0 + Psi @ c) - """ - Psi = 1.0 / (s[:, None] + np.asarray(z_prev)[None, :]) - D_ratio = d0 + Psi @ c - N_part = Psi @ residues_m - return N_part / D_ratio + # ---- Finalize: refit residues with fixed poles z (pole–residue model) ---- + Phi = self._psi(s, z) # K x P + # Build design matrix for extras + extras = [] + if include_const: extras.append(np.ones(len(s), np.complex128)) + if include_linear: extras.append(s.astype(np.complex128)) + if extras: + X_base = np.column_stack([Phi] + extras) # K x (P + E) + else: + X_base = Phi - def rms_abs(self,y): - return np.sqrt(np.mean(np.abs(y)**2)) + res = np.empty((len(z), M), np.complex128) + h = np.zeros(M, np.complex128) + g = np.zeros(M, np.complex128) - def rms_error(self,H, Hfit): - """ - Returns (abs_rms, relative_rms). Relative RMS is normalized by RMS of H. - """ - err = Hfit - H - abs_rms = self.rms_abs(err) - ref = max(self.rms_abs(H), 1e-16) - rel_rms = abs_rms / ref - return abs_rms, rel_rms + for m, Hm in enumerate(H_list): + theta, *_ = np.linalg.lstsq(X_base, Hm, rcond=None) # complex LS + res[:, m] = theta[:len(z)] + e = len(theta) - len(z) + if e >= 1: h[m] = theta[len(z)] + if e >= 2: g[m] = theta[len(z)+1] - # --- run K (== n_iter) VF-(70) steps and record history --- - def run_vf70_with_history(self,s, H, z0, K=25, d0=1.0, verbose=True): - """ - H: 1D complex array (single response). If you want multi-response, - pass a list [H11, H21, ...] and adapt the evaluation below. - """ - # Wrap single response as a list for the step function - H_list = [np.asarray(H, dtype=np.complex128)] - z = np.array(z0, dtype=np.complex128) - hist = { - "z": [z.copy()], - "c": [], - "r": [], - "Hfit": [], - "rms_abs": [], - "rms_rel": [], - } - for it in range(K): - z_new, c, residues = self.vf70_step_once(s, H_list, z, d0=d0, column_scale=True) - # Evaluate fitted response after this iteration - Hfit = self.eval_Hfit_from_70(s, z, c, residues[0], d0=d0) - abs_r, rel_r = self.rms_error(H, Hfit) + # store the rational function + self.poles = z # (P,) + self.res = res # (P, M) + self.h = h # (M,) + self.g = g # (M,) + return self - if verbose: - drel = np.linalg.norm(z_new - z) / max(1.0, np.linalg.norm(z)) - print(f"[VF-70] iter {it+1:02d}/{K:02d} Δz_rel={drel:.3e} RMS={abs_r:.3e} RMSrel={rel_r:.3e}") - - # save - hist["c"].append(c.copy()) - hist["r"].append(residues[0].copy()) - hist["Hfit"].append(Hfit.copy()) - hist["rms_abs"].append(abs_r) - hist["rms_rel"].append(rel_r) - z = z_new - hist["z"].append(z.copy()) - - return hist + # Evaluate the stored **rational** model on any grid + def evaluate(self, s_eval, m=None): + if not hasattr(self, "poles"): + raise RuntimeError("Model not fitted. Call fit(...) first.") + s_eval = np.asarray(s_eval, np.complex128).reshape(-1) + Phi = self._psi(s_eval, self.poles) # K_eval x P + if m is None: + H = Phi @ self.res + if np.any(self.h): H += self.h + if np.any(self.g): H += s_eval[:, None] * self.g + return H # (K_eval, M) or (K_eval,1) + m = int(m) + H = Phi @ self.res[:, m] + H += self.h[m] + H += s_eval * self.g[m] + return H # (K_eval,) - def evaluate(self, s_eval, z_ref, c, residues, d0=1.0): - """ - Evaluate the (70) model on any frequency grid. + # def plot_rel_and_cond(self): + # fig, (ax1, ax2) = plt.subplots(2,1,figsize=(15,20)) + # ax1.plot(np.log(self.rel), 'g-', label='rel') + # ax2.plot(np.log(self.cond), 'b-', label='cond') + # ax1.set_xlabel('Iteration') + # ax1.set_ylabel('Relative Change', color='g') + # ax2.set_ylabel('Condition Number', color='b') + # ax1.tick_params(axis='y', labelcolor='g') + # ax2.tick_params(axis='y', labelcolor='b') + # fig.tight_layout() + # plt.title('Relative Change and Condition Number per Iteration') + # # plt.show() + # plt.savefig(f"img_rel_cond.png") + def plot_rel_and_cond(self): + fig = make_subplots(rows=2, cols=1, subplot_titles=('Relative Change', 'Condition Number')) + fig.add_trace(plotly.graph_objs.Scatter(y=self.rel, mode='lines+markers', name='rel'), row=1, col=1) + fig.add_trace(plotly.graph_objs.Scatter(y=self.cond, mode='lines+markers', name='cond'), row=2, col=1) + fig.update_xaxes(title_text='Iteration', row=1, col=1) + fig.update_yaxes(title_text='Relative Change', row=1, col=1) + fig.update_yaxes(title_text='Condition Number', row=2, col=1) + fig.update_layout(height=800, width=800, title_text='Relative Change and Condition Number per Iteration') + fig.write_image("img_rel_cond.png") + # fig.show() - Parameters - ---------- - s_eval : (K_eval,) complex - Points where to evaluate (e.g., 1j*2π*freq_eval). - z_ref : (P,) complex - The *reference* poles used in the LS system that produced (c, residues). - For the last iteration in run_vf70_with_history, this is hist['z'][-2]. - c : (P,) complex - Denominator-update coefficients from (70). - residues : (P,) complex OR list of (P,) for multi-response - Residues for the response(s) from (70). - d0 : scalar, default 1.0 + def evaluate_on_freq(self, freq_eval, m=None): + return self.evaluate(1j * 2*np.pi * np.asarray(freq_eval, float), m=m) - Returns - ------- - H_eval : (K_eval,) complex OR (K_eval, M) for M responses - """ - s_eval = np.asarray(s_eval, dtype=np.complex128).reshape(-1) - z_ref = np.asarray(z_ref, dtype=np.complex128).reshape(-1) - Psi_eval = 1.0 / (s_eval[:, None] + z_ref[None, :]) # K_eval x P - Den = d0 + Psi_eval @ np.asarray(c, dtype=np.complex128) + # Optional: save/load the final rational model + def save(self, path): + if not hasattr(self, "poles"): + raise RuntimeError("Nothing to save; call fit(...) first.") + np.savez(path, poles=self.poles, res=self.res, h=self.h, g=self.g) - # Single response - if not isinstance(residues, (list, tuple)): - r = np.asarray(residues, dtype=np.complex128).reshape(-1) - Num = Psi_eval @ r - return Num / Den - - # Multi-response - M = len(residues) - H_eval = np.empty((s_eval.size, M), dtype=np.complex128) - for m, r_m in enumerate(residues): - r_m = np.asarray(r_m, dtype=np.complex128).reshape(-1) - H_eval[:, m] = (Psi_eval @ r_m) / Den - return H_eval + @classmethod + def load(cls, path): + d = np.load(path, allow_pickle=False) + obj = cls() + obj.poles = d["poles"]; obj.res = d["res"]; obj.h = d["h"]; obj.g = d["g"] + return obj def auto_select(H, freq, n_baseline=64, # log-spaced backbone points @@ -470,22 +439,17 @@ if __name__ == "__main__": H11 = np.array([y[i,0,0] for i in range(len(y))]) H11_slice,freqs_slice = auto_select(H11,freqs,max_points=20) s_slice = freqs_slice * 2j * np.pi - P_pairs = 2 + P_pairs = 1 z0 = generate_starting_poles(P_pairs, beta_min=1e8, beta_max=freqs_slice[-1]) z0 = np.array(z0, dtype=np.complex128) f70 = formula_70() - K = 25 - hist = f70.run_vf70_with_history(s_slice, H11_slice, z0, K=K, d0=1.0, verbose=True) + K = 10 + model = f70.fit(s_slice, H11_slice, z0, n_iter=K, d0=1.0, verbose=True) + model.save("vf70_model.npz") + model.plot_rel_and_cond() - it_show = K-1 - Hfit_final = hist["Hfit"][it_show] - - s_dense = 1j * 2*np.pi * freqs - c_final = hist["c"][it_show] - r_final = hist["r"][it_show] # single response - z_ref = hist["z"][it_show] - Hfit_dense = f70.evaluate(s_dense, z_ref, c_final, r_final, d0=1.0) + Hfit_dense = model.evaluate_on_freq(freqs) fig, axes = plt.subplots(2, 1, figsize=(10, 8), sharex=False) @@ -499,13 +463,13 @@ if __name__ == "__main__": ax0.legend(loc="best") # (2) RMS error vs iteration - ax1 = axes[1] - its = np.arange(1, K+1) - ax1.plot(its, hist["rms_rel"], '-o', lw=2) - ax1.set_xlabel("Iteration") - ax1.set_ylabel("RMS error (relative)") - ax1.grid(True, alpha=0.3) - ax1.set_title(f"RMS(final) = {hist['rms_rel'][-1]:.3e}") + # ax1 = axes[1] + # its = np.arange(1, K+1) + # ax1.plot(its, hist["rms_rel"], '-o', lw=2) + # ax1.set_xlabel("Iteration") + # ax1.set_ylabel("RMS error (relative)") + # ax1.grid(True, alpha=0.3) + # ax1.set_title(f"RMS(final) = {hist['rms_rel'][-1]:.3e}") fig.tight_layout() plt.savefig(f"img_formula_70.png")