diff --git a/.gitignore b/.gitignore index 3117b89..2be33ae 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ __pycache__/ *.pyd __pypackages__/ env/ -.venv/ \ No newline at end of file +.venv/ +.vscode/ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..16c0227 --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +1. 添加 A B C D 方法,对应不同的基 +2. 修改Levi部分和sk迭代部分,使用更通用的A B C D求解 +3. 添加正交基形式 +4. 加入多端口 +5. 引入QR,消除中间过程的残差求解,降低算法复杂度 +6. 使用最初始的表达式,获得无偏估计 \ No newline at end of file diff --git a/core/basis.py b/core/basis.py new file mode 100644 index 0000000..2f16635 --- /dev/null +++ b/core/basis.py @@ -0,0 +1,241 @@ +import numpy as np +from core.sk_iter import generate_starting_poles +from scipy.linalg import block_diag +import skrf as rf +from skrf import VectorFitting +from core.freqency import auto_select + +class BasicBasis: + def __init__(self,H,freqs,poles,weights=None): + self.least_squares_rms_error = None + self.least_squares_condition = None + self.eigenval_condition = None + self.eigenval_rms_error = None + + self.H = H + self.freqs = freqs + self.s = self.freqs * 2j * np.pi + self.P = len(poles) + self.poles = poles + self.Phi = self.generate_basis(self.s, self.poles) + self.A = self.matrix_A(self.poles) + self.B = self.vector_B(self.poles) + self.D = 1.0 + self.Cr,self.Cw = self.fit_denominator(self.H, d0=1.0, weights=weights) + + z = np.linalg.eigvals(self.A - self.B @ self.Cw) + p_next = -z + + # enforce LHP and pair ordering + p_next = np.where(np.real(p_next) < 0, p_next, -np.conj(p_next)) + p_next = np.sort_complex(p_next) + + self.next_poles = p_next + # z = np.where(np.real(z) < 0, z, -np.conj(z)) # enforce LHP + # self.next_poles = np.sort_complex(z) + self.eigenval_condition = np.linalg.cond(self.A - self.B @ self.Cw) + self.eigenval_rms_error = np.sqrt(np.mean(np.abs(np.real(z) - np.real(poles))**2 + np.abs(np.imag(z) - np.imag(poles))**2)) + + self.Dt = self.eval_Dt_state_space() + self.delta = self.Dt / weights if weights is not None else self.Dt + pass + + def eval_Dt_state_space(self): + """Return D(s_k)=C(s_k I - A)^(-1)B + D for all k (complex 1D array).""" + s = 1j * 2*np.pi * np.asarray(self.freqs, float).ravel() + A = np.asarray(self.A, np.complex128); n = A.shape[0] + B = np.asarray(self.B, np.complex128).reshape(n, 1) + C = np.asarray(self.Cw, float).reshape(1, n) + D = self.D + I = np.eye(n, dtype=np.complex128) + out = np.empty_like(s, dtype=np.complex128) + for k, sk in enumerate(s): + DS = D + (C @ np.linalg.inv(sk*I - A) @ B) + out[k] = DS[0, 0] + return out + + + def generate_basis(self,s, poles): + """Real basis of (15)-(16); returns Φ(s) and a layout for packing C.""" + cols = [] + i = 0 + while i < len(poles): + p = poles[i] + if p.real > 0: + raise ValueError("poles must be in the LHP") + if i+1 < len(poles) and np.isclose(poles[i+1], np.conj(p)): + pc = poles[i+1] + phi1 = 1/(s - p) + 1/(s - pc) # eq (15)generate_basis + phi2 = 1j*(1/(s - p) - 1/(s - pc)) # eq (16) (fixed sign) + cols += [phi1, phi2] + i += 2 + else: + cols.append(1/(s - p)) + i += 1 + Phi = np.column_stack(cols).astype(np.complex128) + return Phi + + def matrix_A(self, poles): + def A_block(p): + if abs(p.imag) < 1e-14: + return np.array([[p.real]], float) # A_p = [ p ] + return np.array([[p.real, p.imag], # A_p = [[Re p, Im p], + [-p.imag, p.real]], float) # [-Im p, Re p]] + A = None; i = 0 + while i < len(poles): + p = poles[i] + Ab = A_block(p) + if i+1 < len(poles) and np.isclose(poles[i+1], np.conj(p)): i += 2 + else: i += 1 + A = Ab if A is None else block_diag(A, Ab) + return A + + def vector_B(self, poles): + def B_block(p): + return np.array([[1.0]], float) if abs(p.imag)<1e-14 else np.array([[2.0],[0.0]], float) + B = None; i = 0 + while i < len(poles): + p = poles[i] + Bb = B_block(p) + if i+1 < len(poles) and np.isclose(poles[i+1], np.conj(p)): i += 2 + else: i += 1 + B = Bb if B is None else np.vstack([B, Bb]) + return B + + def fit_denominator(self, H, d0=1.0, weights=None): + """ + Solve formula (70) on the real basis Φ to obtain: + - d (real) → packs into C for this state's block structure + - gamma (complex) + Optional 'weights' (K,) apply row scaling: SK weighting if 1/|D_prev|. + """ + if weights is None: + weights = np.diag(np.ones(len(H), np.complex128)) + else: + weights = np.diag([1/res for res in weights]) + s = self.s + H = np.asarray(H, np.complex128).reshape(-1,1) + Phi = self.Phi + psi = weights @ Phi + psi = Phi + + HPhi = H * Phi + + A_re = np.hstack([np.real(-psi), np.real(-HPhi)]) + A_im = np.hstack([np.imag(-psi), np.imag(-HPhi)]) + + b_re = np.real(d0 * H) + b_im = np.imag(d0 * H) + + A = np.vstack([A_re, A_im]).astype(float) + # rown = np.linalg.norm(A, axis=1) + # rown = np.sqrt(rown) + # A = rown[:,None] * A + b = np.concatenate([b_re, b_im]).astype(float) + + x = np.linalg.inv(A.T @ A) @ A.T @ b + + self.least_squares_rms_error = np.sqrt(np.mean((A @ x - b)**2)) + self.least_squares_condition = np.linalg.cond(A) + + Cn,Cd = self.vector_C(x) + return Cn,Cd + + def vector_C(self,x): + Cn = np.asarray([x[:len(x)//2]], float).reshape(1,-1) + Cd = np.asarray([x[len(x)//2:]], float).reshape(1,-1) + return Cn, Cd + + def evaluate(self,freqs,poles,Cn,Cd,d0=1.0): + s = 1j * 2*np.pi * np.asarray(freqs, float).ravel() + phi = self.generate_basis(s, poles) + num = phi @ Cn.T + den = d0 + phi @ Cd.T + H = num / den + return H.ravel() + +if __name__ == "__main__": + network = rf.Network("/tmp/paramer/simulation/3000/3000.s2p") + K = 10 + H11,freqs = auto_select([network.y[i][0][0] for i in range(2,len(network.y))],network.f[2:],max_points=20) + poles = generate_starting_poles(2,beta_min=freqs[0]/1.1,beta_max=freqs[-1]*1.1) + + Dt_1 = np.ones((len(freqs),1),np.complex128) + # Levi step (no weighting): + basis = BasicBasis(H11,freqs,poles=poles) + Dt = basis.Dt + poles = basis.next_poles + + print("Levi step (no weighting):") + print("A:",basis.A) + print("B:",basis.B) + print("C:",basis.Cw) + print("D:",basis.D) + print("next_pozles:",basis.next_poles) + print("Dt:",Dt, "norm:",np.linalg.norm(Dt)) + + # SK weighting (optional, after first pass): + least_squares_condition = [] + least_squares_rms_error = [] + eigenval_condition = [] + eigenval_rms_error = [] + for i in range(K): + basis = BasicBasis(H11,freqs,poles=poles,weights=Dt) + Dt_1 = Dt + Dt = basis.Dt + poles = basis.next_poles + print(f"SK Iteration {i+1}/{K}") + print("A:",basis.A) + print("B:",basis.B) + print("C:",basis.Cw) + print("D:",basis.D) + print("z:",basis.next_poles) + print("Dt:",Dt) + print("Dt/Dt-1",np.linalg.norm(Dt) / np.linalg.norm(Dt_1)) + least_squares_condition.append(basis.least_squares_condition) + least_squares_rms_error.append(basis.least_squares_rms_error) + eigenval_condition.append(basis.eigenval_condition) + eigenval_rms_error.append(basis.eigenval_rms_error) + + # H11_evaluated = basis.evaluate_pole_residue(network.f[1:],poles,basis.C[0]) + H11_evaluated = basis.evaluate(network.f[2:], poles, basis.Cr[0],basis.Cw[0], d0=1.0) + import matplotlib.pyplot as plt + fig, axes = plt.subplots(3, 2, figsize=(15, 16), sharex=False) + + ax0 = axes[0][0] + ax0.plot(network.f[2:], np.abs([network.y[i][0][0] for i in range(2,len(network.y))]), 'o', ms=4, color='red', label='Samples') + ax0.plot(network.f[2:], np.abs(H11_evaluated), '-', lw=2, color='k', label='Fit') + ax0.plot(freqs, np.abs(H11), 'x', ms=4, color='blue', label='Input Samples') + ax0.set_title("Response i=0, j=0") + ax0.set_ylabel("Magnitude") + ax0.legend(loc="best") + + ax1 = axes[1][0] + ax1.plot(least_squares_condition, label='Least Squares Condition') + ax1.set_title("least_squares_condition") + ax1.set_ylabel("Magnitude") + ax1.legend(loc="best") + + ax2 = axes[1][1] + ax2.plot(least_squares_rms_error, label='Least Squares RMS Error') + ax2.set_title("least_squares_rms_error") + ax2.set_ylabel("Magnitude") + ax2.legend(loc="best") + + ax3 = axes[2][0] + ax3.plot(eigenval_condition, label='Eigenvalue Condition') + ax3.set_title("eigenval_condition") + ax3.set_ylabel("Magnitude") + ax3.legend(loc="best") + + ax4 = axes[2][1] + ax4.plot(eigenval_rms_error, label='Eigenvalue RMS Error') + ax4.set_title("eigenval_rms_error") + ax4.set_ylabel("Magnitude") + ax4.legend(loc="best") + fig.tight_layout() + plt.savefig(f"basic_basis.png") + + + + diff --git a/core/freqency.py b/core/freqency.py new file mode 100644 index 0000000..617d296 --- /dev/null +++ b/core/freqency.py @@ -0,0 +1,131 @@ +import numpy as np +def auto_select(H, freq, + n_baseline=64, # log-spaced backbone points + peak_prominence=0.05, # fraction of |H| dB dynamic range for peak detection + peak_window=5, # take ±peak_window samples around each peak + topgrad_q=0.98, # keep top 2% largest slope/phase-change points + max_points=25, # final cap on selected samples (None = no cap) + ensure_ends=True): + """ + Select several significant sample points for vector fitting. + + Strategy: + 1) Always keep endpoints (optional). + 2) Add a log-spaced baseline over the band. + 3) Detect resonance peaks in |H| (on a log scale) and keep small windows around them. + 4) Add points with the largest magnitude slope and phase-change (w.r.t log-f). + 5) De-duplicate, sort, and optionally thin to 'max_points' with priority + to endpoints and detected peaks. + + Parameters + ---------- + H : (N,) complex array + Frequency response samples. + freq : (N,) float array + Frequency axis [Hz], strictly increasing. + n_baseline : int + Count of log-spaced baseline samples across the band. + peak_prominence : float + Peak prominence threshold as a fraction of the dynamic range in log|H|. + 0.05 ≈ keep peaks ≥ 5% of the range. + peak_window : int + Number of neighbor indices to include on each side of every detected peak. + topgrad_q : float in (0,1) + Quantile for selecting strong slope/phase points. + 0.98 ⇒ keep the top 2% largest derivatives. + max_points : int or None + If not None, cap the total number of selected indices to this value. + ensure_ends : bool + Always include the first and last samples. + + Returns + ------- + H_sel : (K,) complex array + freq_sel : (K,) float array + """ + H = np.asarray(H).reshape(-1) + f = np.asarray(freq).reshape(-1) + if H.size != f.size: + raise ValueError("H and freq must have the same length.") + N = f.size + if N < 4: + return H.copy(), f.copy() + + eps = 1e-16 + mag = np.abs(H) + logmag = np.log10(mag + eps) + phase = np.unwrap(np.angle(H)) + + # log-frequency axis (scale-invariant derivatives) + # keep it linear if any non-positive freq sneaks in + if np.all(f > 0): + lf = np.log(f) + else: + lf = f.copy() + + dlf = np.gradient(lf) + d_logmag = np.gradient(logmag) / (dlf + 1e-16) + d_phase = np.gradient(phase) / (dlf + 1e-16) + + idx = set() + if ensure_ends: + idx.update([0, N-1]) + + # 1) log-spaced baseline + if n_baseline > 0: + # map a log grid to nearest indices + grid = np.linspace(lf.min(), lf.max(), n_baseline) + base_idx = np.clip(np.searchsorted(lf, grid), 0, N-1) + idx.update(np.unique(base_idx).tolist()) + + # 2) peaks in |H| + try: + from scipy.signal import find_peaks + dyn = logmag.max() - logmag.min() + prom = peak_prominence * (dyn + 1e-12) + peaks, _ = find_peaks(logmag, prominence=prom) + except Exception: + # simple fallback: strict local maxima + peaks = np.where((mag[1:-1] > mag[:-2]) & (mag[1:-1] > mag[2:]))[0] + 1 + + for p in peaks: + lo = max(0, p - peak_window) + hi = min(N, p + peak_window + 1) + idx.update(range(lo, hi)) + + # 3) strongest slope / phase-change points + thr_slope = np.quantile(np.abs(d_logmag), topgrad_q) + thr_phase = np.quantile(np.abs(d_phase), topgrad_q) + idx.update(np.where(np.abs(d_logmag) >= thr_slope)[0].tolist()) + idx.update(np.where(np.abs(d_phase) >= thr_phase)[0].tolist()) + + # 4) finalize set + sel = np.array(sorted(idx), dtype=int) + + # 5) optional thinning with priority to endpoints and peaks + if max_points is not None and sel.size > max_points: + priority = np.zeros(sel.size, dtype=int) + if ensure_ends: + priority[(sel == 0) | (sel == N-1)] = 3 + if peaks.size: + priority[np.isin(sel, peaks)] = np.maximum(priority[np.isin(sel, peaks)], 2) + + keep = [] + budget = max_points + # keep highest-priority first + for lev in (3, 2, 1, 0): + cand = sel[priority == lev] + if cand.size == 0: + continue + if cand.size <= budget: + keep.extend(cand.tolist()) + budget -= cand.size + else: + step = max(1, int(np.ceil(cand.size / budget))) + keep.extend(cand[::step][:budget].tolist()) + budget = 0 + if budget == 0: + break + sel = np.array(sorted(set(keep)), dtype=int) + + return H[sel], f[sel] \ No newline at end of file diff --git a/core/orthonormal_basis.py b/core/orthonormal_basis.py index 096dea2..1f64ac7 100644 --- a/core/orthonormal_basis.py +++ b/core/orthonormal_basis.py @@ -1,36 +1,5 @@ import numpy as np -# ------------------------------------------------------------ -# 按论文公式 (9)(10)(11) 生成 Muntz–Laguerre 正交有理基 (解析形式): -# -# 给定稳定极点集合 {p_k} (Re(p_k)<0)。论文记法中使用 -a_k,其中 Re(a_k)>0。 -# 对应关系:p_k = -a_k ⇒ a_k = -p_k, Re(a_k)= -Re(p_k) >0 -# -# 连续内积意义下(沿 jω 轴积分)这些 φ_k 解析正交。离散频率采样后数值上 -# 可能偏离,可再用加权 QR 做数值再正交(可选)。 -# -# 公式在“稳定极点 p 表达”下的改写: -# (原) 实极点: φ_p(s) = sqrt(2 Re(a_p)) / (s + a_p) * Π (s - a_i^*)/(s + a_i) -# 变换 a_p = -p ⇒ Re(a_p)= -Re(p) = σ >0 且 (s + a_p) = (s - p) -# 且乘积 (s - a_i^*)/(s + a_i) = (s + p_i^*)/(s - p_i) -# ⇒ φ_p(s) = sqrt(-2 Re(p)) / (s - p) * Π_{i0 的 p 作为首: -# (原) φ_p = sqrt(2 Re(a_p)) (s - |a_p|)/[(s + a_p)(s + a_p^*)] * Π(...) -# (原) φ_{p+1}= sqrt(2 Re(a_p)) (s + |a_p|)/[(s + a_p)(s + a_p^*)] * Π(...) -# 代入 a_p=-p: -# Re(a_p)= -Re(p)=σ>0, (s + a_p) = (s - p), (s + a_p^*)=(s - p^*) -# |a_p| = |p| -# 乘积同上 ⇒ Π_{i= self.N: -# raise StopIteration - -# p = self.poles[self.k] -# if np.real(p) >= 0: -# raise ValueError(f"极点必须在左半平面: {p}") - -# # 复对首 (正虚部) -# if np.iscomplex(p) and np.imag(p) > 0: -# if self.k + 1 >= self.N: -# raise ValueError("复极点缺少共轭") -# pc = self.poles[self.k + 1] -# if not np.isclose(pc, np.conj(p)): -# raise ValueError("复极点未按 (p, p*) 顺序排列 (正虚部在前)") -# sigma = -np.real(p) # >0 -# scale = np.sqrt(2 * sigma) -# r = np.abs(p) -# denom = (self.s - p) * (self.s - pc) - -# # 两个基函数 -# phi_p = scale * (self.s - r) / denom * self.product -# phi_pc = scale * (self.s + r) / denom * self.product - -# # product 先乘 (s + p^*)/(s - p),再乘 (s + p)/(s - p^*) -# self.product = self.product * (self.s + pc) / (self.s - p) -# self.product = self.product * (self.s + p) / (self.s - pc) - -# self.k += 2 -# return [phi_p, phi_pc] - -# # 复对次 (负虚部) —— 应该被首元素处理,出现表示顺序错误 -# if np.iscomplex(p) and np.imag(p) < 0: -# raise ValueError("检测到负虚部复极点但其共轭尚未处理,请将正虚部成员放在前面。") - -# # 实极点 -# sigma = -np.real(p) -# if sigma <= 0: -# raise ValueError("实极点实部应为负 (稳定)。") -# scale = np.sqrt(2 * sigma) -# phi = scale / (self.s - p) * self.product -# # 更新乘积 -# self.product = self.product * (self.s + p) / (self.s - p) - -# self.k += 1 -# return [phi] - -# def generate_muntz_laguerre_basis(s: np.ndarray, init_poles: list | np.ndarray): -# """ -# 生成完整基函数列表: [φ_0=1, φ_1, φ_2, ...] -# """ -# basis = [np.ones_like(s, dtype=complex)] -# for block in MuntzLaguerreIterator(s, init_poles): -# basis.extend(block) -# return basis - -# if __name__ == "__main__": -# # 示例稳定极点 (复对正虚部在前) -# stable_poles = [ -# -0.8e9, -# -1.0e9 + 2.5e9j, -# -1.0e9 - 2.5e9j, -# -2.2e9 -# ] -# freqs = np.linspace(1e8, 8e9, 400) -# s = 1j * 2 * np.pi * freqs - -# basis = generate_muntz_laguerre_basis(s, stable_poles) -# print(f"生成 {len(basis)} 个基函数,{basis}") \ No newline at end of file diff --git a/core/robust.py b/core/robust.py index ec80240..e69de29 100644 --- a/core/robust.py +++ b/core/robust.py @@ -1,191 +0,0 @@ -from models.basic import ModelBasic -from typing import List,Literal,Dict -import numpy as np -import random -from pydantic import BaseModel -from skrf import Network -from models.basic import ModelBasicDatasetUnit - -class RobustParametricConfig(BaseModel): - n_poles: int - max_iter: int = 10 - parameter_type: Literal["s","y","z"] - -class RobustParametricModel: - def __init__(self,model:ModelBasic,config:RobustParametricConfig): - self.model = model - self.config = config - - # 区分训练集和测试集 - def _train_test_split(self,train_ratio:float=0.8,random_state:int=42): - random.seed(random_state) - dataset = self.model.results - random.shuffle(dataset) - train_size = int(len(dataset)*train_ratio) - self.train_set = dataset[:train_size] - self.test_set = dataset[train_size:] - return self.train_set, self.test_set - - def first_iteration_step(self, datasets: List[ModelBasicDatasetUnit], - param_degree: int = 1): - ''' - SK 迭代的第一步 (t=0) – 对应论文公式 (3). - - 目标: - 最小化 sum_{样本 n, 频点 f} | N^{(0)}(s_f, g_n) - H(s_f, g_n) |^2 - 在 t=0 阶段我们令 D^{(0)}(s,g)=1, 仅拟合分子: - N^{(0)}(s,g) = Σ_{p∈P} Σ_{v∈V} c_{p,v} * φ_freq_p(s) * φ_param_v(g) - - 因此是一个纯线性最小二乘: - H ≈ Σ_{p,v} c_{p,v} F[:,p] ⊗ G[n,v] - - 记: - F: (Nf, P) 频率正交(或原始)基列 - G: (Ns, V) 参数多项式(含常数)基列 - 设计矩阵 A 大小 (Ns*Nf, P*V), A[(n*Nf + f), (p*V + v)] = F[f,p] * G[n,v] - - 输出: - self.first_iter_coeffs[(i,j)] = C_{p,v} (P × V) 每个端口对一套系数 - self.freq_basis_F = F - self.param_basis_G = G - self.poles 初始极点 (供后续构造有理正交基 / SK 加权使用) - - 参数: - datasets: 训练用样本列表 - param_degree: 参数多项式最大总次数 (默认 1 → 常数 + 线性) - - 注意: - 这里使用简单频率基 [1, 1/(s-a_k)],未做 QR 正交化; - 后续 SK 迭代 / 正交化可在第二步再进行。 - ''' - assert len(datasets) > 0, "空数据集" - # ---------------- 收集频率与端口信息 ---------------- - Ns = len(datasets) - freqs = datasets[0].freqs # 频率需要对齐 - Nf = freqs.shape[0] - nports = self.model.info.nports - - # ---------------- 构造初始极点 (对数分布 + 阻尼) ---------------- - def _init_poles(n_poles: int): - fmin, fmax = freqs[0], freqs[-1] - if n_poles <= 0: - return np.array([], dtype=complex) - # 避免 0 Hz,用第二个点或微小偏移 - start_f = max(fmin if fmin > 0 else freqs[1] if Nf > 1 else 1.0e-3, 1e-12) - f_samples = np.logspace(np.log10(start_f), np.log10(fmax), n_poles) - sigma = 0.02 * 2 * np.pi * fmax # 固定阻尼,可放入 config - return -sigma + 1j * 2 * np.pi * f_samples - - self.poles = _init_poles(self.config.n_poles) - s_vec = 1j * 2 * np.pi * freqs # (Nf,) - - # ---------------- 频率基 F ---------------- - # 列0: 常数 1, 后续列: 1/(s - a_k) - P = 1 + len(self.poles) - F = np.zeros((Nf, P), dtype=complex) - F[:, 0] = 1.0 - for k, a in enumerate(self.poles, start=1): - F[:, k] = 1.0 / (s_vec - a) - self.freq_basis_F = F # (Nf, P) - - # ---------------- 参数基 G ---------------- - # 取出参数向量 g = [param1, param2, ...] - # 假设 datasets[i].parameters 为 dict - param_keys = list(datasets[0].parameters.keys()) - d = len(param_keys) - # 构造参数矩阵 (Ns,d) - Pmat = np.zeros((Ns, d), dtype=float) - for i, unit in enumerate(datasets): - for j, k in enumerate(param_keys): - Pmat[i, j] = float(unit.parameters[k]) - # 标准化 - mean = Pmat.mean(axis=0) - std = Pmat.std(axis=0) + 1e-15 - Xn = (Pmat - mean) / std - - # 生成多项式指数 (总次数 <= param_degree) - def _gen_param_exps(dim, D): - exps = [] - def rec(cur, idx, rem): - if idx == dim: - exps.append(tuple(cur)); return - for t in range(rem + 1): - cur.append(t); rec(cur, idx + 1, rem - t); cur.pop() - rec([], 0, D) - return exps - - exps = _gen_param_exps(d, param_degree) # V_exps - V = len(exps) - G = np.zeros((Ns, V), dtype=float) - for vidx, e in enumerate(exps): - val = np.ones(Ns) - for j, p in enumerate(e): - if p: - val *= Xn[:, j] ** p - G[:, vidx] = val - self.param_basis_G = G # (Ns, V) - self.param_basis_meta = { - "keys": param_keys, - "mean": mean.tolist(), - "std": std.tolist(), - "exponents": exps, - "degree": param_degree - } - - # ---------------- 构造设计矩阵 A (Kronecker) ---------------- - # A shape: (Ns*Nf, P*V) - # 利用广播:A_block[n,f,p,v] = F[f,p] * G[n,v] - A4 = np.einsum('fp,nv->nfpv', F, G) # (Ns, Nf, P, V) - A = A4.reshape(Ns * Nf, P * V) - - # ---------------- 采集目标 H 数据 ---------------- - # 对每个端口对 (i,j) 分别解一个向量 c_{p,v} - # 选择参数类型 - param_type = self.config.parameter_type.lower() - valid_types = {"s", "y", "z"} - if param_type not in valid_types: - raise ValueError(f"parameter_type 必须在 {valid_types}") - - # 预先载入全部 Network (避免重复 IO) - # H_data[(i,j)] -> (Ns,Nf) 复数 - H_data = {} - for i_port in range(nports): - for j_port in range(nports): - H_mat = np.zeros((Ns, Nf), dtype=complex) - for n, unit in enumerate(datasets): - net: Network = unit.network - if param_type == "s": - sij = net.s[:, i_port, j_port] - elif param_type == "y": - sij = net.y[:, i_port, j_port] - else: - sij = net.z[:, i_port, j_port] - # 插值或对齐假设已完成 - H_mat[n, :] = sij - H_data[(i_port, j_port)] = H_mat - - # ---------------- 最小二乘求系数 ---------------- - self.first_iter_coeffs = {} - # 可选: 预计算 A^+ (伪逆) 如果数据不巨大 - # pinv = np.linalg.pinv(A) - for key, Hmat in H_data.items(): - b = Hmat.reshape(Ns * Nf) # (Ns*Nf,) - # 解 x (长度 P*V) - x, *_ = np.linalg.lstsq(A, b, rcond=None) - C = x.reshape(P, V) - self.first_iter_coeffs[key] = C - - # ---------------- 存储便于后续 SK 迭代使用 ---------------- - self.meta_first_iter = { - "A_shape": A.shape, - "num_samples": Ns, - "num_freqs": Nf, - "freq_basis_dim": P, - "param_basis_dim": V - } - if getattr(self.config, "verbose", True): - print(f"[t=0] 线性最小二乘完成: A={A.shape}, 频率基P={P}, 参数基V={V}, 端口对={nports*nports}") - - return self.first_iter_coeffs - - \ No newline at end of file diff --git a/core/sk_iter.py b/core/sk_iter.py index f230398..47681f6 100644 --- a/core/sk_iter.py +++ b/core/sk_iter.py @@ -2,7 +2,6 @@ import numpy as np from dataclasses import dataclass from typing import List, Dict, Tuple -# ---------- 1. 生成初始极点(按论文 Re(-a_p) 小、Im 线性分布) ---------- def generate_starting_poles(n_pairs: int, beta_min: float, beta_max: float, alpha_scale: float = 0.01): """ 仅生成复共轭对: p = -alpha + j beta, p*。 @@ -19,308 +18,3 @@ def generate_starting_poles(n_pairs: int, beta_min: float, beta_max: float, alph poles += [p, np.conj(p)] print(f"生成 {len(poles)} 个初始极点 (复对) {poles}]") return poles - -# ---------- 2. Muntz–Laguerre 频率基 (与你现有 orthonormal_basis 中一致的核心) ---------- -def muntz_laguerre_basis(freqs_hz: np.ndarray, stable_poles: List[complex]) -> np.ndarray: - """ - 返回 Φ_ml (Nf, P) 列顺序: φ_0=1, 后续依次 (单极点 / 复对两列) - """ - s = 1j * 2 * np.pi * freqs_hz - basis = [np.ones_like(s, dtype=complex)] - product = np.ones_like(s, dtype=complex) - k = 0 - N = len(stable_poles) - while k < N: - p = stable_poles[k] - if np.real(p) >= 0: - raise ValueError("极点需在左半平面") - if np.imag(p) > 0: # 复对首 - if k + 1 >= N or not np.isclose(stable_poles[k+1], np.conj(p)): - raise ValueError("复对未配对") - pc = stable_poles[k+1] - sigma = -np.real(p) - scale = np.sqrt(2 * sigma) - r = np.abs(p) - denom = (s - p) * (s - pc) - phi_p = scale * (s - r) / denom * product - phi_pc = scale * (s + r) / denom * product - basis.extend([phi_p, phi_pc]) - # update product - product = product * (s + pc)/(s - p) * (s + p)/(s - pc) - k += 2 - elif np.imag(p) < 0: - raise ValueError("负虚部复极点必须跟在正虚部之后") - else: # 实极点 - sigma = -np.real(p) - scale = np.sqrt(2 * sigma) - phi = scale / (s - p) * product - basis.append(phi) - product = product * (s + p)/(s - p) - k += 1 - return np.column_stack(basis) # (Nf, P) - -# ---------- 3. 原始部分分式基 ---------- -def raw_partial_fraction_basis(freqs_hz: np.ndarray, stable_poles: List[complex]) -> np.ndarray: - s = 1j * 2 * np.pi * freqs_hz - cols = [np.ones_like(s, dtype=complex)] - for p in stable_poles: - cols.append(1.0 / (s - p)) - return np.column_stack(cols) # (Nf, P) - -# ---------- 4. 变换矩阵 Φ_ml T = G_raw ---------- -def compute_transform_matrix(Phi_ml: np.ndarray, G_raw: np.ndarray, weights: np.ndarray | None = None) -> np.ndarray: - # 加权最小二乘 T = (Φ^H W Φ)^{-1} Φ^H W G - if weights is None: - WPhi = Phi_ml - WG = G_raw - M = Phi_ml.conj().T @ WPhi - RHS = Phi_ml.conj().T @ WG - else: - w = weights[:, None] - M = Phi_ml.conj().T @ (w * Phi_ml) - RHS = Phi_ml.conj().T @ (w * G_raw) - T = np.linalg.solve(M, RHS) - return T # (P,P), Φ T = G - -# ---------- 5. 参数多项式正交基 ---------- -def generate_param_basis(param_matrix: np.ndarray, total_degree: int): - """ - param_matrix: (Ns, d) - 返回: - Qμ: (Ns, V) 正交列 - Rμ: (V, V) 上三角 - exps: 多指数列表 - """ - X = param_matrix - Ns, d = X.shape - # 生成多指数 - exps=[] - def rec(cur,i,rem): - if i==d: - exps.append(tuple(cur)); return - for k in range(rem+1): - cur.append(k); rec(cur,i+1,rem-k); cur.pop() - rec([],0,total_degree) - V = len(exps) - M = np.zeros((Ns, V)) - for idx,e in enumerate(exps): - v = np.ones(Ns) - for j,p in enumerate(e): - if p: v *= X[:,j]**p - M[:,idx]=v - Q,R = np.linalg.qr(M) - return Q,R,exps - -# ---------- 6. t=0 构建设计矩阵并解 C ---------- -def fit_t0(H_list: List[np.ndarray], Phi_f: np.ndarray, Qμ: np.ndarray) -> np.ndarray: - """ - H_list: 长度 Ns, 每项 (Nf,) 复数 - Phi_f: (Nf, P) - Qμ: (Ns, V) - 返回: C (P,V) - """ - Ns = len(H_list); Nf,P = Phi_f.shape; V = Qμ.shape[1] - cols = P*V - A = np.zeros((Ns*Nf, cols), dtype=complex) - b = np.zeros(Ns*Nf, dtype=complex) - r=0 - for n,Hn in enumerate(H_list): - blk = np.einsum('fp,v->fpv', Phi_f, Qμ[n]).reshape(Nf, cols) - A[r:r+Nf,:]=blk - b[r:r+Nf]=Hn - r+=Nf - x, *_ = np.linalg.lstsq(A, b, rcond=None) - C = x.reshape(P, V) - return C - -# ---------- 7. t=1 (首轮 SK) 解 C, Ct ---------- -def fit_t1_SK(H_list: List[np.ndarray], Phi_f: np.ndarray, Qμ: np.ndarray, - C_prev: np.ndarray, max_iter: int =1, tol: float =1e-3): - """ - 只做一轮 (或少量) SK,初始 D=1。 - 返回: C_new, Ct_new - """ - Ns = len(H_list); Nf,P = Phi_f.shape; V = Qμ.shape[1] - # 初始化分母系数 Ct: Ct[0,0]=1 - Ct = np.zeros((P,V), dtype=complex) - Ct[0,0]=1.0 - C = C_prev.copy() - for it in range(max_iter): - # 构建线性系统: (N - D*H)=0 - # 未知顺序: C(:), Ct(:) 去掉固定 Ct[0,0] - mask_fix = np.zeros((P,V), dtype=bool); mask_fix[0,0]=True - col_map={} - col=0 - for i in range(P): - for j in range(V): - col_map[('C',i,j)]=col; col+=1 - for i in range(P): - for j in range(V): - if mask_fix[i,j]: continue - col_map[('Ct',i,j)]=col; col+=1 - A = np.zeros((Ns*Nf, col), dtype=complex) - b = np.zeros(Ns*Nf, dtype=complex) - r=0 - # 评价当前 D_prev= Σ Ct φ ψ - for n,Hn in enumerate(H_list): - # 分子块 (Φ_f ⊗ ψ_n) - blk = np.einsum('fp,v->fpv', Phi_f, Qμ[n]).reshape(Nf, P*V) - # 填 C 部分 - for i in range(P): - for j in range(V): - A[r:r+Nf, col_map[('C',i,j)]] = blk[:, i*V + j] - # 分母块: - H * blk - for i in range(P): - for j in range(V): - if mask_fix[i,j]: continue - A[r:r+Nf, col_map[('Ct',i,j)]] = - Hn * blk[:, i*V + j] - r+=Nf - # 求解 - x, *_ = np.linalg.lstsq(A, b, rcond=None) - # 拆回 - idx=0 - for i in range(P): - for j in range(V): - C[i,j]=x[idx]; idx+=1 - Ct_new = Ct.copy() - for i in range(P): - for j in range(V): - if mask_fix[i,j]: continue - Ct_new[i,j]=x[idx]; idx+=1 - # 收敛性简单检查:分母变化 - diff = np.max(np.abs(Ct_new - Ct)) - Ct = Ct_new - if diff < tol: - break - return C, Ct - -# ---------- 8. 假极点检测 (在 raw 基上) ---------- -def detect_spurious_poles(C_ml: np.ndarray, Ct_ml: np.ndarray, T: np.ndarray, - Qμ: np.ndarray, - tau_cancel=1e-2, tau_small=1e-3, eps=1e-14): - """ - C_ml, Ct_ml: (P,V) (Muntz-Laguerre 基) - T: Φ_ml T = G_raw - Qμ: (Ns,V) - """ - # raw 系数: c_raw = T^{-1} c_ml - Tinv = np.linalg.inv(T) - C_raw = Tinv @ C_ml - Ct_raw = Tinv @ Ct_ml - P,V = C_ml.shape - Ns = Qμ.shape[0] - r_vals = C_raw @ Qμ.T - q_vals = Ct_raw @ Qμ.T - metrics={} - scales=[] - for k in range(1,P): # 跳过常数列 - rv = r_vals[k]; qv = q_vals[k] - diff = np.max(np.abs(rv - qv)) - scale = np.max([np.max(np.abs(rv)), np.max(np.abs(qv)), eps]) - eta = diff / scale - metrics[k] = {"diff": diff, "scale": scale, "eta": eta} - scales.append(scale) - S_max = max(scales) if scales else 1.0 - cancel_spurious = [k for k in range(1,P) - if metrics[k]["eta"] < tau_cancel and metrics[k]["scale"] > tau_small * S_max] - small_spurious = [k for k in range(1,P) - if metrics[k]["scale"] <= tau_small * S_max] - return { - "cancel_spurious": cancel_spurious, - "small_spurious": small_spurious, - "metrics": metrics, - "S_max": S_max, - "C_raw": C_raw, - "Ct_raw": Ct_raw - } - -# ---------- 9. 统一入口 ---------- -@dataclass -class OPVFResult: - C_ml: np.ndarray - Ct_ml: np.ndarray - C_raw: np.ndarray - Ct_raw: np.ndarray - T: np.ndarray - Qμ: np.ndarray - Rμ: np.ndarray - exps: List[Tuple[int]] - poles: List[complex] - spurious: Dict - -def opvf_from_H(H_list: List[np.ndarray], - freqs_hz: np.ndarray, - param_matrix: np.ndarray, - total_degree: int, - poles: List[complex], - do_t1: bool = True) -> OPVFResult: - """ - H_list: 长度 Ns; 每项 shape (Nf,) - freqs_hz: (Nf,) - param_matrix: (Ns,d) 归一化前参数值 (内部不自动标准化以保持可控) - poles: 初始稳定极点列表 - """ - # 频率基 - Phi_ml = muntz_laguerre_basis(freqs_hz, poles) # (Nf,P) - G_raw = raw_partial_fraction_basis(freqs_hz, poles) # (Nf,P) - # 简单权 (ω 权): w = Δf (因 (1/2π)∫ φφ* dω => ∑ Δf) - w = _trap_weights(freqs_hz) - T = compute_transform_matrix(Phi_ml, G_raw, w) - - # 参数基 - Qμ, Rμ, exps = generate_param_basis(param_matrix, total_degree) - - # t=0 - C_ml = fit_t0(H_list, Phi_ml, Qμ) - - if do_t1: - C_ml, Ct_ml = fit_t1_SK(H_list, Phi_ml, Qμ, C_ml, max_iter=1) - else: - # D=1 - P,V = C_ml.shape - Ct_ml = np.zeros((P,V), dtype=complex) - Ct_ml[0,0]=1.0 - - # 假极点检测 - spurious = detect_spurious_poles(C_ml, Ct_ml, T, Qμ) - - return OPVFResult( - C_ml=C_ml, - Ct_ml=Ct_ml, - C_raw=spurious["C_raw"], - Ct_raw=spurious["Ct_raw"], - T=T, - Qμ=Qμ, - Rμ=Rμ, - exps=exps, - poles=poles, - spurious=spurious - ) - -# ---------- 辅助: 梯形权 ---------- -def _trap_weights(f: np.ndarray): - if len(f)==1: return np.ones(1) - df = np.diff(f) - w = np.zeros_like(f) - w[0]=0.5*df[0]; w[-1]=0.5*df[-1] - if len(f)>2: - w[1:-1]=0.5*(df[:-1]+df[1:]) - return w - -# ---------- 简单测试占位 ---------- -if __name__ == "__main__": - # 虚构数据: 2 个参数样本 (Ns=2), 频率 200 点 - freqs = np.linspace(1e8, 5e9, 200) - # 真正模型 (示例): H = Σ_k R_k/(s - p_k) - true_poles = [-0.5e3 + 1.2e9j, -0.5e3 - 1.2e9j] - s = 1j*2*np.pi*freqs - def synth(Rs): - return Rs[0]/(s - true_poles[0]) + Rs[1]/(s - true_poles[1]) - H_list = [synth([0.8+0.1j, 1.2-0.2j]), synth([0.9+0.05j, 1.1+0.1j])] - params = np.array([[0.0],[1.0]]) # 1 维参数 - # 给一个冗余极点集合 (含真实 + 额外) - start_poles = generate_starting_poles(n_pairs=10, beta_min=5e8, beta_max=2.0e9, alpha_scale=0.000001) - res = opvf_from_H(H_list, freqs, params, total_degree=1, poles=start_poles) - print("C_ml shape:", res.C_ml.shape) - print("假极点(cancel):", res.spurious["cancel_spurious"]) - print("假极点(small):", res.spurious["small_spurious"]) \ No newline at end of file diff --git a/img_formula_70.png b/img_formula_70.png deleted file mode 100644 index 5b8eed4..0000000 Binary files a/img_formula_70.png and /dev/null differ diff --git a/main.py b/main.py index 5e1b3c3..4453f7c 100644 --- a/main.py +++ b/main.py @@ -8,6 +8,7 @@ from scipy.linalg import null_space from plotly.subplots import make_subplots import plotly.graph_objs from core.freqency import auto_select +from scipy.linalg import block_diag network = rf.Network("/tmp/paramer/simulation/3500/3500.s2p") diff --git a/param_vf.py b/param_vf.py deleted file mode 100644 index d1f97f7..0000000 --- a/param_vf.py +++ /dev/null @@ -1,583 +0,0 @@ -""" -Parametric (multivariate) orthonormal vector fitting for geometry + frequency. - -根据论文: Robust Parametric Macromodeling Using Multivariate Orthonormal Vector Fitting - -核心思想 (简化实现版本): -1. 在频域上使用一组全局极点 a_k, 通过所有参数样本共享。 -2. 对每个几何样本 s (由 (W,L) 给出) 与端口对 (i,j) 构建线性最小二乘问题, 固定极点估计该样本的残值 r_{k,s}, 以及常数/斜率项 d_s, e_s。 -3. 将随几何参数变化的残值/常数/斜率在一个离散正交 (QR) 的多元多项式基 Φ_q(W,L) 上展开: r_k(W,L) ≈ Σ_q c_{k,q} Φ_q(W,L)。 -4. 评价阶段: 先用 (W,L) 计算 Φ(W,L), 再恢复 r_k,d,e, 组合成有理函数 Σ_k r_k/(p-a_k)+d+p e。 - -该文件实现一个最小可用版本, 方便后续扩展(极点迭代 / 被动性约束 / 模型压缩等)。 -""" - -from __future__ import annotations - -from dataclasses import dataclass -from typing import List, Tuple, Dict, Any, Optional -import numpy as np -from skrf import Network -import skrf as rf -from models.capa import Capa -from models.basic import ModelBasicDatasetUnit -import warnings - - -@dataclass -class ParametricVFConfig: - # 有理函数项数量 (K 个极点) - n_poles: int = 8 - # 参数多项式最大总次数 (a+b <= degree) - param_total_degree: int = 3 - # 频率单位: Hz (Network 中 f 默认 Hz) - damping: float = 0.02 # 极点实部相对于 2π f_max 的比例, 保证稳定 - # 最大保留频率点 (特征频率抽样); None 表示不裁剪 - max_freq_points: Optional[int] = None - # 频率选择方法: 'curvature' 或 'uniform' - freq_selection_method: str = 'curvature' - # 曲率阈值(可选), 仅 method='curvature' 时生效 - curvature_threshold: Optional[float] = None - # VF 极点迭代次数 (0 表示不迭代) - vf_iterations: int = 0 - # 极点迭代时是否打印警告 - verbose: bool = True - # 未来可添加: 加权, 正则, 被动性投影等 - - -def generate_monomial_exponents(dim: int, total_degree: int) -> List[Tuple[int, ...]]: - """生成所有满足指数和 <= total_degree 的多元单项式指数。 - 对于 2 维 (W,L) 返回 (a,b) 列表。 - """ - exps: List[Tuple[int, ...]] = [] - def rec(prefix: Tuple[int, ...], remaining_dim: int, max_sum: int): - if remaining_dim == 1: - for i in range(max_sum + 1): - exps.append(prefix + (i,)) - return - for i in range(max_sum + 1): - rec(prefix + (i,), remaining_dim - 1, max_sum - i) - rec(tuple(), dim, total_degree) - return exps - - -class OrthonormalParamBasis: - """离散点集上 (W,L) 的多项式基 QR 正交化封装。""" - def __init__(self, W: np.ndarray, L: np.ndarray, total_degree: int): - assert W.shape == L.shape - self.W_mean = W.mean() - self.W_std = W.std() or 1.0 - self.L_mean = L.mean() - self.L_std = L.std() or 1.0 - self.exps = generate_monomial_exponents(2, total_degree) - # 构建单项式矩阵 M (Ns x M) - M = [] - Wn = (W - self.W_mean) / self.W_std - Ln = (L - self.L_mean) / self.L_std - for a, b in self.exps: - M.append((Wn ** a) * (Ln ** b)) - M = np.stack(M, axis=1) # (Ns, M) - # QR 分解获得离散正交基 Q, R 上三角 - Q, R = np.linalg.qr(M) - self.Q = Q # (Ns, Q) - self.R = R # (Q, Q) - self.R_inv = np.linalg.inv(R) - - @property - def n_basis(self) -> int: - return self.Q.shape[1] - - def phi_matrix(self) -> np.ndarray: - return self.Q # 对应样本点的基函数值 - - def eval(self, W: float, L: float) -> np.ndarray: - Wn = (W - self.W_mean) / self.W_std - Ln = (L - self.L_mean) / self.L_std - monomials = [] - for a, b in self.exps: - monomials.append((Wn ** a) * (Ln ** b)) - m = np.asarray(monomials) # (M,) - # M = Q R => 对新点 m = phi R => phi = m R^{-1} - phi = m @ self.R_inv - return phi # (Q,) - - -class ParametricVectorFitting: - """多参数 (W,L) 二端口/多端口 S 参数的简化 Parametric VF。 - - 模型形式 (单个端口对): - H(p; μ) ≈ Σ_{k=1..K} r_k(μ)/(p - a_k) + d(μ) + p e(μ) - 其中 μ = (W,L)。r_k, d, e 在参数域被正交基展开。 - """ - def __init__(self, config: ParametricVFConfig): - self.cfg = config - self.frequencies: np.ndarray | None = None # (Nf,) - self.poles: np.ndarray | None = None # (K,) - self.basis: OrthonormalParamBasis | None = None - # 残值系数: (nports, nports, K, Q) - self.residue_coeffs: np.ndarray | None = None - # d,e 系数: (nports, nports, Q) - self.d_coeffs: np.ndarray | None = None - self.e_coeffs: np.ndarray | None = None - self.meta: Dict[str, Any] = {} - - def _init_global_poles(self, freqs: np.ndarray) -> np.ndarray: - f_min, f_max = freqs.min(), freqs.max() - # 使用对数均匀分布 (排除 0) + 负实部保证稳定 - f_samples = np.logspace(np.log10(max(f_min, freqs[1] if len(freqs) > 1 else f_min*1.1)), np.log10(f_max), self.cfg.n_poles) - # 极点: -σ + j 2π f_k - sigma = self.cfg.damping * 2 * np.pi * f_max - poles = -sigma + 1j * 2 * np.pi * f_samples - return poles - - # ----------------- 内部工具函数 (前置以便类型检查) ----------------- - def _build_frequency_matrix(self, p_vec: np.ndarray, poles: np.ndarray) -> np.ndarray: - K = len(poles) - F_base = np.zeros((len(p_vec), K + 2), dtype=complex) - for k, a_k in enumerate(poles): - F_base[:, k] = 1.0 / (p_vec - a_k) - F_base[:, K] = 1.0 - F_base[:, K + 1] = p_vec - return F_base - - def _solve_residues_given_poles(self, aligned_S: List[np.ndarray], p_vec: np.ndarray): - assert self.poles is not None, "Poles not initialized" - nports = aligned_S[0].shape[1] - Ns = len(aligned_S) - K = len(self.poles) - residues_samples = np.zeros((nports, nports, K, Ns), dtype=complex) - d_samples = np.zeros((nports, nports, Ns), dtype=complex) - e_samples = np.zeros((nports, nports, Ns), dtype=complex) - F_base = self._build_frequency_matrix(p_vec, self.poles) - for s_idx, Sdata in enumerate(aligned_S): - for i in range(nports): - for j in range(nports): - y = Sdata[:, i, j] - x, *_ = np.linalg.lstsq(F_base, y, rcond=None) - residues_samples[i, j, :, s_idx] = x[:K] - d_samples[i, j, s_idx] = x[K] - e_samples[i, j, s_idx] = x[K+1] - return residues_samples, d_samples, e_samples - - def _select_characteristic_freqs(self, freqs: np.ndarray, S_list: List[np.ndarray], max_points: int, - method: str = 'curvature', threshold: Optional[float] = None) -> np.ndarray: - Nf = len(freqs) - if max_points >= Nf: - return np.arange(Nf) - if method == 'uniform': - idx = np.linspace(0, Nf-1, max_points, dtype=int) - return np.unique(idx) - mags = [] - for S in S_list: - mags.append(np.mean(np.abs(S), axis=(1,2))) # (Nf,) - agg = np.mean(np.stack(mags, axis=1), axis=1) # (Nf,) - curv = np.zeros_like(agg) - curv[1:-1] = np.abs(agg[2:] - 2*agg[1:-1] + agg[:-2]) - base = {0, Nf-1} - if threshold is not None: - cand = np.where(curv >= threshold)[0] - else: - order = np.argsort(-curv) - cand = order - selected = list(base) - for idx in cand: - if idx in base: - continue - selected.append(idx) - if len(selected) >= max_points: - break - return np.array(sorted(selected)) - - def _vf_iterate_poles(self, aligned_S: List[np.ndarray], p_vec: np.ndarray, - residues_samples: np.ndarray, d_samples: np.ndarray, e_samples: np.ndarray) -> np.ndarray: - assert self.poles is not None, "Poles not initialized" - nports = residues_samples.shape[0] - K = len(self.poles) - Ns = len(aligned_S) - Nf = len(p_vec) - n_pairs = nports * nports - n_r = n_pairs * K - n_c = K - n_d = n_pairs - n_e = n_pairs - total_unknowns = n_r + n_c + n_d + n_e - rows = Ns * Nf * n_pairs - A = np.zeros((rows, total_unknowns), dtype=complex) - b = np.zeros(rows, dtype=complex) - def pair_index(i,j): - return i*nports + j - row = 0 - for s_idx, Sdata in enumerate(aligned_S): - for f_idx, p in enumerate(p_vec): - denom = p - self.poles - basis_freq = 1.0/denom - for i in range(nports): - for j in range(nports): - H = Sdata[f_idx, i, j] - b[row] = H - pair = pair_index(i,j) - r_offset = pair*K - A[row, r_offset:r_offset+K] = basis_freq - A[row, n_r:n_r+K] = -H * basis_freq - A[row, n_r + n_c + pair] = 1.0 - A[row, n_r + n_c + n_d + pair] = p - row += 1 - x, *_ = np.linalg.lstsq(A, b, rcond=None) - c_k = x[n_r:n_r+n_c] - poly_total = np.poly(self.poles) - acc = poly_total.copy() - for k in range(K): - others = [self.poles[m] for m in range(K) if m != k] - term = c_k[k] * np.poly(others) - acc = np.polyadd(acc, term) - new_poles = np.roots(acc) - return new_poles - - def fit(self, dataset: List[ModelBasicDatasetUnit], network_loader=Capa.network): - # 载入所有 Network - networks: List[Network] = [] - W_list, L_list = [], [] - for unit in dataset: - net = network_loader(unit.result_dir, unit.id) - networks.append(net) - W_list.append(unit.parameters["W"]) - L_list.append(unit.parameters["L"]) - assert len(networks) > 0, "Empty dataset" - # 处理不同频率点数量: 取公共重叠频段并插值到统一网格 - freqs_list = [n.f for n in networks] - f_start = max(f[0] for f in freqs_list) - f_stop = min(f[-1] for f in freqs_list) - if f_stop <= f_start: - raise ValueError("No overlapping frequency range among networks") - # 选择基准网格: 使用第一个网络在重叠区间内的频点, 若其它网络缺该点则插值 - base_freqs = networks[0].f - mask = (base_freqs >= f_start) & (base_freqs <= f_stop) - freqs = base_freqs[mask] - # 如果其它网络点数差异较大, 可选: 统一稠密度 (这里只在差异>5%时使用最小长度网格) - for f in freqs_list[1:]: - # 判断差异比例 - if abs(len(f) - len(freqs)) / max(len(freqs),1) > 0.2: - # 使用所有网络在重叠区间内最少点数, 构造等间距网格 - n_common = min(len(ff[(ff>=f_start)&(ff<=f_stop)]) for ff in freqs_list) - freqs = np.linspace(f_start, f_stop, n_common) - warnings.warn("Frequency grids differ significantly; using uniform linspace for interpolation.") - break - # 对每个网络插值到 freqs (实部/虚部分开线性插值) - aligned_S = [] # List[(Nf, nports, nports)] - for net in networks: - f_src = net.f - # 获取在目标范围内原始数据 (若后面改用插值网格不同于原点, 仍需完整插值) - S_src = net.s # (Nf_src, nports, nports) - nports = net.nports - if not np.array_equal(f_src, freqs): - # 线性插值 (对每个端口对实部与虚部) - S_interp = np.zeros((len(freqs), nports, nports), dtype=complex) - for i in range(nports): - for j in range(nports): - y = S_src[:, i, j] - # numpy.interp 需要单调递增; 为类型检查器显式转为 np.ndarray - y_arr = np.asarray(y) - y_real = np.real(y_arr) - y_imag = np.imag(y_arr) - real_part = np.interp(freqs, f_src, y_real) - imag_part = np.interp(freqs, f_src, y_imag) - S_interp[:, i, j] = real_part + 1j * imag_part - aligned_S.append(S_interp) - else: - aligned_S.append(S_src[mask] if len(S_src)==len(base_freqs) and mask.sum()!=len(base_freqs) else S_src) - self.frequencies = freqs - nports = networks[0].nports - Ns = len(networks) - # 参数正交基 - self.basis = OrthonormalParamBasis(np.array(W_list), np.array(L_list), self.cfg.param_total_degree) - Phi = self.basis.phi_matrix() # (Ns, Q) - Qn = Phi.shape[1] - # (可选) 特征频率子采样 - if self.cfg.max_freq_points is not None and len(freqs) > self.cfg.max_freq_points: - sel_idx = self._select_characteristic_freqs(freqs, aligned_S, self.cfg.max_freq_points, - method=self.cfg.freq_selection_method, - threshold=self.cfg.curvature_threshold) - freqs = freqs[sel_idx] - aligned_S = [S[sel_idx, :, :] for S in aligned_S] - if self.cfg.verbose: - warnings.warn(f"Frequency reduced to {len(freqs)} points (method={self.cfg.freq_selection_method}).") - # 初始化极点 - self.poles = self._init_global_poles(freqs) - p_vec = 1j * 2 * np.pi * freqs - # 首次求解残值 - residues_samples, d_samples, e_samples = self._solve_residues_given_poles(aligned_S, p_vec) - # 极点迭代 (简化共享极点 VF) - if self.cfg.vf_iterations > 0: - for it in range(self.cfg.vf_iterations): - new_poles = self._vf_iterate_poles(aligned_S, p_vec, residues_samples, d_samples, e_samples) - # 反射不稳定极点 - new_poles = np.array([p if p.real < 0 else complex(-p.real, p.imag) for p in new_poles]) - # 去重: 近似重复 (距离 < 1e-3 * |p|) 过滤 - filtered = [] - for p in new_poles: - if all(abs(p - q) > 1e-3*max(1.0, abs(p)) for q in filtered): - filtered.append(p) - if self.cfg.verbose: - print(f"[VF-Iter {it+1}] poles -> {len(filtered)} kept") - self.poles = np.array(filtered, dtype=complex) - p_vec = 1j * 2 * np.pi * freqs - residues_samples, d_samples, e_samples = self._solve_residues_given_poles(aligned_S, p_vec) - # 在参数基上再拟合: 由于 Phi 正交, 系数 = Phi^H * values - PhiH = Phi.conj().T # (Q, Ns) - K = self.poles.shape[0] - self.residue_coeffs = np.zeros((nports, nports, K, Qn), dtype=complex) - self.d_coeffs = np.zeros((nports, nports, Qn), dtype=complex) - self.e_coeffs = np.zeros((nports, nports, Qn), dtype=complex) - for i in range(nports): - for j in range(nports): - for k in range(K): - vk = residues_samples[i, j, k, :] # (Ns,) - self.residue_coeffs[i, j, k, :] = PhiH @ vk - self.d_coeffs[i, j, :] = PhiH @ d_samples[i, j, :] - self.e_coeffs[i, j, :] = PhiH @ e_samples[i, j, :] - # 简单误差评估 (训练点重建 RMS) - F_base = self._build_frequency_matrix(p_vec, self.poles) - train_rms = self._compute_training_rms(residues_samples, d_samples, e_samples, F_base, aligned_S) - self.meta['train_rms'] = train_rms - self.meta['nports'] = nports - self.meta['Ns'] = Ns - self.meta['Q'] = Qn - return self - - def _compute_training_rms(self, residues_samples, d_samples, e_samples, F_base, original_S_list): - """计算每个样本的平均端口对 RMS 误差。""" - nports = residues_samples.shape[0] - K = residues_samples.shape[2] - Ns = residues_samples.shape[3] - Nf = F_base.shape[0] - rms = [] - for s in range(Ns): - err_sq_sum = 0.0 - count = 0 - S_ref = original_S_list[s] - for i in range(nports): - for j in range(nports): - x = np.concatenate([ - residues_samples[i, j, :, s], - [d_samples[i, j, s], e_samples[i, j, s]] - ]) # (K+2,) - y_hat = F_base @ x # (Nf,) - y_ref = S_ref[:, i, j] - err = y_ref - y_hat - err_sq_sum += np.mean(np.abs(err)**2) - count += 1 - rms.append(np.sqrt(err_sq_sum / max(count,1))) - return rms - - def evaluate(self, W: float, L: float, freqs: np.ndarray | None = None) -> np.ndarray: - """返回插值后的 S 参数 (Nf, nports, nports)。""" - assert self.frequencies is not None and self.poles is not None - assert self.basis is not None - assert self.residue_coeffs is not None and self.d_coeffs is not None and self.e_coeffs is not None, "Model not fitted" - if freqs is None: - freqs = self.frequencies - # 若请求频率网格不同, 需要重建频率矩阵 - p_vec = 1j * 2 * np.pi * freqs - K = len(self.poles) - nports = self.residue_coeffs.shape[0] - phi = self.basis.eval(W, L) # (Q,) - # 预分配 - S_eval = np.zeros((len(freqs), nports, nports), dtype=complex) - denom_cache = np.zeros((len(freqs), K), dtype=complex) - for k, a_k in enumerate(self.poles): - denom_cache[:, k] = 1.0 / (p_vec - a_k) - for i in range(nports): - for j in range(nports): - # 恢复参数依赖的残值/常数/斜率 - r_k = (self.residue_coeffs[i, j, :, :] @ phi) # (K,) - d = self.d_coeffs[i, j, :] @ phi - e = self.e_coeffs[i, j, :] @ phi - Hf = denom_cache @ r_k + d + p_vec * e # (Nf,) - S_eval[:, i, j] = Hf - return S_eval - - def export_s2p(self, W: float, L: float, filepath: str, freqs: np.ndarray | None = None, z0: float | complex = 50) -> str: - """在给定几何参数 (W,L) 下计算 S 并写入 Touchstone (.sNp) 文件。 - - 参数: - W, L: 几何参数 - filepath: 目标文件完整路径, 可包含或不包含 .s2p/.sNp 后缀 - freqs: 可选自定义频率数组 (需在训练频率范围内); 默认使用训练频率 - z0: 端口参考阻抗 (标量或与端口数相同长度可扩展) - - 返回: - 实际写入的文件路径 - """ - assert self.frequencies is not None, "Model not fitted" - S_eval = self.evaluate(W, L, freqs) - if freqs is None: - freqs = self.frequencies - nports = S_eval.shape[1] - freqs_arr = np.asarray(freqs) - # 构建 Network 对象 - ntw = rf.Network(f=freqs_arr, s=S_eval, z0=z0) - # 解析输出路径 - import os - base_dir = os.path.dirname(filepath) or '.' - os.makedirs(base_dir, exist_ok=True) - base_name = os.path.basename(filepath) - # 去掉可能已有的扩展名, 由 skrf 自动加 .sNp - if '.' in base_name: - base_name = '.'.join(base_name.split('.')[:-1]) or base_name - # 写文件 - ntw.write_touchstone(base_name, dir=base_dir) - # 构造最终文件名 (.sNp) - final_path = os.path.join(base_dir, f"{base_name}.s{nports}p") - return final_path - - def to_dict(self) -> Dict[str, Any]: - return { - 'config': self.cfg.__dict__, - 'frequencies': self.frequencies.tolist() if self.frequencies is not None else None, - 'poles': self.poles.tolist() if self.poles is not None else None, - 'basis': { - 'W_mean': getattr(self.basis, 'W_mean', None), - 'W_std': getattr(self.basis, 'W_std', None), - 'L_mean': getattr(self.basis, 'L_mean', None), - 'L_std': getattr(self.basis, 'L_std', None), - 'exps': getattr(self.basis, 'exps', None), - 'R': (lambda _R: _R.tolist() if _R is not None else None)(getattr(self.basis, 'R', None)), - }, - 'residue_coeffs': self.residue_coeffs.tolist() if self.residue_coeffs is not None else None, - 'd_coeffs': self.d_coeffs.tolist() if self.d_coeffs is not None else None, - 'e_coeffs': self.e_coeffs.tolist() if self.e_coeffs is not None else None, - 'meta': self.meta, - } - - @staticmethod - def from_dict(data: Dict[str, Any]) -> 'ParametricVectorFitting': - cfg = ParametricVFConfig(**data['config']) - obj = ParametricVectorFitting(cfg) - obj.frequencies = np.array(data['frequencies']) if data['frequencies'] is not None else None - obj.poles = np.array(data['poles']) if data['poles'] is not None else None - # 重建 basis (仅用于 eval); 重新构造 R_inv - if data.get('basis') and data['basis']['R'] is not None: - basis_stub = OrthonormalParamBasis(np.array([0.0, 1.0]), np.array([0.0, 1.0]), 1) # 占位 - basis_stub.W_mean = data['basis']['W_mean'] - basis_stub.W_std = data['basis']['W_std'] - basis_stub.L_mean = data['basis']['L_mean'] - basis_stub.L_std = data['basis']['L_std'] - basis_stub.exps = [tuple(e) for e in data['basis']['exps']] - R = np.array(data['basis']['R']) - basis_stub.R = R - basis_stub.R_inv = np.linalg.inv(R) - # Q 不需要 (仅用于训练点), 设为 None - basis_stub.Q = None # type: ignore - obj.basis = basis_stub - if data.get('residue_coeffs') is not None: - obj.residue_coeffs = np.array(data['residue_coeffs']) - if data.get('d_coeffs') is not None: - obj.d_coeffs = np.array(data['d_coeffs']) - if data.get('e_coeffs') is not None: - obj.e_coeffs = np.array(data['e_coeffs']) - obj.meta = data.get('meta', {}) - return obj - -# 划分 -def split_dataset(dataset: List[ModelBasicDatasetUnit], train_ratio: float = 0.7, shuffle: bool = True, seed: int = 0) -> Tuple[List[ModelBasicDatasetUnit], List[ModelBasicDatasetUnit]]: - """划分数据集 70% 训练 / 30% 测试 (默认可改)。""" - ds = list(dataset) - if shuffle: - rng = np.random.default_rng(seed) - rng.shuffle(ds) - n_train = int(len(ds) * train_ratio) - return ds[:n_train], ds[n_train:] - - -def _interp_s_to_freqs(S: np.ndarray, f_src: np.ndarray, f_tgt: np.ndarray) -> np.ndarray: - """将 S(f_src) 线性插值到 f_tgt (复数分离实虚部)。""" - if np.array_equal(f_src, f_tgt): - return S - nports = S.shape[1] - S_new = np.zeros((len(f_tgt), nports, nports), dtype=complex) - for i in range(nports): - for j in range(nports): - y = S[:, i, j] - real_part = np.interp(f_tgt, f_src, np.real(y)) - imag_part = np.interp(f_tgt, f_src, np.imag(y)) - S_new[:, i, j] = real_part + 1j * imag_part - return S_new - - -def evaluate_dataset(model: ParametricVectorFitting, - subset: List[ModelBasicDatasetUnit], - network_loader=Capa.network) -> Dict[str, Any]: - """对一个子集评估拟合质量,输出每样本与整体统计.""" - assert model.frequencies is not None - freqs = model.frequencies - results = [] - mag_err_db_all = [] - rms_all = [] - for unit in subset: - net = network_loader(unit.result_dir, unit.id) - S_ref = _interp_s_to_freqs(net.s, net.f, freqs) # (Nf, nports, nports) - S_pred = model.evaluate(unit.parameters["W"], unit.parameters["L"], freqs) - err = S_pred - S_ref - rms = float(np.sqrt(np.mean(np.abs(err)**2))) - # 幅度 dB 误差 - mag_ref_db = 20*np.log10(np.clip(np.abs(S_ref), 1e-15, None)) - mag_pred_db = 20*np.log10(np.clip(np.abs(S_pred), 1e-15, None)) - mag_err_db = float(np.mean(np.abs(mag_pred_db - mag_ref_db))) - rms_all.append(rms) - mag_err_db_all.append(mag_err_db) - results.append({ - 'id': unit.id, - 'W': unit.parameters['W'], - 'L': unit.parameters['L'], - 'rms': rms, - 'mag_err_db': mag_err_db - }) - summary = { - 'count': len(subset), - 'rms_mean': float(np.mean(rms_all)) if rms_all else None, - 'rms_max': float(np.max(rms_all)) if rms_all else None, - 'mag_err_db_mean': float(np.mean(mag_err_db_all)) if mag_err_db_all else None, - 'mag_err_db_max': float(np.max(mag_err_db_all)) if mag_err_db_all else None, - 'details': results - } - return summary - - -# 简要使用示例 (非执行): -if __name__ == "__main__": - - capa = Capa() - capa.sweep() - print("sweep finished") - capa.export('capa_results.json') - capa.clear() - capa.load('capa_results.json') - dataset = capa.results - - # 70% 训练 / 30% 测试 - train_set, test_set = split_dataset(dataset, train_ratio=0.7, shuffle=True, seed=42) - print(f"Train: {len(train_set)} Test: {len(test_set)}") - - cfg = ParametricVFConfig(n_poles=10, param_total_degree=3, vf_iterations=2, - max_freq_points=50) - pvf = ParametricVectorFitting(cfg).fit(train_set) - - train_metrics = evaluate_dataset(pvf, train_set) - test_metrics = evaluate_dataset(pvf, test_set) - - pvf.meta['train_eval'] = train_metrics - pvf.meta['test_eval'] = test_metrics - - def _print_metrics(title: str, m: Dict[str, Any]): - print(f"[{title}] samples={m['count']}" - f" RMS_mean={m['rms_mean']:.4e} RMS_max={m['rms_max']:.4e}" - f" |Δ|dB_mean={m['mag_err_db_mean']:.3f}dB |Δ|dB_max={m['mag_err_db_max']:.3f}dB") - - _print_metrics("TRAIN", train_metrics) - _print_metrics("TEST", test_metrics) - - # 示例:导出一个测试点的拟合结果为 s2p - if test_set: - sample = test_set[0] - out_path = pvf.export_s2p(sample.parameters["W"], sample.parameters["L"], filepath='outputs/pvf_test_sample') - print("Exported:", out_path) \ No newline at end of file