Files
ovf/core/robust.py
2025-09-15 11:41:55 -04:00

191 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from models.basic import ModelBasic
from typing import List,Literal,Dict
import numpy as np
import random
from pydantic import BaseModel
from skrf import Network
from models.basic import ModelBasicDatasetUnit
class RobustParametricConfig(BaseModel):
n_poles: int
max_iter: int = 10
parameter_type: Literal["s","y","z"]
class RobustParametricModel:
def __init__(self,model:ModelBasic,config:RobustParametricConfig):
self.model = model
self.config = config
# 区分训练集和测试集
def _train_test_split(self,train_ratio:float=0.8,random_state:int=42):
random.seed(random_state)
dataset = self.model.results
random.shuffle(dataset)
train_size = int(len(dataset)*train_ratio)
self.train_set = dataset[:train_size]
self.test_set = dataset[train_size:]
return self.train_set, self.test_set
def first_iteration_step(self, datasets: List[ModelBasicDatasetUnit],
param_degree: int = 1):
'''
SK 迭代的第一步 (t=0) 对应论文公式 (3).
目标:
最小化 sum_{样本 n, 频点 f} | N^{(0)}(s_f, g_n) - H(s_f, g_n) |^2
在 t=0 阶段我们令 D^{(0)}(s,g)=1, 仅拟合分子:
N^{(0)}(s,g) = Σ_{p∈P} Σ_{v∈V} c_{p,v} * φ_freq_p(s) * φ_param_v(g)
因此是一个纯线性最小二乘:
H ≈ Σ_{p,v} c_{p,v} F[:,p] ⊗ G[n,v]
记:
F: (Nf, P) 频率正交(或原始)基列
G: (Ns, V) 参数多项式(含常数)基列
设计矩阵 A 大小 (Ns*Nf, P*V), A[(n*Nf + f), (p*V + v)] = F[f,p] * G[n,v]
输出:
self.first_iter_coeffs[(i,j)] = C_{p,v} (P × V) 每个端口对一套系数
self.freq_basis_F = F
self.param_basis_G = G
self.poles 初始极点 (供后续构造有理正交基 / SK 加权使用)
参数:
datasets: 训练用样本列表
param_degree: 参数多项式最大总次数 (默认 1 → 常数 + 线性)
注意:
这里使用简单频率基 [1, 1/(s-a_k)],未做 QR 正交化;
后续 SK 迭代 / 正交化可在第二步再进行。
'''
assert len(datasets) > 0, "空数据集"
# ---------------- 收集频率与端口信息 ----------------
Ns = len(datasets)
freqs = datasets[0].freqs # 频率需要对齐
Nf = freqs.shape[0]
nports = self.model.info.nports
# ---------------- 构造初始极点 (对数分布 + 阻尼) ----------------
def _init_poles(n_poles: int):
fmin, fmax = freqs[0], freqs[-1]
if n_poles <= 0:
return np.array([], dtype=complex)
# 避免 0 Hz用第二个点或微小偏移
start_f = max(fmin if fmin > 0 else freqs[1] if Nf > 1 else 1.0e-3, 1e-12)
f_samples = np.logspace(np.log10(start_f), np.log10(fmax), n_poles)
sigma = 0.02 * 2 * np.pi * fmax # 固定阻尼,可放入 config
return -sigma + 1j * 2 * np.pi * f_samples
self.poles = _init_poles(self.config.n_poles)
s_vec = 1j * 2 * np.pi * freqs # (Nf,)
# ---------------- 频率基 F ----------------
# 列0: 常数 1, 后续列: 1/(s - a_k)
P = 1 + len(self.poles)
F = np.zeros((Nf, P), dtype=complex)
F[:, 0] = 1.0
for k, a in enumerate(self.poles, start=1):
F[:, k] = 1.0 / (s_vec - a)
self.freq_basis_F = F # (Nf, P)
# ---------------- 参数基 G ----------------
# 取出参数向量 g = [param1, param2, ...]
# 假设 datasets[i].parameters 为 dict
param_keys = list(datasets[0].parameters.keys())
d = len(param_keys)
# 构造参数矩阵 (Ns,d)
Pmat = np.zeros((Ns, d), dtype=float)
for i, unit in enumerate(datasets):
for j, k in enumerate(param_keys):
Pmat[i, j] = float(unit.parameters[k])
# 标准化
mean = Pmat.mean(axis=0)
std = Pmat.std(axis=0) + 1e-15
Xn = (Pmat - mean) / std
# 生成多项式指数 (总次数 <= param_degree)
def _gen_param_exps(dim, D):
exps = []
def rec(cur, idx, rem):
if idx == dim:
exps.append(tuple(cur)); return
for t in range(rem + 1):
cur.append(t); rec(cur, idx + 1, rem - t); cur.pop()
rec([], 0, D)
return exps
exps = _gen_param_exps(d, param_degree) # V_exps
V = len(exps)
G = np.zeros((Ns, V), dtype=float)
for vidx, e in enumerate(exps):
val = np.ones(Ns)
for j, p in enumerate(e):
if p:
val *= Xn[:, j] ** p
G[:, vidx] = val
self.param_basis_G = G # (Ns, V)
self.param_basis_meta = {
"keys": param_keys,
"mean": mean.tolist(),
"std": std.tolist(),
"exponents": exps,
"degree": param_degree
}
# ---------------- 构造设计矩阵 A (Kronecker) ----------------
# A shape: (Ns*Nf, P*V)
# 利用广播A_block[n,f,p,v] = F[f,p] * G[n,v]
A4 = np.einsum('fp,nv->nfpv', F, G) # (Ns, Nf, P, V)
A = A4.reshape(Ns * Nf, P * V)
# ---------------- 采集目标 H 数据 ----------------
# 对每个端口对 (i,j) 分别解一个向量 c_{p,v}
# 选择参数类型
param_type = self.config.parameter_type.lower()
valid_types = {"s", "y", "z"}
if param_type not in valid_types:
raise ValueError(f"parameter_type 必须在 {valid_types}")
# 预先载入全部 Network (避免重复 IO)
# H_data[(i,j)] -> (Ns,Nf) 复数
H_data = {}
for i_port in range(nports):
for j_port in range(nports):
H_mat = np.zeros((Ns, Nf), dtype=complex)
for n, unit in enumerate(datasets):
net: Network = unit.network
if param_type == "s":
sij = net.s[:, i_port, j_port]
elif param_type == "y":
sij = net.y[:, i_port, j_port]
else:
sij = net.z[:, i_port, j_port]
# 插值或对齐假设已完成
H_mat[n, :] = sij
H_data[(i_port, j_port)] = H_mat
# ---------------- 最小二乘求系数 ----------------
self.first_iter_coeffs = {}
# 可选: 预计算 A^+ (伪逆) 如果数据不巨大
# pinv = np.linalg.pinv(A)
for key, Hmat in H_data.items():
b = Hmat.reshape(Ns * Nf) # (Ns*Nf,)
# 解 x (长度 P*V)
x, *_ = np.linalg.lstsq(A, b, rcond=None)
C = x.reshape(P, V)
self.first_iter_coeffs[key] = C
# ---------------- 存储便于后续 SK 迭代使用 ----------------
self.meta_first_iter = {
"A_shape": A.shape,
"num_samples": Ns,
"num_freqs": Nf,
"freq_basis_dim": P,
"param_basis_dim": V
}
if getattr(self.config, "verbose", True):
print(f"[t=0] 线性最小二乘完成: A={A.shape}, 频率基P={P}, 参数基V={V}, 端口对={nports*nports}")
return self.first_iter_coeffs