583 lines
26 KiB
Python
583 lines
26 KiB
Python
|
|
"""
|
|||
|
|
Parametric (multivariate) orthonormal vector fitting for geometry + frequency.
|
|||
|
|
|
|||
|
|
根据论文: Robust Parametric Macromodeling Using Multivariate Orthonormal Vector Fitting
|
|||
|
|
|
|||
|
|
核心思想 (简化实现版本):
|
|||
|
|
1. 在频域上使用一组全局极点 a_k, 通过所有参数样本共享。
|
|||
|
|
2. 对每个几何样本 s (由 (W,L) 给出) 与端口对 (i,j) 构建线性最小二乘问题, 固定极点估计该样本的残值 r_{k,s}, 以及常数/斜率项 d_s, e_s。
|
|||
|
|
3. 将随几何参数变化的残值/常数/斜率在一个离散正交 (QR) 的多元多项式基 Φ_q(W,L) 上展开: r_k(W,L) ≈ Σ_q c_{k,q} Φ_q(W,L)。
|
|||
|
|
4. 评价阶段: 先用 (W,L) 计算 Φ(W,L), 再恢复 r_k,d,e, 组合成有理函数 Σ_k r_k/(p-a_k)+d+p e。
|
|||
|
|
|
|||
|
|
该文件实现一个最小可用版本, 方便后续扩展(极点迭代 / 被动性约束 / 模型压缩等)。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
from dataclasses import dataclass
|
|||
|
|
from typing import List, Tuple, Dict, Any, Optional
|
|||
|
|
import numpy as np
|
|||
|
|
from skrf import Network
|
|||
|
|
import skrf as rf
|
|||
|
|
from models.capa import Capa
|
|||
|
|
from models.basic import ModelBasicDatasetUnit
|
|||
|
|
import warnings
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
class ParametricVFConfig:
    """Settings for the simplified parametric vector-fitting model.

    Attributes:
        n_poles: Number of rational terms (K poles).
        param_total_degree: Maximum total degree of the parameter polynomial;
            a monomial W**a * L**b is kept when a + b <= degree.
        damping: Real part of the poles as a fraction of 2*pi*f_max; keeps
            the poles stable.
        max_freq_points: Maximum number of frequency points kept
            (characteristic-frequency sampling); None disables trimming.
        freq_selection_method: Frequency selection method, 'curvature' or
            'uniform'.
        curvature_threshold: Optional curvature threshold; only used when the
            method is 'curvature'.
        vf_iterations: Number of VF pole iterations (0 means no iteration).
        verbose: Whether to report progress/warnings during pole iteration.
    """

    n_poles: int = 8
    param_total_degree: int = 3
    # Frequencies are expressed in Hz (Network.f is in Hz by default).
    damping: float = 0.02
    max_freq_points: Optional[int] = None
    freq_selection_method: str = 'curvature'
    curvature_threshold: Optional[float] = None
    vf_iterations: int = 0
    verbose: bool = True
    # Possible future additions: weighting, regularisation, passivity projection.
|
|||
|
|
|
|||
|
|
|
|||
|
|
def generate_monomial_exponents(dim: int, total_degree: int) -> List[Tuple[int, ...]]:
    """Return every exponent tuple of length *dim* whose sum is <= *total_degree*.

    Tuples are produced in lexicographic order, e.g. for two variables (W, L)
    and degree 1 the result is [(0, 0), (0, 1), (1, 0)].

    Args:
        dim: Number of variables. Zero yields the single empty tuple.
        total_degree: Largest allowed exponent sum. A negative value yields
            an empty list.

    Raises:
        ValueError: If *dim* is negative.
    """
    if dim < 0:
        raise ValueError(f"dim must be non-negative, got {dim}")
    if total_degree < 0:
        return []
    # Grow the tuples one variable at a time; each new exponent may use only
    # the degree budget left over by the exponents already chosen.
    exps: List[Tuple[int, ...]] = [()]
    for _ in range(dim):
        exps = [
            head + (e,)
            for head in exps
            for e in range(total_degree - sum(head) + 1)
        ]
    return exps
|
|||
|
|
|
|||
|
|
|
|||
|
|
class OrthonormalParamBasis:
    """Polynomial basis in (W, L), orthonormalised by QR on a discrete point set.

    The monomials W**a * L**b (a + b <= total_degree) are evaluated on the
    standardised sample points and factorised as M = Q R. The columns of Q
    are the basis functions at the samples; R maps monomials to basis values.
    """

    def __init__(self, W: np.ndarray, L: np.ndarray, total_degree: int):
        """Build the basis from sample coordinates.

        Args:
            W, L: Sample coordinates with identical shape (one entry per sample).
            total_degree: Maximum total degree of the polynomial basis.

        Raises:
            numpy.linalg.LinAlgError: If there are fewer samples than
                monomials, or the monomial matrix is exactly rank deficient.
        """
        W = np.asarray(W, dtype=float)
        L = np.asarray(L, dtype=float)
        assert W.shape == L.shape
        # Standardise each coordinate; a zero spread falls back to 1.0 so the
        # division below stays finite.
        self.W_mean = W.mean()
        self.W_std = W.std() or 1.0
        self.L_mean = L.mean()
        self.L_std = L.std() or 1.0
        self.exps = generate_monomial_exponents(2, total_degree)
        M = self._monomials(W, L)  # (Ns, n_monomials)
        n_samples, n_monomials = M.shape
        if n_samples < n_monomials:
            # Reduced QR would return a non-square R that cannot be inverted;
            # fail here with an explanation instead of inside np.linalg.inv.
            raise np.linalg.LinAlgError(
                f"Need at least {n_monomials} samples for total degree "
                f"{total_degree}, got {n_samples}"
            )
        # QR factorisation: Q is the discrete orthonormal basis, R is upper triangular.
        Q, R = np.linalg.qr(M)
        self.Q = Q  # (Ns, Q)
        self.R = R  # (Q, Q)
        self.R_inv = np.linalg.inv(R)

    def _monomials(self, W, L) -> np.ndarray:
        """Evaluate all monomials at standardised (W, L); last axis indexes monomials."""
        Wn = (np.asarray(W, dtype=float) - self.W_mean) / self.W_std
        Ln = (np.asarray(L, dtype=float) - self.L_mean) / self.L_std
        return np.stack([(Wn ** a) * (Ln ** b) for a, b in self.exps], axis=-1)

    @property
    def n_basis(self) -> int:
        """Number of basis functions."""
        return self.Q.shape[1]

    def phi_matrix(self) -> np.ndarray:
        """Return the basis-function values at the construction samples."""
        return self.Q

    def eval(self, W: float, L: float) -> np.ndarray:
        """Evaluate the basis functions at new point(s).

        Scalars give an array of shape (Q,); arrays of points give one row
        of basis values per point.
        """
        # M = Q R, so for a new monomial row m: m = phi R  =>  phi = m R^{-1}.
        return self._monomials(W, L) @ self.R_inv
|
|||
|
|
|
|||
|
|
|
|||
|
|
class ParametricVectorFitting:
    """Simplified parametric vector fitting of multi-port S-parameters over (W, L).

    Model for a single port pair:
        H(p; mu) ~= sum_{k=1..K} r_k(mu) / (p - a_k) + d(mu) + p * e(mu)
    with mu = (W, L). The residues r_k and the terms d, e are expanded in an
    orthonormal basis over the parameter domain; the poles a_k are shared by
    all samples.
    """

    def __init__(self, config: ParametricVFConfig):
        """Create an unfitted model holding *config*."""
        self.cfg = config
        self.frequencies: np.ndarray | None = None  # (Nf,)
        self.poles: np.ndarray | None = None  # (K,)
        self.basis: OrthonormalParamBasis | None = None
        # Residue coefficients: (nports, nports, K, Q)
        self.residue_coeffs: np.ndarray | None = None
        # d / e coefficients: (nports, nports, Q)
        self.d_coeffs: np.ndarray | None = None
        self.e_coeffs: np.ndarray | None = None
        self.meta: Dict[str, Any] = {}

    def _init_global_poles(self, freqs: np.ndarray) -> np.ndarray:
        """Return cfg.n_poles starting poles, log-spaced over the band of *freqs*."""
        f_min, f_max = freqs.min(), freqs.max()
        # Start from the second sample when the first one is smaller (e.g. a
        # 0 Hz point), since a logarithmic grid cannot start at zero.
        second = freqs[1] if len(freqs) > 1 else f_min * 1.1
        f_samples = np.logspace(np.log10(max(f_min, second)), np.log10(f_max), self.cfg.n_poles)
        # Poles: -sigma + j 2*pi*f_k; the negative real part keeps them stable.
        sigma = self.cfg.damping * 2 * np.pi * f_max
        return -sigma + 1j * 2 * np.pi * f_samples

    # ----------------- internal helpers -----------------
    def _build_frequency_matrix(self, p_vec: np.ndarray, poles: np.ndarray) -> np.ndarray:
        """Return the (Nf, K + 2) matrix with columns [1/(p - a_k) ..., 1, p]."""
        p_vec = np.asarray(p_vec)
        poles = np.asarray(poles)
        K = len(poles)
        F_base = np.empty((len(p_vec), K + 2), dtype=complex)
        F_base[:, :K] = 1.0 / (p_vec[:, None] - poles[None, :])
        F_base[:, K] = 1.0
        F_base[:, K + 1] = p_vec
        return F_base

    @staticmethod
    def _synthesize(F_base: np.ndarray, residues: np.ndarray, d: np.ndarray, e: np.ndarray) -> np.ndarray:
        """Evaluate sum_k r_k/(p - a_k) + d + p*e for every port pair.

        residues is (nports, nports, K); d and e are (nports, nports).
        Returns an array of shape (Nf, nports, nports).
        """
        nports = residues.shape[0]
        coeffs = np.concatenate([residues, d[..., None], e[..., None]], axis=-1)
        flat = F_base @ coeffs.reshape(nports * nports, -1).T  # (Nf, nports**2)
        return flat.reshape(F_base.shape[0], nports, nports)

    def _solve_residues_given_poles(self, aligned_S: List[np.ndarray], p_vec: np.ndarray):
        """Least-squares fit of residues, d and e for each sample with fixed poles.

        Returns (residues, d, e) with shapes (nports, nports, K, Ns),
        (nports, nports, Ns) and (nports, nports, Ns).
        """
        assert self.poles is not None, "Poles not initialized"
        nports = aligned_S[0].shape[1]
        Ns = len(aligned_S)
        K = len(self.poles)
        residues_samples = np.zeros((nports, nports, K, Ns), dtype=complex)
        d_samples = np.zeros((nports, nports, Ns), dtype=complex)
        e_samples = np.zeros((nports, nports, Ns), dtype=complex)
        F_base = self._build_frequency_matrix(p_vec, self.poles)
        for s_idx, Sdata in enumerate(aligned_S):
            # All port pairs share the same system matrix, so solve them in
            # one call with one right-hand-side column per pair.
            rhs = np.asarray(Sdata).reshape(F_base.shape[0], nports * nports)
            x, *_ = np.linalg.lstsq(F_base, rhs, rcond=None)  # (K + 2, nports**2)
            coeffs = x.reshape(K + 2, nports, nports)
            residues_samples[:, :, :, s_idx] = coeffs[:K].transpose(1, 2, 0)
            d_samples[:, :, s_idx] = coeffs[K]
            e_samples[:, :, s_idx] = coeffs[K + 1]
        return residues_samples, d_samples, e_samples

    def _select_characteristic_freqs(self, freqs: np.ndarray, S_list: List[np.ndarray], max_points: int,
                                     method: str = 'curvature', threshold: Optional[float] = None) -> np.ndarray:
        """Choose at most *max_points* frequency indices, returned sorted.

        'uniform' picks evenly spaced indices. Any other method ranks indices
        by the second difference (curvature) of the mean |S| over all samples
        and port pairs; both end points are always kept. With *threshold*
        set, only indices whose curvature reaches it are eligible.
        """
        Nf = len(freqs)
        if max_points >= Nf:
            return np.arange(Nf)
        if method == 'uniform':
            return np.unique(np.linspace(0, Nf - 1, max_points, dtype=int))
        agg = np.mean([np.mean(np.abs(S), axis=(1, 2)) for S in S_list], axis=0)  # (Nf,)
        curv = np.zeros_like(agg)
        curv[1:-1] = np.abs(agg[2:] - 2 * agg[1:-1] + agg[:-2])
        # Rank by descending curvature so that a tight budget keeps the
        # strongest features rather than the lowest-frequency candidates.
        ranked = np.argsort(-curv)
        if threshold is not None:
            ranked = ranked[curv[ranked] >= threshold]
        selected = {0, Nf - 1}
        for idx in ranked:
            # Check the budget before adding, so it is never exceeded.
            if len(selected) >= max_points:
                break
            selected.add(int(idx))
        return np.array(sorted(selected))

    def _vf_iterate_poles(self, aligned_S: List[np.ndarray], p_vec: np.ndarray,
                          residues_samples: np.ndarray, d_samples: np.ndarray, e_samples: np.ndarray) -> np.ndarray:
        """Run one pole-relocation step and return the new poles (unfiltered).

        Solves sum_k r_k/(p - a_k) + d + p*e - H * sum_k c_k/(p - a_k) = H in
        the least-squares sense over all samples, frequencies and port pairs.
        The new poles are the zeros of sigma(p) = 1 + sum_k c_k/(p - a_k).
        Only the port count is read from *residues_samples*; *d_samples* and
        *e_samples* are not used.
        """
        assert self.poles is not None, "Poles not initialized"
        nports = residues_samples.shape[0]
        K = len(self.poles)
        Ns = len(aligned_S)
        Nf = len(p_vec)
        n_pairs = nports * nports
        # Unknown layout: [r (n_pairs * K) | c (K) | d (n_pairs) | e (n_pairs)]
        col_c = n_pairs * K
        col_d = col_c + K
        col_e = col_d + n_pairs
        total_unknowns = col_e + n_pairs

        p_vec = np.asarray(p_vec)
        H = np.stack([np.asarray(S).reshape(Nf, n_pairs) for S in aligned_S])  # (Ns, Nf, n_pairs)
        basis_freq = 1.0 / (p_vec[:, None] - self.poles[None, :])  # (Nf, K)

        A = np.zeros((Ns, Nf, n_pairs, total_unknowns), dtype=complex)
        for pair in range(n_pairs):
            r_offset = pair * K
            A[:, :, pair, r_offset:r_offset + K] = basis_freq
            A[:, :, pair, col_c:col_c + K] = -H[:, :, pair, None] * basis_freq
            A[:, :, pair, col_d + pair] = 1.0
            A[:, :, pair, col_e + pair] = p_vec
        x, *_ = np.linalg.lstsq(A.reshape(-1, total_unknowns), H.reshape(-1), rcond=None)
        c_k = x[col_c:col_c + K]
        # The zeros of sigma are the eigenvalues of diag(a) - 1 c^T. This
        # avoids expanding the pole polynomial, whose coefficients span many
        # orders of magnitude for high-frequency poles.
        return np.linalg.eigvals(np.diag(self.poles) - np.outer(np.ones(K), c_k))

    @staticmethod
    def _common_frequency_grid(networks) -> np.ndarray:
        """Return the frequency grid onto which all networks are aligned.

        Uses the first network's points inside the overlapping band; if any
        other grid differs in length by more than 20%, falls back to a
        uniform grid with the smallest in-band point count.
        """
        freqs_list = [n.f for n in networks]
        f_start = max(f[0] for f in freqs_list)
        f_stop = min(f[-1] for f in freqs_list)
        if f_stop <= f_start:
            raise ValueError("No overlapping frequency range among networks")
        base_freqs = freqs_list[0]
        freqs = base_freqs[(base_freqs >= f_start) & (base_freqs <= f_stop)]
        for f in freqs_list[1:]:
            if abs(len(f) - len(freqs)) / max(len(freqs), 1) > 0.2:
                n_common = min(len(ff[(ff >= f_start) & (ff <= f_stop)]) for ff in freqs_list)
                freqs = np.linspace(f_start, f_stop, n_common)
                warnings.warn("Frequency grids differ significantly; using uniform linspace for interpolation.")
                break
        return freqs

    @staticmethod
    def _reflect_and_dedupe(poles) -> np.ndarray:
        """Mirror unstable poles into the left half plane and drop near-duplicates."""
        kept: List[complex] = []
        for p in poles:
            if not p.real < 0:
                p = complex(-p.real, p.imag)
            # Near-duplicate: distance below 1e-3 * max(1, |p|) to a kept pole.
            if all(abs(p - q) > 1e-3 * max(1.0, abs(p)) for q in kept):
                kept.append(p)
        return np.array(kept, dtype=complex)

    def fit(self, dataset: List[ModelBasicDatasetUnit], network_loader=Capa.network):
        """Fit the model to *dataset* and return self.

        Args:
            dataset: Units providing result_dir, id and parameters["W"/"L"].
            network_loader: Callable (result_dir, id) -> network exposing
                f, s and nports.

        Raises:
            ValueError: If the dataset is empty, port counts differ, or the
                networks share no overlapping frequency range.
        """
        # Load every network together with its geometry parameters.
        networks: List[Network] = []
        W_list, L_list = [], []
        for unit in dataset:
            networks.append(network_loader(unit.result_dir, unit.id))
            W_list.append(unit.parameters["W"])
            L_list.append(unit.parameters["L"])
        if not networks:
            raise ValueError("Empty dataset")
        nports = networks[0].nports
        if any(net.nports != nports for net in networks):
            raise ValueError("All networks must have the same number of ports")
        Ns = len(networks)

        # Align all responses on one grid (linear interpolation of real/imag parts).
        freqs = self._common_frequency_grid(networks)
        aligned_S = [_interp_s_to_freqs(net.s, net.f, freqs) for net in networks]
        # The stored grid is the full aligned grid, even if fitting below
        # uses only a subset of it.
        self.frequencies = freqs

        # Orthonormal basis over the parameter samples.
        self.basis = OrthonormalParamBasis(np.array(W_list), np.array(L_list), self.cfg.param_total_degree)
        Phi = self.basis.phi_matrix()  # (Ns, Q)
        Qn = Phi.shape[1]

        # Optional characteristic-frequency subsampling.
        max_points = self.cfg.max_freq_points
        if max_points is not None and len(freqs) > max_points:
            sel_idx = self._select_characteristic_freqs(freqs, aligned_S, max_points,
                                                        method=self.cfg.freq_selection_method,
                                                        threshold=self.cfg.curvature_threshold)
            freqs = freqs[sel_idx]
            aligned_S = [S[sel_idx, :, :] for S in aligned_S]
            if self.cfg.verbose:
                warnings.warn(f"Frequency reduced to {len(freqs)} points (method={self.cfg.freq_selection_method}).")

        # Initial poles and first residue solve.
        self.poles = self._init_global_poles(freqs)
        p_vec = 1j * 2 * np.pi * freqs
        residues_samples, d_samples, e_samples = self._solve_residues_given_poles(aligned_S, p_vec)

        # Pole iteration (simplified shared-pole VF).
        for it in range(self.cfg.vf_iterations):
            relocated = self._vf_iterate_poles(aligned_S, p_vec, residues_samples, d_samples, e_samples)
            self.poles = self._reflect_and_dedupe(relocated)
            if self.cfg.verbose:
                print(f"[VF-Iter {it+1}] poles -> {len(self.poles)} kept")
            residues_samples, d_samples, e_samples = self._solve_residues_given_poles(aligned_S, p_vec)

        # Project onto the parameter basis: Phi has orthonormal columns, so
        # the coefficients are Phi^H applied along the sample axis.
        Phi_conj = Phi.conj()  # (Ns, Q)
        self.residue_coeffs = (residues_samples @ Phi_conj).astype(complex)  # (nports, nports, K, Q)
        self.d_coeffs = (d_samples @ Phi_conj).astype(complex)  # (nports, nports, Q)
        self.e_coeffs = (e_samples @ Phi_conj).astype(complex)

        # Simple error estimate: RMS of the per-sample fits at the training points.
        F_base = self._build_frequency_matrix(p_vec, self.poles)
        self.meta['train_rms'] = self._compute_training_rms(residues_samples, d_samples, e_samples, F_base, aligned_S)
        self.meta['nports'] = nports
        self.meta['Ns'] = Ns
        self.meta['Q'] = Qn
        return self

    def _compute_training_rms(self, residues_samples, d_samples, e_samples, F_base, original_S_list):
        """Return, per sample, the RMS error of the per-sample rational fit over all port pairs."""
        rms = []
        for s in range(residues_samples.shape[3]):
            y_hat = self._synthesize(F_base, residues_samples[:, :, :, s],
                                     d_samples[:, :, s], e_samples[:, :, s])
            err = np.asarray(original_S_list[s]) - y_hat
            rms.append(np.sqrt(np.mean(np.abs(err) ** 2)))
        return rms

    def evaluate(self, W: float, L: float, freqs: np.ndarray | None = None) -> np.ndarray:
        """Return the modelled S-parameters at geometry (W, L), shape (Nf, nports, nports).

        *freqs* defaults to the grid stored by fit().
        """
        assert self.frequencies is not None and self.poles is not None
        assert self.basis is not None
        assert self.residue_coeffs is not None and self.d_coeffs is not None and self.e_coeffs is not None, "Model not fitted"
        freqs = self.frequencies if freqs is None else np.asarray(freqs, dtype=float)
        p_vec = 1j * 2 * np.pi * freqs
        phi = self.basis.eval(W, L)  # (Q,)
        # Recover the parameter-dependent residues / constant / slope.
        residues = self.residue_coeffs @ phi  # (nports, nports, K)
        d = self.d_coeffs @ phi  # (nports, nports)
        e = self.e_coeffs @ phi
        F_base = self._build_frequency_matrix(p_vec, self.poles)
        return self._synthesize(F_base, residues, d, e)

    def export_s2p(self, W: float, L: float, filepath: str, freqs: np.ndarray | None = None, z0: float | complex = 50) -> str:
        """Evaluate S at geometry (W, L) and write a Touchstone (.sNp) file.

        Args:
            W, L: Geometry parameters.
            filepath: Target path, with or without a .sNp suffix. Dots that
                are not part of a Touchstone suffix are kept in the name.
            freqs: Optional frequency array; defaults to the stored grid.
            z0: Port reference impedance passed to the network.

        Returns:
            Path of the file that was written.
        """
        import os
        import re

        assert self.frequencies is not None, "Model not fitted"
        S_eval = self.evaluate(W, L, freqs)
        freqs_arr = np.asarray(self.frequencies if freqs is None else freqs)
        nports = S_eval.shape[1]
        ntw = rf.Network(f=freqs_arr, s=S_eval, z0=z0)

        base_dir = os.path.dirname(filepath) or '.'
        os.makedirs(base_dir, exist_ok=True)
        base_name = os.path.basename(filepath)
        # Strip only a genuine Touchstone suffix, so names such as
        # "cap_W1.5_L2.0" are not truncated at their last dot.
        base_name = re.sub(r'\.s\d+p$', '', base_name, flags=re.IGNORECASE) or base_name
        # Pass the extension explicitly so the written name is exactly the
        # one returned, regardless of how the writer infers extensions.
        file_name = f"{base_name}.s{nports}p"
        ntw.write_touchstone(file_name, dir=base_dir)
        return os.path.join(base_dir, file_name)

    def to_dict(self) -> Dict[str, Any]:
        """Return the model state as a plain dictionary (arrays become lists)."""
        def as_list(arr):
            return arr.tolist() if arr is not None else None

        basis = self.basis
        return {
            # Shallow copies, so editing the result cannot alter this model.
            'config': dict(vars(self.cfg)),
            'frequencies': as_list(self.frequencies),
            'poles': as_list(self.poles),
            'basis': {
                'W_mean': getattr(basis, 'W_mean', None),
                'W_std': getattr(basis, 'W_std', None),
                'L_mean': getattr(basis, 'L_mean', None),
                'L_std': getattr(basis, 'L_std', None),
                'exps': getattr(basis, 'exps', None),
                'R': as_list(getattr(basis, 'R', None)),
            },
            'residue_coeffs': as_list(self.residue_coeffs),
            'd_coeffs': as_list(self.d_coeffs),
            'e_coeffs': as_list(self.e_coeffs),
            'meta': dict(self.meta),
        }

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> 'ParametricVectorFitting':
        """Rebuild a model from the dictionary produced by to_dict()."""
        cfg = ParametricVFConfig(**data['config'])
        obj = ParametricVectorFitting(cfg)
        obj.frequencies = np.array(data['frequencies']) if data['frequencies'] is not None else None
        obj.poles = np.array(data['poles']) if data['poles'] is not None else None
        basis_data = data.get('basis')
        if basis_data and basis_data['R'] is not None:
            # Bypass __init__: a throw-away construction would run a QR on
            # dummy samples, and only the stored values are needed for eval().
            basis = OrthonormalParamBasis.__new__(OrthonormalParamBasis)
            basis.W_mean = basis_data['W_mean']
            basis.W_std = basis_data['W_std']
            basis.L_mean = basis_data['L_mean']
            basis.L_std = basis_data['L_std']
            basis.exps = [tuple(e) for e in basis_data['exps']]
            R = np.array(basis_data['R'])
            basis.R = R
            basis.R_inv = np.linalg.inv(R)
            # Q holds basis values at the training samples only; it is not serialised.
            basis.Q = None  # type: ignore
            obj.basis = basis
        if data.get('residue_coeffs') is not None:
            obj.residue_coeffs = np.array(data['residue_coeffs'])
        if data.get('d_coeffs') is not None:
            obj.d_coeffs = np.array(data['d_coeffs'])
        if data.get('e_coeffs') is not None:
            obj.e_coeffs = np.array(data['e_coeffs'])
        obj.meta = data.get('meta', {})
        return obj
|
|||
|
|
|
|||
|
|
# 划分
|
|||
|
|
def split_dataset(dataset: List[ModelBasicDatasetUnit], train_ratio: float = 0.7, shuffle: bool = True, seed: int = 0) -> Tuple[List[ModelBasicDatasetUnit], List[ModelBasicDatasetUnit]]:
    """Split *dataset* into (train, test) lists; 70% / 30% by default.

    The input is copied, so the caller's sequence is left untouched. With
    *shuffle* enabled the copy is shuffled by a generator seeded with *seed*.
    """
    items = list(dataset)
    if shuffle:
        np.random.default_rng(seed).shuffle(items)
    cut = int(len(items) * train_ratio)
    train, test = items[:cut], items[cut:]
    return train, test
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _interp_s_to_freqs(S: np.ndarray, f_src: np.ndarray, f_tgt: np.ndarray) -> np.ndarray:
|
|||
|
|
"""将 S(f_src) 线性插值到 f_tgt (复数分离实虚部)。"""
|
|||
|
|
if np.array_equal(f_src, f_tgt):
|
|||
|
|
return S
|
|||
|
|
nports = S.shape[1]
|
|||
|
|
S_new = np.zeros((len(f_tgt), nports, nports), dtype=complex)
|
|||
|
|
for i in range(nports):
|
|||
|
|
for j in range(nports):
|
|||
|
|
y = S[:, i, j]
|
|||
|
|
real_part = np.interp(f_tgt, f_src, np.real(y))
|
|||
|
|
imag_part = np.interp(f_tgt, f_src, np.imag(y))
|
|||
|
|
S_new[:, i, j] = real_part + 1j * imag_part
|
|||
|
|
return S_new
|
|||
|
|
|
|||
|
|
|
|||
|
|
def evaluate_dataset(model: ParametricVectorFitting,
                     subset: List[ModelBasicDatasetUnit],
                     network_loader=Capa.network) -> Dict[str, Any]:
    """Score *model* on *subset*; return per-sample details and overall statistics.

    Each unit's reference network is interpolated onto the model's stored
    frequency grid and compared with the model prediction by complex RMS
    error and by mean absolute magnitude error in dB.
    """
    assert model.frequencies is not None
    grid = model.frequencies

    def to_db(values: np.ndarray) -> np.ndarray:
        # Clip the magnitude so that log10 never sees zero.
        return 20 * np.log10(np.clip(np.abs(values), 1e-15, None))

    def reduce_or_none(values, reducer):
        return float(reducer(values)) if values else None

    details = []
    rms_values = []
    db_values = []
    for unit in subset:
        net = network_loader(unit.result_dir, unit.id)
        reference = _interp_s_to_freqs(net.s, net.f, grid)  # (Nf, nports, nports)
        predicted = model.evaluate(unit.parameters["W"], unit.parameters["L"], grid)
        rms = float(np.sqrt(np.mean(np.abs(predicted - reference) ** 2)))
        mag_err_db = float(np.mean(np.abs(to_db(predicted) - to_db(reference))))
        rms_values.append(rms)
        db_values.append(mag_err_db)
        details.append({
            'id': unit.id,
            'W': unit.parameters['W'],
            'L': unit.parameters['L'],
            'rms': rms,
            'mag_err_db': mag_err_db
        })

    return {
        'count': len(subset),
        'rms_mean': reduce_or_none(rms_values, np.mean),
        'rms_max': reduce_or_none(rms_values, np.max),
        'mag_err_db_mean': reduce_or_none(db_values, np.mean),
        'mag_err_db_max': reduce_or_none(db_values, np.max),
        'details': details
    }
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 简要使用示例 (非执行):
|
|||
|
|
if __name__ == "__main__":

    # Run the sweep, then round-trip the results through JSON.
    capa = Capa()
    capa.sweep()
    print("sweep finished")
    capa.export('capa_results.json')
    capa.clear()
    capa.load('capa_results.json')
    dataset = capa.results

    # 70% training / 30% test
    train_set, test_set = split_dataset(dataset, train_ratio=0.7, shuffle=True, seed=42)
    print(f"Train: {len(train_set)}  Test: {len(test_set)}")

    cfg = ParametricVFConfig(n_poles=10, param_total_degree=3, vf_iterations=2,
                             max_freq_points=50)
    pvf = ParametricVectorFitting(cfg).fit(train_set)

    train_metrics = evaluate_dataset(pvf, train_set)
    test_metrics = evaluate_dataset(pvf, test_set)

    pvf.meta['train_eval'] = train_metrics
    pvf.meta['test_eval'] = test_metrics

    def _print_metrics(title: str, m: Dict[str, Any]):
        """Print a one-line summary of an evaluate_dataset() result."""
        if not m['count']:
            # An empty subset has None statistics, which cannot be formatted
            # with float format specs.
            print(f"[{title}] samples={m['count']}")
            return
        print(f"[{title}] samples={m['count']}"
              f"  RMS_mean={m['rms_mean']:.4e}  RMS_max={m['rms_max']:.4e}"
              f"  |Δ|dB_mean={m['mag_err_db_mean']:.3f}dB  |Δ|dB_max={m['mag_err_db_max']:.3f}dB")

    _print_metrics("TRAIN", train_metrics)
    _print_metrics("TEST", test_metrics)

    # Example: export the fitted response of one test point as a Touchstone file.
    if test_set:
        sample = test_set[0]
        out_path = pvf.export_s2p(sample.parameters["W"], sample.parameters["L"], filepath='outputs/pvf_test_sample')
        print("Exported:", out_path)
|