feat: 完成了算法的最基础部分

This commit is contained in:
mayge
2025-09-20 12:02:24 -04:00
parent ec5e2aaddc
commit e62e8df013
10 changed files with 381 additions and 1202 deletions

3
.gitignore vendored
View File

@@ -4,4 +4,5 @@ __pycache__/
*.pyd
__pypackages__/
env/
.venv/
.venv/
.vscode/

6
README.md Normal file
View File

@@ -0,0 +1,6 @@
1. 添加 A B C D 方法,对应不同的基
2. 修改Levi部分和sk迭代部分使用更通用的A B C D求解
3. 添加正交基形式
4. 加入多端口
5. 引入QR消除中间过程的残差求解降低算法复杂度
6. 使用最初始的表达式,获得无偏估计

241
core/basis.py Normal file
View File

@@ -0,0 +1,241 @@
import numpy as np
from core.sk_iter import generate_starting_poles
from scipy.linalg import block_diag
import skrf as rf
from skrf import VectorFitting
from core.freqency import auto_select
class BasicBasis:
def __init__(self,H,freqs,poles,weights=None):
self.least_squares_rms_error = None
self.least_squares_condition = None
self.eigenval_condition = None
self.eigenval_rms_error = None
self.H = H
self.freqs = freqs
self.s = self.freqs * 2j * np.pi
self.P = len(poles)
self.poles = poles
self.Phi = self.generate_basis(self.s, self.poles)
self.A = self.matrix_A(self.poles)
self.B = self.vector_B(self.poles)
self.D = 1.0
self.Cr,self.Cw = self.fit_denominator(self.H, d0=1.0, weights=weights)
z = np.linalg.eigvals(self.A - self.B @ self.Cw)
p_next = -z
# enforce LHP and pair ordering
p_next = np.where(np.real(p_next) < 0, p_next, -np.conj(p_next))
p_next = np.sort_complex(p_next)
self.next_poles = p_next
# z = np.where(np.real(z) < 0, z, -np.conj(z)) # enforce LHP
# self.next_poles = np.sort_complex(z)
self.eigenval_condition = np.linalg.cond(self.A - self.B @ self.Cw)
self.eigenval_rms_error = np.sqrt(np.mean(np.abs(np.real(z) - np.real(poles))**2 + np.abs(np.imag(z) - np.imag(poles))**2))
self.Dt = self.eval_Dt_state_space()
self.delta = self.Dt / weights if weights is not None else self.Dt
pass
def eval_Dt_state_space(self):
"""Return D(s_k)=C(s_k I - A)^(-1)B + D for all k (complex 1D array)."""
s = 1j * 2*np.pi * np.asarray(self.freqs, float).ravel()
A = np.asarray(self.A, np.complex128); n = A.shape[0]
B = np.asarray(self.B, np.complex128).reshape(n, 1)
C = np.asarray(self.Cw, float).reshape(1, n)
D = self.D
I = np.eye(n, dtype=np.complex128)
out = np.empty_like(s, dtype=np.complex128)
for k, sk in enumerate(s):
DS = D + (C @ np.linalg.inv(sk*I - A) @ B)
out[k] = DS[0, 0]
return out
def generate_basis(self,s, poles):
"""Real basis of (15)-(16); returns Φ(s) and a layout for packing C."""
cols = []
i = 0
while i < len(poles):
p = poles[i]
if p.real > 0:
raise ValueError("poles must be in the LHP")
if i+1 < len(poles) and np.isclose(poles[i+1], np.conj(p)):
pc = poles[i+1]
phi1 = 1/(s - p) + 1/(s - pc) # eq (15)generate_basis
phi2 = 1j*(1/(s - p) - 1/(s - pc)) # eq (16) (fixed sign)
cols += [phi1, phi2]
i += 2
else:
cols.append(1/(s - p))
i += 1
Phi = np.column_stack(cols).astype(np.complex128)
return Phi
def matrix_A(self, poles):
def A_block(p):
if abs(p.imag) < 1e-14:
return np.array([[p.real]], float) # A_p = [ p ]
return np.array([[p.real, p.imag], # A_p = [[Re p, Im p],
[-p.imag, p.real]], float) # [-Im p, Re p]]
A = None; i = 0
while i < len(poles):
p = poles[i]
Ab = A_block(p)
if i+1 < len(poles) and np.isclose(poles[i+1], np.conj(p)): i += 2
else: i += 1
A = Ab if A is None else block_diag(A, Ab)
return A
def vector_B(self, poles):
def B_block(p):
return np.array([[1.0]], float) if abs(p.imag)<1e-14 else np.array([[2.0],[0.0]], float)
B = None; i = 0
while i < len(poles):
p = poles[i]
Bb = B_block(p)
if i+1 < len(poles) and np.isclose(poles[i+1], np.conj(p)): i += 2
else: i += 1
B = Bb if B is None else np.vstack([B, Bb])
return B
def fit_denominator(self, H, d0=1.0, weights=None):
"""
Solve formula (70) on the real basis Φ to obtain:
- d (real) → packs into C for this state's block structure
- gamma (complex)
Optional 'weights' (K,) apply row scaling: SK weighting if 1/|D_prev|.
"""
if weights is None:
weights = np.diag(np.ones(len(H), np.complex128))
else:
weights = np.diag([1/res for res in weights])
s = self.s
H = np.asarray(H, np.complex128).reshape(-1,1)
Phi = self.Phi
psi = weights @ Phi
psi = Phi
HPhi = H * Phi
A_re = np.hstack([np.real(-psi), np.real(-HPhi)])
A_im = np.hstack([np.imag(-psi), np.imag(-HPhi)])
b_re = np.real(d0 * H)
b_im = np.imag(d0 * H)
A = np.vstack([A_re, A_im]).astype(float)
# rown = np.linalg.norm(A, axis=1)
# rown = np.sqrt(rown)
# A = rown[:,None] * A
b = np.concatenate([b_re, b_im]).astype(float)
x = np.linalg.inv(A.T @ A) @ A.T @ b
self.least_squares_rms_error = np.sqrt(np.mean((A @ x - b)**2))
self.least_squares_condition = np.linalg.cond(A)
Cn,Cd = self.vector_C(x)
return Cn,Cd
def vector_C(self,x):
Cn = np.asarray([x[:len(x)//2]], float).reshape(1,-1)
Cd = np.asarray([x[len(x)//2:]], float).reshape(1,-1)
return Cn, Cd
def evaluate(self,freqs,poles,Cn,Cd,d0=1.0):
s = 1j * 2*np.pi * np.asarray(freqs, float).ravel()
phi = self.generate_basis(s, poles)
num = phi @ Cn.T
den = d0 + phi @ Cd.T
H = num / den
return H.ravel()
if __name__ == "__main__":
network = rf.Network("/tmp/paramer/simulation/3000/3000.s2p")
K = 10
H11,freqs = auto_select([network.y[i][0][0] for i in range(2,len(network.y))],network.f[2:],max_points=20)
poles = generate_starting_poles(2,beta_min=freqs[0]/1.1,beta_max=freqs[-1]*1.1)
Dt_1 = np.ones((len(freqs),1),np.complex128)
# Levi step (no weighting):
basis = BasicBasis(H11,freqs,poles=poles)
Dt = basis.Dt
poles = basis.next_poles
print("Levi step (no weighting):")
print("A:",basis.A)
print("B:",basis.B)
print("C:",basis.Cw)
print("D:",basis.D)
print("next_pozles:",basis.next_poles)
print("Dt:",Dt, "norm:",np.linalg.norm(Dt))
# SK weighting (optional, after first pass):
least_squares_condition = []
least_squares_rms_error = []
eigenval_condition = []
eigenval_rms_error = []
for i in range(K):
basis = BasicBasis(H11,freqs,poles=poles,weights=Dt)
Dt_1 = Dt
Dt = basis.Dt
poles = basis.next_poles
print(f"SK Iteration {i+1}/{K}")
print("A:",basis.A)
print("B:",basis.B)
print("C:",basis.Cw)
print("D:",basis.D)
print("z:",basis.next_poles)
print("Dt:",Dt)
print("Dt/Dt-1",np.linalg.norm(Dt) / np.linalg.norm(Dt_1))
least_squares_condition.append(basis.least_squares_condition)
least_squares_rms_error.append(basis.least_squares_rms_error)
eigenval_condition.append(basis.eigenval_condition)
eigenval_rms_error.append(basis.eigenval_rms_error)
# H11_evaluated = basis.evaluate_pole_residue(network.f[1:],poles,basis.C[0])
H11_evaluated = basis.evaluate(network.f[2:], poles, basis.Cr[0],basis.Cw[0], d0=1.0)
import matplotlib.pyplot as plt
fig, axes = plt.subplots(3, 2, figsize=(15, 16), sharex=False)
ax0 = axes[0][0]
ax0.plot(network.f[2:], np.abs([network.y[i][0][0] for i in range(2,len(network.y))]), 'o', ms=4, color='red', label='Samples')
ax0.plot(network.f[2:], np.abs(H11_evaluated), '-', lw=2, color='k', label='Fit')
ax0.plot(freqs, np.abs(H11), 'x', ms=4, color='blue', label='Input Samples')
ax0.set_title("Response i=0, j=0")
ax0.set_ylabel("Magnitude")
ax0.legend(loc="best")
ax1 = axes[1][0]
ax1.plot(least_squares_condition, label='Least Squares Condition')
ax1.set_title("least_squares_condition")
ax1.set_ylabel("Magnitude")
ax1.legend(loc="best")
ax2 = axes[1][1]
ax2.plot(least_squares_rms_error, label='Least Squares RMS Error')
ax2.set_title("least_squares_rms_error")
ax2.set_ylabel("Magnitude")
ax2.legend(loc="best")
ax3 = axes[2][0]
ax3.plot(eigenval_condition, label='Eigenvalue Condition')
ax3.set_title("eigenval_condition")
ax3.set_ylabel("Magnitude")
ax3.legend(loc="best")
ax4 = axes[2][1]
ax4.plot(eigenval_rms_error, label='Eigenvalue RMS Error')
ax4.set_title("eigenval_rms_error")
ax4.set_ylabel("Magnitude")
ax4.legend(loc="best")
fig.tight_layout()
plt.savefig(f"basic_basis.png")

131
core/freqency.py Normal file
View File

@@ -0,0 +1,131 @@
import numpy as np
def auto_select(H, freq,
n_baseline=64, # log-spaced backbone points
peak_prominence=0.05, # fraction of |H| dB dynamic range for peak detection
peak_window=5, # take ±peak_window samples around each peak
topgrad_q=0.98, # keep top 2% largest slope/phase-change points
max_points=25, # final cap on selected samples (None = no cap)
ensure_ends=True):
"""
Select several significant sample points for vector fitting.
Strategy:
1) Always keep endpoints (optional).
2) Add a log-spaced baseline over the band.
3) Detect resonance peaks in |H| (on a log scale) and keep small windows around them.
4) Add points with the largest magnitude slope and phase-change (w.r.t log-f).
5) De-duplicate, sort, and optionally thin to 'max_points' with priority
to endpoints and detected peaks.
Parameters
----------
H : (N,) complex array
Frequency response samples.
freq : (N,) float array
Frequency axis [Hz], strictly increasing.
n_baseline : int
Count of log-spaced baseline samples across the band.
peak_prominence : float
Peak prominence threshold as a fraction of the dynamic range in log|H|.
0.05 ≈ keep peaks ≥ 5% of the range.
peak_window : int
Number of neighbor indices to include on each side of every detected peak.
topgrad_q : float in (0,1)
Quantile for selecting strong slope/phase points.
0.98 ⇒ keep the top 2% largest derivatives.
max_points : int or None
If not None, cap the total number of selected indices to this value.
ensure_ends : bool
Always include the first and last samples.
Returns
-------
H_sel : (K,) complex array
freq_sel : (K,) float array
"""
H = np.asarray(H).reshape(-1)
f = np.asarray(freq).reshape(-1)
if H.size != f.size:
raise ValueError("H and freq must have the same length.")
N = f.size
if N < 4:
return H.copy(), f.copy()
eps = 1e-16
mag = np.abs(H)
logmag = np.log10(mag + eps)
phase = np.unwrap(np.angle(H))
# log-frequency axis (scale-invariant derivatives)
# keep it linear if any non-positive freq sneaks in
if np.all(f > 0):
lf = np.log(f)
else:
lf = f.copy()
dlf = np.gradient(lf)
d_logmag = np.gradient(logmag) / (dlf + 1e-16)
d_phase = np.gradient(phase) / (dlf + 1e-16)
idx = set()
if ensure_ends:
idx.update([0, N-1])
# 1) log-spaced baseline
if n_baseline > 0:
# map a log grid to nearest indices
grid = np.linspace(lf.min(), lf.max(), n_baseline)
base_idx = np.clip(np.searchsorted(lf, grid), 0, N-1)
idx.update(np.unique(base_idx).tolist())
# 2) peaks in |H|
try:
from scipy.signal import find_peaks
dyn = logmag.max() - logmag.min()
prom = peak_prominence * (dyn + 1e-12)
peaks, _ = find_peaks(logmag, prominence=prom)
except Exception:
# simple fallback: strict local maxima
peaks = np.where((mag[1:-1] > mag[:-2]) & (mag[1:-1] > mag[2:]))[0] + 1
for p in peaks:
lo = max(0, p - peak_window)
hi = min(N, p + peak_window + 1)
idx.update(range(lo, hi))
# 3) strongest slope / phase-change points
thr_slope = np.quantile(np.abs(d_logmag), topgrad_q)
thr_phase = np.quantile(np.abs(d_phase), topgrad_q)
idx.update(np.where(np.abs(d_logmag) >= thr_slope)[0].tolist())
idx.update(np.where(np.abs(d_phase) >= thr_phase)[0].tolist())
# 4) finalize set
sel = np.array(sorted(idx), dtype=int)
# 5) optional thinning with priority to endpoints and peaks
if max_points is not None and sel.size > max_points:
priority = np.zeros(sel.size, dtype=int)
if ensure_ends:
priority[(sel == 0) | (sel == N-1)] = 3
if peaks.size:
priority[np.isin(sel, peaks)] = np.maximum(priority[np.isin(sel, peaks)], 2)
keep = []
budget = max_points
# keep highest-priority first
for lev in (3, 2, 1, 0):
cand = sel[priority == lev]
if cand.size == 0:
continue
if cand.size <= budget:
keep.extend(cand.tolist())
budget -= cand.size
else:
step = max(1, int(np.ceil(cand.size / budget)))
keep.extend(cand[::step][:budget].tolist())
budget = 0
if budget == 0:
break
sel = np.array(sorted(set(keep)), dtype=int)
return H[sel], f[sel]

View File

@@ -1,36 +1,5 @@
import numpy as np
# ------------------------------------------------------------
# 按论文公式 (9)(10)(11) 生成 MuntzLaguerre 正交有理基 (解析形式)
#
# 给定稳定极点集合 {p_k} (Re(p_k)<0)。论文记法中使用 -a_k其中 Re(a_k)>0。
# 对应关系p_k = -a_k ⇒ a_k = -p_k, Re(a_k)= -Re(p_k) >0
#
# 连续内积意义下(沿 jω 轴积分)这些 φ_k 解析正交。离散频率采样后数值上
# 可能偏离,可再用加权 QR 做数值再正交(可选)。
#
# 公式在“稳定极点 p 表达”下的改写:
# (原) 实极点: φ_p(s) = sqrt(2 Re(a_p)) / (s + a_p) * Π (s - a_i^*)/(s + a_i)
# 变换 a_p = -p ⇒ Re(a_p)= -Re(p) = σ >0 且 (s + a_p) = (s - p)
# 且乘积 (s - a_i^*)/(s + a_i) = (s + p_i^*)/(s - p_i)
# ⇒ φ_p(s) = sqrt(-2 Re(p)) / (s - p) * Π_{i<p} (s + p_i^*)/(s - p_i)
#
# 复对 (p, p*),取 imag(p)>0 的 p 作为首:
# (原) φ_p = sqrt(2 Re(a_p)) (s - |a_p|)/[(s + a_p)(s + a_p^*)] * Π(...)
# (原) φ_{p+1}= sqrt(2 Re(a_p)) (s + |a_p|)/[(s + a_p)(s + a_p^*)] * Π(...)
# 代入 a_p=-p
# Re(a_p)= -Re(p)=σ>0, (s + a_p) = (s - p), (s + a_p^*)=(s - p^*)
# |a_p| = |p|
# 乘积同上 ⇒ Π_{i<p} (s + p_i^*)/(s - p_i)
#
# ⇒ φ_p(s) = sqrt(-2 Re(p)) (s - |p|)/[(s - p)(s - p^*)] * Π_{i<p} (s + p_i^*)/(s - p_i)
# φ_{p^*}(s)= sqrt(-2 Re(p)) (s + |p|)/[(s - p)(s - p^*)] * Π_{i<p} (s + p_i^*)/(s - p_i)
#
# product 递推维护prod_k = Π_{i≤k} (s + p_i^*)/(s - p_i)
# 对复对要顺序乘两次p 与 p*)。
# ------------------------------------------------------------
def generate_laguerre_basis(poles:list|np.ndarray, s: np.ndarray):
basis = np.zeros((len(poles)+1,len(s)),dtype=complex)
@@ -82,93 +51,3 @@ def generate_laguerre_basis(poles:list|np.ndarray, s: np.ndarray):
i += 1
basis[i + 1] = phi
return basis
# class MuntzLaguerreIterator:
# def __init__(self, s: np.ndarray, stable_poles: list | np.ndarray):
# """
# s: 复频率数组 (Nf,), s = j 2π f
# stable_poles: 稳定极点列表 (Re<0). 复共轭对要求正虚部在前 (p, p*).
# """
# self.s = np.asarray(s, dtype=complex)
# self.poles = list(stable_poles)
# self.N = len(self.poles)
# self.k = 0
# # 初始化乘积 Π_{i<p} (s + p_i^*)/(s - p_i)
# self.product = np.ones_like(self.s, dtype=complex)
# def __iter__(self):
# return self
# def __next__(self):
# if self.k >= self.N:
# raise StopIteration
# p = self.poles[self.k]
# if np.real(p) >= 0:
# raise ValueError(f"极点必须在左半平面: {p}")
# # 复对首 (正虚部)
# if np.iscomplex(p) and np.imag(p) > 0:
# if self.k + 1 >= self.N:
# raise ValueError("复极点缺少共轭")
# pc = self.poles[self.k + 1]
# if not np.isclose(pc, np.conj(p)):
# raise ValueError("复极点未按 (p, p*) 顺序排列 (正虚部在前)")
# sigma = -np.real(p) # >0
# scale = np.sqrt(2 * sigma)
# r = np.abs(p)
# denom = (self.s - p) * (self.s - pc)
# # 两个基函数
# phi_p = scale * (self.s - r) / denom * self.product
# phi_pc = scale * (self.s + r) / denom * self.product
# # product 先乘 (s + p^*)/(s - p),再乘 (s + p)/(s - p^*)
# self.product = self.product * (self.s + pc) / (self.s - p)
# self.product = self.product * (self.s + p) / (self.s - pc)
# self.k += 2
# return [phi_p, phi_pc]
# # 复对次 (负虚部) —— 应该被首元素处理,出现表示顺序错误
# if np.iscomplex(p) and np.imag(p) < 0:
# raise ValueError("检测到负虚部复极点但其共轭尚未处理,请将正虚部成员放在前面。")
# # 实极点
# sigma = -np.real(p)
# if sigma <= 0:
# raise ValueError("实极点实部应为负 (稳定)。")
# scale = np.sqrt(2 * sigma)
# phi = scale / (self.s - p) * self.product
# # 更新乘积
# self.product = self.product * (self.s + p) / (self.s - p)
# self.k += 1
# return [phi]
# def generate_muntz_laguerre_basis(s: np.ndarray, init_poles: list | np.ndarray):
# """
# 生成完整基函数列表: [φ_0=1, φ_1, φ_2, ...]
# """
# basis = [np.ones_like(s, dtype=complex)]
# for block in MuntzLaguerreIterator(s, init_poles):
# basis.extend(block)
# return basis
# if __name__ == "__main__":
# # 示例稳定极点 (复对正虚部在前)
# stable_poles = [
# -0.8e9,
# -1.0e9 + 2.5e9j,
# -1.0e9 - 2.5e9j,
# -2.2e9
# ]
# freqs = np.linspace(1e8, 8e9, 400)
# s = 1j * 2 * np.pi * freqs
# basis = generate_muntz_laguerre_basis(s, stable_poles)
# print(f"生成 {len(basis)} 个基函数,{basis}")

View File

@@ -1,191 +0,0 @@
from models.basic import ModelBasic
from typing import List,Literal,Dict
import numpy as np
import random
from pydantic import BaseModel
from skrf import Network
from models.basic import ModelBasicDatasetUnit
class RobustParametricConfig(BaseModel):
n_poles: int
max_iter: int = 10
parameter_type: Literal["s","y","z"]
class RobustParametricModel:
def __init__(self,model:ModelBasic,config:RobustParametricConfig):
self.model = model
self.config = config
# 区分训练集和测试集
def _train_test_split(self,train_ratio:float=0.8,random_state:int=42):
random.seed(random_state)
dataset = self.model.results
random.shuffle(dataset)
train_size = int(len(dataset)*train_ratio)
self.train_set = dataset[:train_size]
self.test_set = dataset[train_size:]
return self.train_set, self.test_set
def first_iteration_step(self, datasets: List[ModelBasicDatasetUnit],
param_degree: int = 1):
'''
SK 迭代的第一步 (t=0) 对应论文公式 (3).
目标:
最小化 sum_{样本 n, 频点 f} | N^{(0)}(s_f, g_n) - H(s_f, g_n) |^2
在 t=0 阶段我们令 D^{(0)}(s,g)=1, 仅拟合分子:
N^{(0)}(s,g) = Σ_{p∈P} Σ_{v∈V} c_{p,v} * φ_freq_p(s) * φ_param_v(g)
因此是一个纯线性最小二乘:
H ≈ Σ_{p,v} c_{p,v} F[:,p] ⊗ G[n,v]
记:
F: (Nf, P) 频率正交(或原始)基列
G: (Ns, V) 参数多项式(含常数)基列
设计矩阵 A 大小 (Ns*Nf, P*V), A[(n*Nf + f), (p*V + v)] = F[f,p] * G[n,v]
输出:
self.first_iter_coeffs[(i,j)] = C_{p,v} (P × V) 每个端口对一套系数
self.freq_basis_F = F
self.param_basis_G = G
self.poles 初始极点 (供后续构造有理正交基 / SK 加权使用)
参数:
datasets: 训练用样本列表
param_degree: 参数多项式最大总次数 (默认 1 → 常数 + 线性)
注意:
这里使用简单频率基 [1, 1/(s-a_k)],未做 QR 正交化;
后续 SK 迭代 / 正交化可在第二步再进行。
'''
assert len(datasets) > 0, "空数据集"
# ---------------- 收集频率与端口信息 ----------------
Ns = len(datasets)
freqs = datasets[0].freqs # 频率需要对齐
Nf = freqs.shape[0]
nports = self.model.info.nports
# ---------------- 构造初始极点 (对数分布 + 阻尼) ----------------
def _init_poles(n_poles: int):
fmin, fmax = freqs[0], freqs[-1]
if n_poles <= 0:
return np.array([], dtype=complex)
# 避免 0 Hz用第二个点或微小偏移
start_f = max(fmin if fmin > 0 else freqs[1] if Nf > 1 else 1.0e-3, 1e-12)
f_samples = np.logspace(np.log10(start_f), np.log10(fmax), n_poles)
sigma = 0.02 * 2 * np.pi * fmax # 固定阻尼,可放入 config
return -sigma + 1j * 2 * np.pi * f_samples
self.poles = _init_poles(self.config.n_poles)
s_vec = 1j * 2 * np.pi * freqs # (Nf,)
# ---------------- 频率基 F ----------------
# 列0: 常数 1, 后续列: 1/(s - a_k)
P = 1 + len(self.poles)
F = np.zeros((Nf, P), dtype=complex)
F[:, 0] = 1.0
for k, a in enumerate(self.poles, start=1):
F[:, k] = 1.0 / (s_vec - a)
self.freq_basis_F = F # (Nf, P)
# ---------------- 参数基 G ----------------
# 取出参数向量 g = [param1, param2, ...]
# 假设 datasets[i].parameters 为 dict
param_keys = list(datasets[0].parameters.keys())
d = len(param_keys)
# 构造参数矩阵 (Ns,d)
Pmat = np.zeros((Ns, d), dtype=float)
for i, unit in enumerate(datasets):
for j, k in enumerate(param_keys):
Pmat[i, j] = float(unit.parameters[k])
# 标准化
mean = Pmat.mean(axis=0)
std = Pmat.std(axis=0) + 1e-15
Xn = (Pmat - mean) / std
# 生成多项式指数 (总次数 <= param_degree)
def _gen_param_exps(dim, D):
exps = []
def rec(cur, idx, rem):
if idx == dim:
exps.append(tuple(cur)); return
for t in range(rem + 1):
cur.append(t); rec(cur, idx + 1, rem - t); cur.pop()
rec([], 0, D)
return exps
exps = _gen_param_exps(d, param_degree) # V_exps
V = len(exps)
G = np.zeros((Ns, V), dtype=float)
for vidx, e in enumerate(exps):
val = np.ones(Ns)
for j, p in enumerate(e):
if p:
val *= Xn[:, j] ** p
G[:, vidx] = val
self.param_basis_G = G # (Ns, V)
self.param_basis_meta = {
"keys": param_keys,
"mean": mean.tolist(),
"std": std.tolist(),
"exponents": exps,
"degree": param_degree
}
# ---------------- 构造设计矩阵 A (Kronecker) ----------------
# A shape: (Ns*Nf, P*V)
# 利用广播A_block[n,f,p,v] = F[f,p] * G[n,v]
A4 = np.einsum('fp,nv->nfpv', F, G) # (Ns, Nf, P, V)
A = A4.reshape(Ns * Nf, P * V)
# ---------------- 采集目标 H 数据 ----------------
# 对每个端口对 (i,j) 分别解一个向量 c_{p,v}
# 选择参数类型
param_type = self.config.parameter_type.lower()
valid_types = {"s", "y", "z"}
if param_type not in valid_types:
raise ValueError(f"parameter_type 必须在 {valid_types}")
# 预先载入全部 Network (避免重复 IO)
# H_data[(i,j)] -> (Ns,Nf) 复数
H_data = {}
for i_port in range(nports):
for j_port in range(nports):
H_mat = np.zeros((Ns, Nf), dtype=complex)
for n, unit in enumerate(datasets):
net: Network = unit.network
if param_type == "s":
sij = net.s[:, i_port, j_port]
elif param_type == "y":
sij = net.y[:, i_port, j_port]
else:
sij = net.z[:, i_port, j_port]
# 插值或对齐假设已完成
H_mat[n, :] = sij
H_data[(i_port, j_port)] = H_mat
# ---------------- 最小二乘求系数 ----------------
self.first_iter_coeffs = {}
# 可选: 预计算 A^+ (伪逆) 如果数据不巨大
# pinv = np.linalg.pinv(A)
for key, Hmat in H_data.items():
b = Hmat.reshape(Ns * Nf) # (Ns*Nf,)
# 解 x (长度 P*V)
x, *_ = np.linalg.lstsq(A, b, rcond=None)
C = x.reshape(P, V)
self.first_iter_coeffs[key] = C
# ---------------- 存储便于后续 SK 迭代使用 ----------------
self.meta_first_iter = {
"A_shape": A.shape,
"num_samples": Ns,
"num_freqs": Nf,
"freq_basis_dim": P,
"param_basis_dim": V
}
if getattr(self.config, "verbose", True):
print(f"[t=0] 线性最小二乘完成: A={A.shape}, 频率基P={P}, 参数基V={V}, 端口对={nports*nports}")
return self.first_iter_coeffs

View File

@@ -2,7 +2,6 @@ import numpy as np
from dataclasses import dataclass
from typing import List, Dict, Tuple
# ---------- 1. 生成初始极点(按论文 Re(-a_p) 小、Im 线性分布) ----------
def generate_starting_poles(n_pairs: int, beta_min: float, beta_max: float, alpha_scale: float = 0.01):
"""
仅生成复共轭对: p = -alpha + j beta, p*。
@@ -19,308 +18,3 @@ def generate_starting_poles(n_pairs: int, beta_min: float, beta_max: float, alph
poles += [p, np.conj(p)]
print(f"生成 {len(poles)} 个初始极点 (复对) {poles}]")
return poles
# ---------- 2. MuntzLaguerre 频率基 (与你现有 orthonormal_basis 中一致的核心) ----------
def muntz_laguerre_basis(freqs_hz: np.ndarray, stable_poles: List[complex]) -> np.ndarray:
"""
返回 Φ_ml (Nf, P) 列顺序: φ_0=1, 后续依次 (单极点 / 复对两列)
"""
s = 1j * 2 * np.pi * freqs_hz
basis = [np.ones_like(s, dtype=complex)]
product = np.ones_like(s, dtype=complex)
k = 0
N = len(stable_poles)
while k < N:
p = stable_poles[k]
if np.real(p) >= 0:
raise ValueError("极点需在左半平面")
if np.imag(p) > 0: # 复对首
if k + 1 >= N or not np.isclose(stable_poles[k+1], np.conj(p)):
raise ValueError("复对未配对")
pc = stable_poles[k+1]
sigma = -np.real(p)
scale = np.sqrt(2 * sigma)
r = np.abs(p)
denom = (s - p) * (s - pc)
phi_p = scale * (s - r) / denom * product
phi_pc = scale * (s + r) / denom * product
basis.extend([phi_p, phi_pc])
# update product
product = product * (s + pc)/(s - p) * (s + p)/(s - pc)
k += 2
elif np.imag(p) < 0:
raise ValueError("负虚部复极点必须跟在正虚部之后")
else: # 实极点
sigma = -np.real(p)
scale = np.sqrt(2 * sigma)
phi = scale / (s - p) * product
basis.append(phi)
product = product * (s + p)/(s - p)
k += 1
return np.column_stack(basis) # (Nf, P)
# ---------- 3. 原始部分分式基 ----------
def raw_partial_fraction_basis(freqs_hz: np.ndarray, stable_poles: List[complex]) -> np.ndarray:
s = 1j * 2 * np.pi * freqs_hz
cols = [np.ones_like(s, dtype=complex)]
for p in stable_poles:
cols.append(1.0 / (s - p))
return np.column_stack(cols) # (Nf, P)
# ---------- 4. 变换矩阵 Φ_ml T = G_raw ----------
def compute_transform_matrix(Phi_ml: np.ndarray, G_raw: np.ndarray, weights: np.ndarray | None = None) -> np.ndarray:
# 加权最小二乘 T = (Φ^H W Φ)^{-1} Φ^H W G
if weights is None:
WPhi = Phi_ml
WG = G_raw
M = Phi_ml.conj().T @ WPhi
RHS = Phi_ml.conj().T @ WG
else:
w = weights[:, None]
M = Phi_ml.conj().T @ (w * Phi_ml)
RHS = Phi_ml.conj().T @ (w * G_raw)
T = np.linalg.solve(M, RHS)
return T # (P,P), Φ T = G
# ---------- 5. 参数多项式正交基 ----------
def generate_param_basis(param_matrix: np.ndarray, total_degree: int):
"""
param_matrix: (Ns, d)
返回:
Qμ: (Ns, V) 正交列
Rμ: (V, V) 上三角
exps: 多指数列表
"""
X = param_matrix
Ns, d = X.shape
# 生成多指数
exps=[]
def rec(cur,i,rem):
if i==d:
exps.append(tuple(cur)); return
for k in range(rem+1):
cur.append(k); rec(cur,i+1,rem-k); cur.pop()
rec([],0,total_degree)
V = len(exps)
M = np.zeros((Ns, V))
for idx,e in enumerate(exps):
v = np.ones(Ns)
for j,p in enumerate(e):
if p: v *= X[:,j]**p
M[:,idx]=v
Q,R = np.linalg.qr(M)
return Q,R,exps
# ---------- 6. t=0 构建设计矩阵并解 C ----------
def fit_t0(H_list: List[np.ndarray], Phi_f: np.ndarray, : np.ndarray) -> np.ndarray:
"""
H_list: 长度 Ns, 每项 (Nf,) 复数
Phi_f: (Nf, P)
Qμ: (Ns, V)
返回: C (P,V)
"""
Ns = len(H_list); Nf,P = Phi_f.shape; V = .shape[1]
cols = P*V
A = np.zeros((Ns*Nf, cols), dtype=complex)
b = np.zeros(Ns*Nf, dtype=complex)
r=0
for n,Hn in enumerate(H_list):
blk = np.einsum('fp,v->fpv', Phi_f, [n]).reshape(Nf, cols)
A[r:r+Nf,:]=blk
b[r:r+Nf]=Hn
r+=Nf
x, *_ = np.linalg.lstsq(A, b, rcond=None)
C = x.reshape(P, V)
return C
# ---------- 7. t=1 (首轮 SK) 解 C, Ct ----------
def fit_t1_SK(H_list: List[np.ndarray], Phi_f: np.ndarray, : np.ndarray,
C_prev: np.ndarray, max_iter: int =1, tol: float =1e-3):
"""
只做一轮 (或少量) SK初始 D=1。
返回: C_new, Ct_new
"""
Ns = len(H_list); Nf,P = Phi_f.shape; V = .shape[1]
# 初始化分母系数 Ct: Ct[0,0]=1
Ct = np.zeros((P,V), dtype=complex)
Ct[0,0]=1.0
C = C_prev.copy()
for it in range(max_iter):
# 构建线性系统: (N - D*H)=0
# 未知顺序: C(:), Ct(:) 去掉固定 Ct[0,0]
mask_fix = np.zeros((P,V), dtype=bool); mask_fix[0,0]=True
col_map={}
col=0
for i in range(P):
for j in range(V):
col_map[('C',i,j)]=col; col+=1
for i in range(P):
for j in range(V):
if mask_fix[i,j]: continue
col_map[('Ct',i,j)]=col; col+=1
A = np.zeros((Ns*Nf, col), dtype=complex)
b = np.zeros(Ns*Nf, dtype=complex)
r=0
# 评价当前 D_prev= Σ Ct φ ψ
for n,Hn in enumerate(H_list):
# 分子块 (Φ_f ⊗ ψ_n)
blk = np.einsum('fp,v->fpv', Phi_f, [n]).reshape(Nf, P*V)
# 填 C 部分
for i in range(P):
for j in range(V):
A[r:r+Nf, col_map[('C',i,j)]] = blk[:, i*V + j]
# 分母块: - H * blk
for i in range(P):
for j in range(V):
if mask_fix[i,j]: continue
A[r:r+Nf, col_map[('Ct',i,j)]] = - Hn * blk[:, i*V + j]
r+=Nf
# 求解
x, *_ = np.linalg.lstsq(A, b, rcond=None)
# 拆回
idx=0
for i in range(P):
for j in range(V):
C[i,j]=x[idx]; idx+=1
Ct_new = Ct.copy()
for i in range(P):
for j in range(V):
if mask_fix[i,j]: continue
Ct_new[i,j]=x[idx]; idx+=1
# 收敛性简单检查:分母变化
diff = np.max(np.abs(Ct_new - Ct))
Ct = Ct_new
if diff < tol:
break
return C, Ct
# ---------- 8. 假极点检测 (在 raw 基上) ----------
def detect_spurious_poles(C_ml: np.ndarray, Ct_ml: np.ndarray, T: np.ndarray,
: np.ndarray,
tau_cancel=1e-2, tau_small=1e-3, eps=1e-14):
"""
C_ml, Ct_ml: (P,V) (Muntz-Laguerre 基)
T: Φ_ml T = G_raw
Qμ: (Ns,V)
"""
# raw 系数: c_raw = T^{-1} c_ml
Tinv = np.linalg.inv(T)
C_raw = Tinv @ C_ml
Ct_raw = Tinv @ Ct_ml
P,V = C_ml.shape
Ns = .shape[0]
r_vals = C_raw @ .T
q_vals = Ct_raw @ .T
metrics={}
scales=[]
for k in range(1,P): # 跳过常数列
rv = r_vals[k]; qv = q_vals[k]
diff = np.max(np.abs(rv - qv))
scale = np.max([np.max(np.abs(rv)), np.max(np.abs(qv)), eps])
eta = diff / scale
metrics[k] = {"diff": diff, "scale": scale, "eta": eta}
scales.append(scale)
S_max = max(scales) if scales else 1.0
cancel_spurious = [k for k in range(1,P)
if metrics[k]["eta"] < tau_cancel and metrics[k]["scale"] > tau_small * S_max]
small_spurious = [k for k in range(1,P)
if metrics[k]["scale"] <= tau_small * S_max]
return {
"cancel_spurious": cancel_spurious,
"small_spurious": small_spurious,
"metrics": metrics,
"S_max": S_max,
"C_raw": C_raw,
"Ct_raw": Ct_raw
}
# ---------- 9. 统一入口 ----------
@dataclass
class OPVFResult:
C_ml: np.ndarray
Ct_ml: np.ndarray
C_raw: np.ndarray
Ct_raw: np.ndarray
T: np.ndarray
: np.ndarray
: np.ndarray
exps: List[Tuple[int]]
poles: List[complex]
spurious: Dict
def opvf_from_H(H_list: List[np.ndarray],
freqs_hz: np.ndarray,
param_matrix: np.ndarray,
total_degree: int,
poles: List[complex],
do_t1: bool = True) -> OPVFResult:
"""
H_list: 长度 Ns; 每项 shape (Nf,)
freqs_hz: (Nf,)
param_matrix: (Ns,d) 归一化前参数值 (内部不自动标准化以保持可控)
poles: 初始稳定极点列表
"""
# 频率基
Phi_ml = muntz_laguerre_basis(freqs_hz, poles) # (Nf,P)
G_raw = raw_partial_fraction_basis(freqs_hz, poles) # (Nf,P)
# 简单权 (ω 权): w = Δf (因 (1/2π)∫ φφ* dω => ∑ Δf)
w = _trap_weights(freqs_hz)
T = compute_transform_matrix(Phi_ml, G_raw, w)
# 参数基
, , exps = generate_param_basis(param_matrix, total_degree)
# t=0
C_ml = fit_t0(H_list, Phi_ml, )
if do_t1:
C_ml, Ct_ml = fit_t1_SK(H_list, Phi_ml, , C_ml, max_iter=1)
else:
# D=1
P,V = C_ml.shape
Ct_ml = np.zeros((P,V), dtype=complex)
Ct_ml[0,0]=1.0
# 假极点检测
spurious = detect_spurious_poles(C_ml, Ct_ml, T, )
return OPVFResult(
C_ml=C_ml,
Ct_ml=Ct_ml,
C_raw=spurious["C_raw"],
Ct_raw=spurious["Ct_raw"],
T=T,
=,
=,
exps=exps,
poles=poles,
spurious=spurious
)
# ---------- 辅助: 梯形权 ----------
def _trap_weights(f: np.ndarray):
if len(f)==1: return np.ones(1)
df = np.diff(f)
w = np.zeros_like(f)
w[0]=0.5*df[0]; w[-1]=0.5*df[-1]
if len(f)>2:
w[1:-1]=0.5*(df[:-1]+df[1:])
return w
# ---------- 简单测试占位 ----------
if __name__ == "__main__":
# 虚构数据: 2 个参数样本 (Ns=2), 频率 200 点
freqs = np.linspace(1e8, 5e9, 200)
# 真正模型 (示例): H = Σ_k R_k/(s - p_k)
true_poles = [-0.5e3 + 1.2e9j, -0.5e3 - 1.2e9j]
s = 1j*2*np.pi*freqs
def synth(Rs):
return Rs[0]/(s - true_poles[0]) + Rs[1]/(s - true_poles[1])
H_list = [synth([0.8+0.1j, 1.2-0.2j]), synth([0.9+0.05j, 1.1+0.1j])]
params = np.array([[0.0],[1.0]]) # 1 维参数
# 给一个冗余极点集合 (含真实 + 额外)
start_poles = generate_starting_poles(n_pairs=10, beta_min=5e8, beta_max=2.0e9, alpha_scale=0.000001)
res = opvf_from_H(H_list, freqs, params, total_degree=1, poles=start_poles)
print("C_ml shape:", res.C_ml.shape)
print("假极点(cancel):", res.spurious["cancel_spurious"])
print("假极点(small):", res.spurious["small_spurious"])

Binary file not shown.

Before

Width:  |  Height:  |  Size: 48 KiB

View File

@@ -8,6 +8,7 @@ from scipy.linalg import null_space
from plotly.subplots import make_subplots
import plotly.graph_objs
from core.freqency import auto_select
from scipy.linalg import block_diag
network = rf.Network("/tmp/paramer/simulation/3500/3500.s2p")

View File

@@ -1,583 +0,0 @@
"""
Parametric (multivariate) orthonormal vector fitting for geometry + frequency.
根据论文: Robust Parametric Macromodeling Using Multivariate Orthonormal Vector Fitting
核心思想 (简化实现版本):
1. 在频域上使用一组全局极点 a_k, 通过所有参数样本共享。
2. 对每个几何样本 s (由 (W,L) 给出) 与端口对 (i,j) 构建线性最小二乘问题, 固定极点估计该样本的残值 r_{k,s}, 以及常数/斜率项 d_s, e_s。
3. 将随几何参数变化的残值/常数/斜率在一个离散正交 (QR) 的多元多项式基 Φ_q(W,L) 上展开: r_k(W,L) ≈ Σ_q c_{k,q} Φ_q(W,L)。
4. 评价阶段: 先用 (W,L) 计算 Φ(W,L), 再恢复 r_k,d,e, 组合成有理函数 Σ_k r_k/(p-a_k)+d+p e。
该文件实现一个最小可用版本, 方便后续扩展(极点迭代 / 被动性约束 / 模型压缩等)。
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Tuple, Dict, Any, Optional
import numpy as np
from skrf import Network
import skrf as rf
from models.capa import Capa
from models.basic import ModelBasicDatasetUnit
import warnings
@dataclass
class ParametricVFConfig:
# 有理函数项数量 (K 个极点)
n_poles: int = 8
# 参数多项式最大总次数 (a+b <= degree)
param_total_degree: int = 3
# 频率单位: Hz (Network 中 f 默认 Hz)
damping: float = 0.02 # 极点实部相对于 2π f_max 的比例, 保证稳定
# 最大保留频率点 (特征频率抽样); None 表示不裁剪
max_freq_points: Optional[int] = None
# 频率选择方法: 'curvature' 或 'uniform'
freq_selection_method: str = 'curvature'
# 曲率阈值(可选), 仅 method='curvature' 时生效
curvature_threshold: Optional[float] = None
# VF 极点迭代次数 (0 表示不迭代)
vf_iterations: int = 0
# 极点迭代时是否打印警告
verbose: bool = True
# 未来可添加: 加权, 正则, 被动性投影等
def generate_monomial_exponents(dim: int, total_degree: int) -> List[Tuple[int, ...]]:
"""生成所有满足指数和 <= total_degree 的多元单项式指数。
对于 2 维 (W,L) 返回 (a,b) 列表。
"""
exps: List[Tuple[int, ...]] = []
def rec(prefix: Tuple[int, ...], remaining_dim: int, max_sum: int):
if remaining_dim == 1:
for i in range(max_sum + 1):
exps.append(prefix + (i,))
return
for i in range(max_sum + 1):
rec(prefix + (i,), remaining_dim - 1, max_sum - i)
rec(tuple(), dim, total_degree)
return exps
class OrthonormalParamBasis:
"""离散点集上 (W,L) 的多项式基 QR 正交化封装。"""
def __init__(self, W: np.ndarray, L: np.ndarray, total_degree: int):
assert W.shape == L.shape
self.W_mean = W.mean()
self.W_std = W.std() or 1.0
self.L_mean = L.mean()
self.L_std = L.std() or 1.0
self.exps = generate_monomial_exponents(2, total_degree)
# 构建单项式矩阵 M (Ns x M)
M = []
Wn = (W - self.W_mean) / self.W_std
Ln = (L - self.L_mean) / self.L_std
for a, b in self.exps:
M.append((Wn ** a) * (Ln ** b))
M = np.stack(M, axis=1) # (Ns, M)
# QR 分解获得离散正交基 Q, R 上三角
Q, R = np.linalg.qr(M)
self.Q = Q # (Ns, Q)
self.R = R # (Q, Q)
self.R_inv = np.linalg.inv(R)
@property
def n_basis(self) -> int:
return self.Q.shape[1]
def phi_matrix(self) -> np.ndarray:
return self.Q # 对应样本点的基函数值
def eval(self, W: float, L: float) -> np.ndarray:
Wn = (W - self.W_mean) / self.W_std
Ln = (L - self.L_mean) / self.L_std
monomials = []
for a, b in self.exps:
monomials.append((Wn ** a) * (Ln ** b))
m = np.asarray(monomials) # (M,)
# M = Q R => 对新点 m = phi R => phi = m R^{-1}
phi = m @ self.R_inv
return phi # (Q,)
class ParametricVectorFitting:
"""多参数 (W,L) 二端口/多端口 S 参数的简化 Parametric VF。
模型形式 (单个端口对):
H(p; μ) ≈ Σ_{k=1..K} r_k(μ)/(p - a_k) + d(μ) + p e(μ)
其中 μ = (W,L)。r_k, d, e 在参数域被正交基展开。
"""
def __init__(self, config: ParametricVFConfig):
self.cfg = config
self.frequencies: np.ndarray | None = None # (Nf,)
self.poles: np.ndarray | None = None # (K,)
self.basis: OrthonormalParamBasis | None = None
# 残值系数: (nports, nports, K, Q)
self.residue_coeffs: np.ndarray | None = None
# d,e 系数: (nports, nports, Q)
self.d_coeffs: np.ndarray | None = None
self.e_coeffs: np.ndarray | None = None
self.meta: Dict[str, Any] = {}
def _init_global_poles(self, freqs: np.ndarray) -> np.ndarray:
f_min, f_max = freqs.min(), freqs.max()
# 使用对数均匀分布 (排除 0) + 负实部保证稳定
f_samples = np.logspace(np.log10(max(f_min, freqs[1] if len(freqs) > 1 else f_min*1.1)), np.log10(f_max), self.cfg.n_poles)
# 极点: -σ + j 2π f_k
sigma = self.cfg.damping * 2 * np.pi * f_max
poles = -sigma + 1j * 2 * np.pi * f_samples
return poles
# ----------------- 内部工具函数 (前置以便类型检查) -----------------
def _build_frequency_matrix(self, p_vec: np.ndarray, poles: np.ndarray) -> np.ndarray:
K = len(poles)
F_base = np.zeros((len(p_vec), K + 2), dtype=complex)
for k, a_k in enumerate(poles):
F_base[:, k] = 1.0 / (p_vec - a_k)
F_base[:, K] = 1.0
F_base[:, K + 1] = p_vec
return F_base
def _solve_residues_given_poles(self, aligned_S: List[np.ndarray], p_vec: np.ndarray):
assert self.poles is not None, "Poles not initialized"
nports = aligned_S[0].shape[1]
Ns = len(aligned_S)
K = len(self.poles)
residues_samples = np.zeros((nports, nports, K, Ns), dtype=complex)
d_samples = np.zeros((nports, nports, Ns), dtype=complex)
e_samples = np.zeros((nports, nports, Ns), dtype=complex)
F_base = self._build_frequency_matrix(p_vec, self.poles)
for s_idx, Sdata in enumerate(aligned_S):
for i in range(nports):
for j in range(nports):
y = Sdata[:, i, j]
x, *_ = np.linalg.lstsq(F_base, y, rcond=None)
residues_samples[i, j, :, s_idx] = x[:K]
d_samples[i, j, s_idx] = x[K]
e_samples[i, j, s_idx] = x[K+1]
return residues_samples, d_samples, e_samples
def _select_characteristic_freqs(self, freqs: np.ndarray, S_list: List[np.ndarray], max_points: int,
method: str = 'curvature', threshold: Optional[float] = None) -> np.ndarray:
Nf = len(freqs)
if max_points >= Nf:
return np.arange(Nf)
if method == 'uniform':
idx = np.linspace(0, Nf-1, max_points, dtype=int)
return np.unique(idx)
mags = []
for S in S_list:
mags.append(np.mean(np.abs(S), axis=(1,2))) # (Nf,)
agg = np.mean(np.stack(mags, axis=1), axis=1) # (Nf,)
curv = np.zeros_like(agg)
curv[1:-1] = np.abs(agg[2:] - 2*agg[1:-1] + agg[:-2])
base = {0, Nf-1}
if threshold is not None:
cand = np.where(curv >= threshold)[0]
else:
order = np.argsort(-curv)
cand = order
selected = list(base)
for idx in cand:
if idx in base:
continue
selected.append(idx)
if len(selected) >= max_points:
break
return np.array(sorted(selected))
def _vf_iterate_poles(self, aligned_S: List[np.ndarray], p_vec: np.ndarray,
residues_samples: np.ndarray, d_samples: np.ndarray, e_samples: np.ndarray) -> np.ndarray:
assert self.poles is not None, "Poles not initialized"
nports = residues_samples.shape[0]
K = len(self.poles)
Ns = len(aligned_S)
Nf = len(p_vec)
n_pairs = nports * nports
n_r = n_pairs * K
n_c = K
n_d = n_pairs
n_e = n_pairs
total_unknowns = n_r + n_c + n_d + n_e
rows = Ns * Nf * n_pairs
A = np.zeros((rows, total_unknowns), dtype=complex)
b = np.zeros(rows, dtype=complex)
def pair_index(i,j):
return i*nports + j
row = 0
for s_idx, Sdata in enumerate(aligned_S):
for f_idx, p in enumerate(p_vec):
denom = p - self.poles
basis_freq = 1.0/denom
for i in range(nports):
for j in range(nports):
H = Sdata[f_idx, i, j]
b[row] = H
pair = pair_index(i,j)
r_offset = pair*K
A[row, r_offset:r_offset+K] = basis_freq
A[row, n_r:n_r+K] = -H * basis_freq
A[row, n_r + n_c + pair] = 1.0
A[row, n_r + n_c + n_d + pair] = p
row += 1
x, *_ = np.linalg.lstsq(A, b, rcond=None)
c_k = x[n_r:n_r+n_c]
poly_total = np.poly(self.poles)
acc = poly_total.copy()
for k in range(K):
others = [self.poles[m] for m in range(K) if m != k]
term = c_k[k] * np.poly(others)
acc = np.polyadd(acc, term)
new_poles = np.roots(acc)
return new_poles
def fit(self, dataset: List[ModelBasicDatasetUnit], network_loader=Capa.network):
# 载入所有 Network
networks: List[Network] = []
W_list, L_list = [], []
for unit in dataset:
net = network_loader(unit.result_dir, unit.id)
networks.append(net)
W_list.append(unit.parameters["W"])
L_list.append(unit.parameters["L"])
assert len(networks) > 0, "Empty dataset"
# 处理不同频率点数量: 取公共重叠频段并插值到统一网格
freqs_list = [n.f for n in networks]
f_start = max(f[0] for f in freqs_list)
f_stop = min(f[-1] for f in freqs_list)
if f_stop <= f_start:
raise ValueError("No overlapping frequency range among networks")
# 选择基准网格: 使用第一个网络在重叠区间内的频点, 若其它网络缺该点则插值
base_freqs = networks[0].f
mask = (base_freqs >= f_start) & (base_freqs <= f_stop)
freqs = base_freqs[mask]
# 如果其它网络点数差异较大, 可选: 统一稠密度 (这里只在差异>5%时使用最小长度网格)
for f in freqs_list[1:]:
# 判断差异比例
if abs(len(f) - len(freqs)) / max(len(freqs),1) > 0.2:
# 使用所有网络在重叠区间内最少点数, 构造等间距网格
n_common = min(len(ff[(ff>=f_start)&(ff<=f_stop)]) for ff in freqs_list)
freqs = np.linspace(f_start, f_stop, n_common)
warnings.warn("Frequency grids differ significantly; using uniform linspace for interpolation.")
break
# 对每个网络插值到 freqs (实部/虚部分开线性插值)
aligned_S = [] # List[(Nf, nports, nports)]
for net in networks:
f_src = net.f
# 获取在目标范围内原始数据 (若后面改用插值网格不同于原点, 仍需完整插值)
S_src = net.s # (Nf_src, nports, nports)
nports = net.nports
if not np.array_equal(f_src, freqs):
# 线性插值 (对每个端口对实部与虚部)
S_interp = np.zeros((len(freqs), nports, nports), dtype=complex)
for i in range(nports):
for j in range(nports):
y = S_src[:, i, j]
# numpy.interp 需要单调递增; 为类型检查器显式转为 np.ndarray
y_arr = np.asarray(y)
y_real = np.real(y_arr)
y_imag = np.imag(y_arr)
real_part = np.interp(freqs, f_src, y_real)
imag_part = np.interp(freqs, f_src, y_imag)
S_interp[:, i, j] = real_part + 1j * imag_part
aligned_S.append(S_interp)
else:
aligned_S.append(S_src[mask] if len(S_src)==len(base_freqs) and mask.sum()!=len(base_freqs) else S_src)
self.frequencies = freqs
nports = networks[0].nports
Ns = len(networks)
# 参数正交基
self.basis = OrthonormalParamBasis(np.array(W_list), np.array(L_list), self.cfg.param_total_degree)
Phi = self.basis.phi_matrix() # (Ns, Q)
Qn = Phi.shape[1]
# (可选) 特征频率子采样
if self.cfg.max_freq_points is not None and len(freqs) > self.cfg.max_freq_points:
sel_idx = self._select_characteristic_freqs(freqs, aligned_S, self.cfg.max_freq_points,
method=self.cfg.freq_selection_method,
threshold=self.cfg.curvature_threshold)
freqs = freqs[sel_idx]
aligned_S = [S[sel_idx, :, :] for S in aligned_S]
if self.cfg.verbose:
warnings.warn(f"Frequency reduced to {len(freqs)} points (method={self.cfg.freq_selection_method}).")
# 初始化极点
self.poles = self._init_global_poles(freqs)
p_vec = 1j * 2 * np.pi * freqs
# 首次求解残值
residues_samples, d_samples, e_samples = self._solve_residues_given_poles(aligned_S, p_vec)
# 极点迭代 (简化共享极点 VF)
if self.cfg.vf_iterations > 0:
for it in range(self.cfg.vf_iterations):
new_poles = self._vf_iterate_poles(aligned_S, p_vec, residues_samples, d_samples, e_samples)
# 反射不稳定极点
new_poles = np.array([p if p.real < 0 else complex(-p.real, p.imag) for p in new_poles])
# 去重: 近似重复 (距离 < 1e-3 * |p|) 过滤
filtered = []
for p in new_poles:
if all(abs(p - q) > 1e-3*max(1.0, abs(p)) for q in filtered):
filtered.append(p)
if self.cfg.verbose:
print(f"[VF-Iter {it+1}] poles -> {len(filtered)} kept")
self.poles = np.array(filtered, dtype=complex)
p_vec = 1j * 2 * np.pi * freqs
residues_samples, d_samples, e_samples = self._solve_residues_given_poles(aligned_S, p_vec)
# 在参数基上再拟合: 由于 Phi 正交, 系数 = Phi^H * values
PhiH = Phi.conj().T # (Q, Ns)
K = self.poles.shape[0]
self.residue_coeffs = np.zeros((nports, nports, K, Qn), dtype=complex)
self.d_coeffs = np.zeros((nports, nports, Qn), dtype=complex)
self.e_coeffs = np.zeros((nports, nports, Qn), dtype=complex)
for i in range(nports):
for j in range(nports):
for k in range(K):
vk = residues_samples[i, j, k, :] # (Ns,)
self.residue_coeffs[i, j, k, :] = PhiH @ vk
self.d_coeffs[i, j, :] = PhiH @ d_samples[i, j, :]
self.e_coeffs[i, j, :] = PhiH @ e_samples[i, j, :]
# 简单误差评估 (训练点重建 RMS)
F_base = self._build_frequency_matrix(p_vec, self.poles)
train_rms = self._compute_training_rms(residues_samples, d_samples, e_samples, F_base, aligned_S)
self.meta['train_rms'] = train_rms
self.meta['nports'] = nports
self.meta['Ns'] = Ns
self.meta['Q'] = Qn
return self
def _compute_training_rms(self, residues_samples, d_samples, e_samples, F_base, original_S_list):
"""计算每个样本的平均端口对 RMS 误差。"""
nports = residues_samples.shape[0]
K = residues_samples.shape[2]
Ns = residues_samples.shape[3]
Nf = F_base.shape[0]
rms = []
for s in range(Ns):
err_sq_sum = 0.0
count = 0
S_ref = original_S_list[s]
for i in range(nports):
for j in range(nports):
x = np.concatenate([
residues_samples[i, j, :, s],
[d_samples[i, j, s], e_samples[i, j, s]]
]) # (K+2,)
y_hat = F_base @ x # (Nf,)
y_ref = S_ref[:, i, j]
err = y_ref - y_hat
err_sq_sum += np.mean(np.abs(err)**2)
count += 1
rms.append(np.sqrt(err_sq_sum / max(count,1)))
return rms
def evaluate(self, W: float, L: float, freqs: np.ndarray | None = None) -> np.ndarray:
"""返回插值后的 S 参数 (Nf, nports, nports)。"""
assert self.frequencies is not None and self.poles is not None
assert self.basis is not None
assert self.residue_coeffs is not None and self.d_coeffs is not None and self.e_coeffs is not None, "Model not fitted"
if freqs is None:
freqs = self.frequencies
# 若请求频率网格不同, 需要重建频率矩阵
p_vec = 1j * 2 * np.pi * freqs
K = len(self.poles)
nports = self.residue_coeffs.shape[0]
phi = self.basis.eval(W, L) # (Q,)
# 预分配
S_eval = np.zeros((len(freqs), nports, nports), dtype=complex)
denom_cache = np.zeros((len(freqs), K), dtype=complex)
for k, a_k in enumerate(self.poles):
denom_cache[:, k] = 1.0 / (p_vec - a_k)
for i in range(nports):
for j in range(nports):
# 恢复参数依赖的残值/常数/斜率
r_k = (self.residue_coeffs[i, j, :, :] @ phi) # (K,)
d = self.d_coeffs[i, j, :] @ phi
e = self.e_coeffs[i, j, :] @ phi
Hf = denom_cache @ r_k + d + p_vec * e # (Nf,)
S_eval[:, i, j] = Hf
return S_eval
def export_s2p(self, W: float, L: float, filepath: str, freqs: np.ndarray | None = None, z0: float | complex = 50) -> str:
"""在给定几何参数 (W,L) 下计算 S 并写入 Touchstone (.sNp) 文件。
参数:
W, L: 几何参数
filepath: 目标文件完整路径, 可包含或不包含 .s2p/.sNp 后缀
freqs: 可选自定义频率数组 (需在训练频率范围内); 默认使用训练频率
z0: 端口参考阻抗 (标量或与端口数相同长度可扩展)
返回:
实际写入的文件路径
"""
assert self.frequencies is not None, "Model not fitted"
S_eval = self.evaluate(W, L, freqs)
if freqs is None:
freqs = self.frequencies
nports = S_eval.shape[1]
freqs_arr = np.asarray(freqs)
# 构建 Network 对象
ntw = rf.Network(f=freqs_arr, s=S_eval, z0=z0)
# 解析输出路径
import os
base_dir = os.path.dirname(filepath) or '.'
os.makedirs(base_dir, exist_ok=True)
base_name = os.path.basename(filepath)
# 去掉可能已有的扩展名, 由 skrf 自动加 .sNp
if '.' in base_name:
base_name = '.'.join(base_name.split('.')[:-1]) or base_name
# 写文件
ntw.write_touchstone(base_name, dir=base_dir)
# 构造最终文件名 (.sNp)
final_path = os.path.join(base_dir, f"{base_name}.s{nports}p")
return final_path
def to_dict(self) -> Dict[str, Any]:
return {
'config': self.cfg.__dict__,
'frequencies': self.frequencies.tolist() if self.frequencies is not None else None,
'poles': self.poles.tolist() if self.poles is not None else None,
'basis': {
'W_mean': getattr(self.basis, 'W_mean', None),
'W_std': getattr(self.basis, 'W_std', None),
'L_mean': getattr(self.basis, 'L_mean', None),
'L_std': getattr(self.basis, 'L_std', None),
'exps': getattr(self.basis, 'exps', None),
'R': (lambda _R: _R.tolist() if _R is not None else None)(getattr(self.basis, 'R', None)),
},
'residue_coeffs': self.residue_coeffs.tolist() if self.residue_coeffs is not None else None,
'd_coeffs': self.d_coeffs.tolist() if self.d_coeffs is not None else None,
'e_coeffs': self.e_coeffs.tolist() if self.e_coeffs is not None else None,
'meta': self.meta,
}
@staticmethod
def from_dict(data: Dict[str, Any]) -> 'ParametricVectorFitting':
cfg = ParametricVFConfig(**data['config'])
obj = ParametricVectorFitting(cfg)
obj.frequencies = np.array(data['frequencies']) if data['frequencies'] is not None else None
obj.poles = np.array(data['poles']) if data['poles'] is not None else None
# 重建 basis (仅用于 eval); 重新构造 R_inv
if data.get('basis') and data['basis']['R'] is not None:
basis_stub = OrthonormalParamBasis(np.array([0.0, 1.0]), np.array([0.0, 1.0]), 1) # 占位
basis_stub.W_mean = data['basis']['W_mean']
basis_stub.W_std = data['basis']['W_std']
basis_stub.L_mean = data['basis']['L_mean']
basis_stub.L_std = data['basis']['L_std']
basis_stub.exps = [tuple(e) for e in data['basis']['exps']]
R = np.array(data['basis']['R'])
basis_stub.R = R
basis_stub.R_inv = np.linalg.inv(R)
# Q 不需要 (仅用于训练点), 设为 None
basis_stub.Q = None # type: ignore
obj.basis = basis_stub
if data.get('residue_coeffs') is not None:
obj.residue_coeffs = np.array(data['residue_coeffs'])
if data.get('d_coeffs') is not None:
obj.d_coeffs = np.array(data['d_coeffs'])
if data.get('e_coeffs') is not None:
obj.e_coeffs = np.array(data['e_coeffs'])
obj.meta = data.get('meta', {})
return obj
# 划分
def split_dataset(dataset: List[ModelBasicDatasetUnit], train_ratio: float = 0.7, shuffle: bool = True, seed: int = 0) -> Tuple[List[ModelBasicDatasetUnit], List[ModelBasicDatasetUnit]]:
"""划分数据集 70% 训练 / 30% 测试 (默认可改)。"""
ds = list(dataset)
if shuffle:
rng = np.random.default_rng(seed)
rng.shuffle(ds)
n_train = int(len(ds) * train_ratio)
return ds[:n_train], ds[n_train:]
def _interp_s_to_freqs(S: np.ndarray, f_src: np.ndarray, f_tgt: np.ndarray) -> np.ndarray:
"""将 S(f_src) 线性插值到 f_tgt (复数分离实虚部)。"""
if np.array_equal(f_src, f_tgt):
return S
nports = S.shape[1]
S_new = np.zeros((len(f_tgt), nports, nports), dtype=complex)
for i in range(nports):
for j in range(nports):
y = S[:, i, j]
real_part = np.interp(f_tgt, f_src, np.real(y))
imag_part = np.interp(f_tgt, f_src, np.imag(y))
S_new[:, i, j] = real_part + 1j * imag_part
return S_new
def evaluate_dataset(model: ParametricVectorFitting,
subset: List[ModelBasicDatasetUnit],
network_loader=Capa.network) -> Dict[str, Any]:
"""对一个子集评估拟合质量,输出每样本与整体统计."""
assert model.frequencies is not None
freqs = model.frequencies
results = []
mag_err_db_all = []
rms_all = []
for unit in subset:
net = network_loader(unit.result_dir, unit.id)
S_ref = _interp_s_to_freqs(net.s, net.f, freqs) # (Nf, nports, nports)
S_pred = model.evaluate(unit.parameters["W"], unit.parameters["L"], freqs)
err = S_pred - S_ref
rms = float(np.sqrt(np.mean(np.abs(err)**2)))
# 幅度 dB 误差
mag_ref_db = 20*np.log10(np.clip(np.abs(S_ref), 1e-15, None))
mag_pred_db = 20*np.log10(np.clip(np.abs(S_pred), 1e-15, None))
mag_err_db = float(np.mean(np.abs(mag_pred_db - mag_ref_db)))
rms_all.append(rms)
mag_err_db_all.append(mag_err_db)
results.append({
'id': unit.id,
'W': unit.parameters['W'],
'L': unit.parameters['L'],
'rms': rms,
'mag_err_db': mag_err_db
})
summary = {
'count': len(subset),
'rms_mean': float(np.mean(rms_all)) if rms_all else None,
'rms_max': float(np.max(rms_all)) if rms_all else None,
'mag_err_db_mean': float(np.mean(mag_err_db_all)) if mag_err_db_all else None,
'mag_err_db_max': float(np.max(mag_err_db_all)) if mag_err_db_all else None,
'details': results
}
return summary
# 简要使用示例 (非执行):
if __name__ == "__main__":
capa = Capa()
capa.sweep()
print("sweep finished")
capa.export('capa_results.json')
capa.clear()
capa.load('capa_results.json')
dataset = capa.results
# 70% 训练 / 30% 测试
train_set, test_set = split_dataset(dataset, train_ratio=0.7, shuffle=True, seed=42)
print(f"Train: {len(train_set)} Test: {len(test_set)}")
cfg = ParametricVFConfig(n_poles=10, param_total_degree=3, vf_iterations=2,
max_freq_points=50)
pvf = ParametricVectorFitting(cfg).fit(train_set)
train_metrics = evaluate_dataset(pvf, train_set)
test_metrics = evaluate_dataset(pvf, test_set)
pvf.meta['train_eval'] = train_metrics
pvf.meta['test_eval'] = test_metrics
def _print_metrics(title: str, m: Dict[str, Any]):
print(f"[{title}] samples={m['count']}"
f" RMS_mean={m['rms_mean']:.4e} RMS_max={m['rms_max']:.4e}"
f" |Δ|dB_mean={m['mag_err_db_mean']:.3f}dB |Δ|dB_max={m['mag_err_db_max']:.3f}dB")
_print_metrics("TRAIN", train_metrics)
_print_metrics("TEST", test_metrics)
# 示例:导出一个测试点的拟合结果为 s2p
if test_set:
sample = test_set[0]
out_path = pvf.export_s2p(sample.parameters["W"], sample.parameters["L"], filepath='outputs/pvf_test_sample')
print("Exported:", out_path)