191 lines
7.4 KiB
Python
191 lines
7.4 KiB
Python
from models.basic import ModelBasic
|
||
from typing import List,Literal,Dict
|
||
import numpy as np
|
||
import random
|
||
from pydantic import BaseModel
|
||
from skrf import Network
|
||
from models.basic import ModelBasicDatasetUnit
|
||
|
||
class RobustParametricConfig(BaseModel):
|
||
n_poles: int
|
||
max_iter: int = 10
|
||
parameter_type: Literal["s","y","z"]
|
||
|
||
class RobustParametricModel:
|
||
def __init__(self,model:ModelBasic,config:RobustParametricConfig):
|
||
self.model = model
|
||
self.config = config
|
||
|
||
# 区分训练集和测试集
|
||
def _train_test_split(self,train_ratio:float=0.8,random_state:int=42):
|
||
random.seed(random_state)
|
||
dataset = self.model.results
|
||
random.shuffle(dataset)
|
||
train_size = int(len(dataset)*train_ratio)
|
||
self.train_set = dataset[:train_size]
|
||
self.test_set = dataset[train_size:]
|
||
return self.train_set, self.test_set
|
||
|
||
def first_iteration_step(self, datasets: List[ModelBasicDatasetUnit],
|
||
param_degree: int = 1):
|
||
'''
|
||
SK 迭代的第一步 (t=0) – 对应论文公式 (3).
|
||
|
||
目标:
|
||
最小化 sum_{样本 n, 频点 f} | N^{(0)}(s_f, g_n) - H(s_f, g_n) |^2
|
||
在 t=0 阶段我们令 D^{(0)}(s,g)=1, 仅拟合分子:
|
||
N^{(0)}(s,g) = Σ_{p∈P} Σ_{v∈V} c_{p,v} * φ_freq_p(s) * φ_param_v(g)
|
||
|
||
因此是一个纯线性最小二乘:
|
||
H ≈ Σ_{p,v} c_{p,v} F[:,p] ⊗ G[n,v]
|
||
|
||
记:
|
||
F: (Nf, P) 频率正交(或原始)基列
|
||
G: (Ns, V) 参数多项式(含常数)基列
|
||
设计矩阵 A 大小 (Ns*Nf, P*V), A[(n*Nf + f), (p*V + v)] = F[f,p] * G[n,v]
|
||
|
||
输出:
|
||
self.first_iter_coeffs[(i,j)] = C_{p,v} (P × V) 每个端口对一套系数
|
||
self.freq_basis_F = F
|
||
self.param_basis_G = G
|
||
self.poles 初始极点 (供后续构造有理正交基 / SK 加权使用)
|
||
|
||
参数:
|
||
datasets: 训练用样本列表
|
||
param_degree: 参数多项式最大总次数 (默认 1 → 常数 + 线性)
|
||
|
||
注意:
|
||
这里使用简单频率基 [1, 1/(s-a_k)],未做 QR 正交化;
|
||
后续 SK 迭代 / 正交化可在第二步再进行。
|
||
'''
|
||
assert len(datasets) > 0, "空数据集"
|
||
# ---------------- 收集频率与端口信息 ----------------
|
||
Ns = len(datasets)
|
||
freqs = datasets[0].freqs # 频率需要对齐
|
||
Nf = freqs.shape[0]
|
||
nports = self.model.info.nports
|
||
|
||
# ---------------- 构造初始极点 (对数分布 + 阻尼) ----------------
|
||
def _init_poles(n_poles: int):
|
||
fmin, fmax = freqs[0], freqs[-1]
|
||
if n_poles <= 0:
|
||
return np.array([], dtype=complex)
|
||
# 避免 0 Hz,用第二个点或微小偏移
|
||
start_f = max(fmin if fmin > 0 else freqs[1] if Nf > 1 else 1.0e-3, 1e-12)
|
||
f_samples = np.logspace(np.log10(start_f), np.log10(fmax), n_poles)
|
||
sigma = 0.02 * 2 * np.pi * fmax # 固定阻尼,可放入 config
|
||
return -sigma + 1j * 2 * np.pi * f_samples
|
||
|
||
self.poles = _init_poles(self.config.n_poles)
|
||
s_vec = 1j * 2 * np.pi * freqs # (Nf,)
|
||
|
||
# ---------------- 频率基 F ----------------
|
||
# 列0: 常数 1, 后续列: 1/(s - a_k)
|
||
P = 1 + len(self.poles)
|
||
F = np.zeros((Nf, P), dtype=complex)
|
||
F[:, 0] = 1.0
|
||
for k, a in enumerate(self.poles, start=1):
|
||
F[:, k] = 1.0 / (s_vec - a)
|
||
self.freq_basis_F = F # (Nf, P)
|
||
|
||
# ---------------- 参数基 G ----------------
|
||
# 取出参数向量 g = [param1, param2, ...]
|
||
# 假设 datasets[i].parameters 为 dict
|
||
param_keys = list(datasets[0].parameters.keys())
|
||
d = len(param_keys)
|
||
# 构造参数矩阵 (Ns,d)
|
||
Pmat = np.zeros((Ns, d), dtype=float)
|
||
for i, unit in enumerate(datasets):
|
||
for j, k in enumerate(param_keys):
|
||
Pmat[i, j] = float(unit.parameters[k])
|
||
# 标准化
|
||
mean = Pmat.mean(axis=0)
|
||
std = Pmat.std(axis=0) + 1e-15
|
||
Xn = (Pmat - mean) / std
|
||
|
||
# 生成多项式指数 (总次数 <= param_degree)
|
||
def _gen_param_exps(dim, D):
|
||
exps = []
|
||
def rec(cur, idx, rem):
|
||
if idx == dim:
|
||
exps.append(tuple(cur)); return
|
||
for t in range(rem + 1):
|
||
cur.append(t); rec(cur, idx + 1, rem - t); cur.pop()
|
||
rec([], 0, D)
|
||
return exps
|
||
|
||
exps = _gen_param_exps(d, param_degree) # V_exps
|
||
V = len(exps)
|
||
G = np.zeros((Ns, V), dtype=float)
|
||
for vidx, e in enumerate(exps):
|
||
val = np.ones(Ns)
|
||
for j, p in enumerate(e):
|
||
if p:
|
||
val *= Xn[:, j] ** p
|
||
G[:, vidx] = val
|
||
self.param_basis_G = G # (Ns, V)
|
||
self.param_basis_meta = {
|
||
"keys": param_keys,
|
||
"mean": mean.tolist(),
|
||
"std": std.tolist(),
|
||
"exponents": exps,
|
||
"degree": param_degree
|
||
}
|
||
|
||
# ---------------- 构造设计矩阵 A (Kronecker) ----------------
|
||
# A shape: (Ns*Nf, P*V)
|
||
# 利用广播:A_block[n,f,p,v] = F[f,p] * G[n,v]
|
||
A4 = np.einsum('fp,nv->nfpv', F, G) # (Ns, Nf, P, V)
|
||
A = A4.reshape(Ns * Nf, P * V)
|
||
|
||
# ---------------- 采集目标 H 数据 ----------------
|
||
# 对每个端口对 (i,j) 分别解一个向量 c_{p,v}
|
||
# 选择参数类型
|
||
param_type = self.config.parameter_type.lower()
|
||
valid_types = {"s", "y", "z"}
|
||
if param_type not in valid_types:
|
||
raise ValueError(f"parameter_type 必须在 {valid_types}")
|
||
|
||
# 预先载入全部 Network (避免重复 IO)
|
||
# H_data[(i,j)] -> (Ns,Nf) 复数
|
||
H_data = {}
|
||
for i_port in range(nports):
|
||
for j_port in range(nports):
|
||
H_mat = np.zeros((Ns, Nf), dtype=complex)
|
||
for n, unit in enumerate(datasets):
|
||
net: Network = unit.network
|
||
if param_type == "s":
|
||
sij = net.s[:, i_port, j_port]
|
||
elif param_type == "y":
|
||
sij = net.y[:, i_port, j_port]
|
||
else:
|
||
sij = net.z[:, i_port, j_port]
|
||
# 插值或对齐假设已完成
|
||
H_mat[n, :] = sij
|
||
H_data[(i_port, j_port)] = H_mat
|
||
|
||
# ---------------- 最小二乘求系数 ----------------
|
||
self.first_iter_coeffs = {}
|
||
# 可选: 预计算 A^+ (伪逆) 如果数据不巨大
|
||
# pinv = np.linalg.pinv(A)
|
||
for key, Hmat in H_data.items():
|
||
b = Hmat.reshape(Ns * Nf) # (Ns*Nf,)
|
||
# 解 x (长度 P*V)
|
||
x, *_ = np.linalg.lstsq(A, b, rcond=None)
|
||
C = x.reshape(P, V)
|
||
self.first_iter_coeffs[key] = C
|
||
|
||
# ---------------- 存储便于后续 SK 迭代使用 ----------------
|
||
self.meta_first_iter = {
|
||
"A_shape": A.shape,
|
||
"num_samples": Ns,
|
||
"num_freqs": Nf,
|
||
"freq_basis_dim": P,
|
||
"param_basis_dim": V
|
||
}
|
||
if getattr(self.config, "verbose", True):
|
||
print(f"[t=0] 线性最小二乘完成: A={A.shape}, 频率基P={P}, 参数基V={V}, 端口对={nports*nports}")
|
||
|
||
return self.first_iter_coeffs
|
||
|
||
|