feat: Add GVF module

This commit is contained in:
mayge
2025-09-30 10:37:58 -04:00
parent de7d1fb473
commit 4e25538a4a
9 changed files with 452 additions and 5 deletions

123
ovf/core/GVFManager.py Normal file
View File

@@ -0,0 +1,123 @@
from ovf.core.VFManager import VFManager
from dataclasses import dataclass
from typing import List,Dict,Literal
import numpy as np
import json
import skrf as rf
import os
from ovf.core.sample import auto_select_multple_ports
from ovf.core.basis.MultiPortOrthonormalBasis import MultiPortOrthonormalBasis
from ovf.core.geometry.plot_poles import plot_poles_in_3d
from ovf.schemas.geometry.poles import PolesPlot3dUnit,PolesPlot3dDataUnit
import pickle
@dataclass
class GVFDataUnit:
geometries: Dict[str,float]
vf_manager:VFManager
enabled:bool=True
class GVFManager:
def __init__(self):
self.parameter_type: Literal["s","y","z"] = "s"
self.datasets: List[GVFDataUnit] = []
def save(self,filepath:str):
os.makedirs(os.path.dirname(filepath),exist_ok=True)
with open(filepath,"wb") as f:
pickle.dump(self,f)
@classmethod
def load(cls,filepath:str):
with open(filepath,"rb") as f:
obj = pickle.load(f)
assert isinstance(obj,GVFManager),"The loaded object is not a GVFManager instance."
return obj
def load_from_datasets(self,jsonfile:str,npoles_cplx:int=2,max_iterations:int=5,max_points:int|None=20,basis:type=MultiPortOrthonormalBasis,parameter_type:Literal["s","y","z"]="s"):
self.parameter_type = parameter_type
with open(jsonfile,"r") as f:
datas = json.load(f)
for i in range(len(datas)):
print(f"Loading dataset {i+1}/{len(datas)+1}")
network = rf.Network(f"{os.path.dirname(jsonfile)}/{datas[i]['result_dir']}")
ports = network.nports
K = max_iterations
full_freqences = network.f
if parameter_type == "s":
sampled_points = network.s.reshape(-1,ports,ports)
elif parameter_type == "y":
sampled_points = network.y.reshape(-1,ports,ports)
elif parameter_type == "z":
sampled_points = network.z.reshape(-1,ports,ports)
if max_points:
H,freqs = auto_select_multple_ports(sampled_points,full_freqences,max_points=max_points)
else:
H,freqs = sampled_points,full_freqences
vf = VFManager(npoles_cplx=npoles_cplx,freqs=freqs,H=H,model=basis,iterations=K,verbose=False)
vf.fit()
geometries = datas[i]["parameters"]
self.datasets.append(GVFDataUnit(geometries=geometries, vf_manager=vf))
def plot_poles(self,save_path,degree:int,geometry_1:str,geometry_2:str):
unit = PolesPlot3dUnit(
title="Poles Distribution",
datas=[
PolesPlot3dDataUnit(
poles=ds.vf_manager.poles,
geometry_1=ds.geometries[geometry_1],
geometry_2=ds.geometries[geometry_2]
) for ds in self.datasets
],
x_scale="linear",
y_scale="linear",
z_scale="linear",
ridge_degree=degree,
ridge_alpha=1.0,
real_part=True,
imag_part=True,
magnitude=True,
phase=False,
pole_label="pole",
geometry_1_label=geometry_1,
geometry_2_label=geometry_2,
pole_scale="linear",
geometry_1_scale="linear",
geometry_2_scale="linear"
)
plot_poles_in_3d(unit, save_path)
def plot_vf_responses_with_index(self,save_dir:str,index:int):
ds = self.datasets[index]
id = f"{index}"
os.makedirs(save_dir,exist_ok=True)
vf = ds.vf_manager
vf.plot_metrics(show=False,save_path=f"{save_dir}/{id}")
full_freqences = vf.freqs
model_responses = vf.get_model_responses(full_freqences)
vf.plot_model_responses(show=False,save_path=f"{save_dir}/{id}")
def plot_all_vf_responses(self,save_dir:str):
for index,ds in enumerate(self.datasets):
id = f"{index}"
os.makedirs(save_dir,exist_ok=True)
vf = ds.vf_manager
vf.plot_metrics(show=False,save_path=f"{save_dir}/{id}")
full_freqences = vf.freqs
model_responses = vf.get_model_responses(full_freqences)
vf.plot_model_responses(show=False,save_path=f"{save_dir}/{id}")
def evaluate_dataset(self):
for ds in self.datasets:
rmse = ds.vf_manager.rms_error
condition = ds.vf_manager.condition
row_condition = ds.vf_manager.row_condition
col_condition = ds.vf_manager.col_condition

View File

@@ -48,6 +48,47 @@ class VFManager():
self.model_responses_H = None
self.model=model
@property
def residuals(self):
assert self.model_instance is not None ,"Please run levi() and sk_iteration() first."
return self.model_instance.residuals
@property
def constant(self):
assert self.model_instance is not None ,"Please run levi() and sk_iteration() first."
return self.model_instance.D
@property
def proportional(self):
assert self.model_instance is not None ,"Please run levi() and sk_iteration() first."
return self.model_instance.proportional
@property
def poles(self):
assert self.model_instance is not None ,"Please run levi() and sk_iteration() first."
return list(self.model_instance.poles)
@property
def rms_error(self):
assert self.model_instance is not None ,"Please run levi() and sk_iteration() first."
return self.eigenval_rms_error[-1]
@property
def condition(self):
assert self.model_instance is not None ,"Please run levi() and sk_iteration() first."
return self.eigenval_condition[-1]
@property
def row_condition(self):
assert self.model_instance is not None ,"Please run levi() and sk_iteration() first."
return self.eigenval_row_condition[-1]
@property
def col_condition(self):
assert self.model_instance is not None ,"Please run levi() and sk_iteration() first."
return self.eigenval_col_condition[-1]
@classmethod
def load(cls,dirname):
@@ -146,11 +187,11 @@ class VFManager():
return self.model
def levi(self):
self.poles = generate_starting_poles(self.npoles_cplx,beta_min=max(self.freqs[0],1e4),beta_max=self.freqs[-1]*1.1,verbose=self.verbose)
self._poles = generate_starting_poles(self.npoles_cplx,beta_min=max(self.freqs[0],1e4),beta_max=self.freqs[-1]*1.1,verbose=self.verbose)
self.model_instance=self.model(
H=self.H,
freqs=self.freqs,
pre_poles=self.poles,
pre_poles=self._poles,
fit_constant=self.fit_constant,
fit_proportional=self.fit_proportional,
dc_enforce=self.dc_enforce,
@@ -161,11 +202,11 @@ class VFManager():
def sk_iteration(self):
for i in range(self.iterations):
assert self.model_instance is not None ,"Please run levi() first."
self.poles = self.model_instance.poles
self._poles = self.model_instance.poles
self.model_instance = self.model(
H=self.H,
freqs=self.freqs,
pre_poles=self.poles,
pre_poles=self._poles,
fit_constant=self.fit_constant,
fit_proportional=self.fit_proportional,
dc_enforce=self.dc_enforce,

View File

@@ -96,7 +96,12 @@ class MultiPortOrthonormalBasis:
"""Return condition number and RMS error of eigenvalues of A-BC."""
z = self.poles
cond = np.linalg.cond(self.A - self.B @ self.C)
rms = np.sqrt(np.mean(np.abs(np.real(z) - np.real(self.pre_poles))**2 + np.abs(np.imag(z) - np.imag(self.pre_poles))**2))
# rms = np.sqrt(np.mean(np.abs(np.real(z) - np.real(self.pre_poles))**2 + np.abs(np.imag(z) - np.imag(self.pre_poles))**2))
Hk = self.H
Hk_fitted = self.get_model_responses(self.freqs)
rms = np.sqrt(np.mean(np.abs(Hk - Hk_fitted)**2))
row_cond = cond_row_inf(self.A - self.B @ self.C)
col_cond = cond_col_one(self.A - self.B @ self.C)

View File

@@ -0,0 +1,32 @@
from plotly import graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
from sklearn.linear_model import Ridge
import os
from ovf.schemas.geometry.basic import GeometryPlot2dUnit, GeometryPlot2dComplexDataUnit
def plot_2d(unit: GeometryPlot2dUnit, dirname: str):
os.makedirs(dirname, exist_ok=True)
fig = make_subplots(rows=1, cols=1)
for index in range(len(unit.datas[0].x)):
x = [unit.datas[i].x[index] for i in range(len(unit.datas))]
y = [unit.datas[i].y[index] for i in range(len(unit.datas))]
fig.add_trace(
go.Scatter(
x=x,
y=y,
mode='markers',
name=data.label
),
row=1, col=1
)
fig.update_layout(
title=unit.title,
xaxis_title=unit.x_label,
yaxis_title=unit.y_label,
)
fig.write_html(f"{dirname}/plot_2d.html")

View File

@@ -0,0 +1,72 @@
from ovf.schemas.geometry.basic import GeometryPlot3dDataUnit
from ovf.schemas.geometry.basic import GeometryPlot3dUnit
import numpy as np
import plotly.graph_objs as go
from sklearn.linear_model import Ridge
from sklearn.preprocessing import PolynomialFeatures
from ovf.core.utils import _generate_ridge_plane
def get_plot_instance_in_3d(datas:GeometryPlot3dUnit):
# 显示所有pole所在的点和拟合得到的平面
def predict(ridge:Ridge, x, y, poly):
features = poly.transform(np.array([[x, y]]))
result = float(ridge.predict(features)[0])
return result
poly = PolynomialFeatures(degree=datas.ridge_degree, include_bias=False)
if datas.x_scale == "log":
datas.x_label = "log10(" + datas.x_label + ")"
if datas.y_scale == "log":
datas.y_label = "log10(" + datas.y_label + ")"
if datas.z_scale == "log":
datas.z_label = "log10(" + datas.z_label + ")"
pt = [(
np.float64((np.log10(d.x) if d.x > 0 else 0) if datas.x_scale == "log" else np.float64(d.x)),
np.float64((np.log10(d.y) if d.y > 0 else 0) if datas.y_scale == "log" else np.float64(d.y)),
np.float64((np.log10(d.z) if d.z > 0 else 0) if datas.z_scale == "log" else np.float64(d.z))
) for d in datas.datas]
pt = np.array(pt)
model,poly = _generate_ridge_plane([tuple(row) for row in pt], alpha=datas.ridge_alpha, poly=poly)
# z轴取log10
trace = go.Scatter3d(
x=pt[:,0],
y=pt[:,1],
z=pt[:,2],
mode='markers',
marker=dict(size=6, colorscale='Viridis', opacity=0.8),
text=[f'{datas.x_label}:{pt[p, 0]}<br>{datas.y_label}:{pt[p, 1]}<br>{datas.z_label}:{pt[p, 2]}' for p in range(len(pt))],
hoverinfo='text'
)
# --- 拟合平面 ---
X_range = np.linspace(min(pt[:,0]), max(pt[:,0]), 30)
Y_range = np.linspace(min(pt[:,1]), max(pt[:,1]), 30)
Z_range = np.linspace(min(pt[:,2]), max(pt[:,2]), 30)
X_grid, Y_grid = np.meshgrid(X_range, Y_range)
X_flat = X_grid.ravel()
Y_flat = Y_grid.ravel()
Z_flat = np.array([predict(model,x,y,poly) for x, y in zip(X_flat, Y_flat)])
Z_grid = Z_flat.reshape(X_grid.shape)
surface = go.Surface(
x=X_range, y=Y_range, z=Z_grid,
opacity=0.5, colorscale='Reds', showscale=False,
name='Fitted Plane'
)
layout = go.Layout(
title=datas.title,
scene=dict(
xaxis=dict(title=datas.x_label),
yaxis=dict(title=datas.y_label),
zaxis=dict(title=datas.z_label),
),
showlegend=False
)
return trace, surface, layout
# fig = go.Figure(data=[trace, surface], layout=layout)
# plot(fig, filename=filename)

View File

@@ -0,0 +1,88 @@
import plotly as px
import plotly.graph_objs as go
from plotly.offline import plot
import numpy as np
from sklearn.linear_model import Ridge
from ovf.schemas.geometry.poles import PolesPlot3dUnit, PolesPlot3dDataUnit
from ovf.schemas.geometry.basic import GeometryPlot3dDataUnit, GeometryPlot3dUnit
from ovf.core.geometry.plot3d import get_plot_instance_in_3d
from plotly.subplots import make_subplots
import os
def plot_poles_in_3d(poles:PolesPlot3dUnit,dirname:str):
os.makedirs(dirname,exist_ok=True)
index = 0
while index < len(poles.datas[0].poles):
if poles.real_part:
plotunit = GeometryPlot3dUnit(
title="Real Part",
datas=[GeometryPlot3dDataUnit(x=p.geometry_1, y=p.geometry_2, z=np.real(p.poles[index])) for p in poles.datas],
x_label=poles.geometry_1_label,
y_label=poles.geometry_2_label,
z_label=poles.pole_label,
x_scale=poles.geometry_1_scale,
y_scale=poles.geometry_2_scale,
z_scale=poles.pole_scale,
ridge_alpha=poles.ridge_alpha,
ridge_degree=poles.ridge_degree
)
trace, surface, layout = get_plot_instance_in_3d(plotunit)
fig = go.Figure(data=[trace, surface], layout=layout)
fig.write_html(f"{dirname}/pole_{index}_real.html")
if poles.imag_part:
plotunit = GeometryPlot3dUnit(
title="Imaginary Part",
datas=[GeometryPlot3dDataUnit(x=p.geometry_1, y=p.geometry_2, z=np.imag(p.poles[index])) for p in poles.datas],
x_label=poles.geometry_1_label,
y_label=poles.geometry_2_label,
z_label=poles.pole_label,
x_scale=poles.geometry_1_scale,
y_scale=poles.geometry_2_scale,
z_scale=poles.pole_scale,
ridge_alpha=poles.ridge_alpha
)
trace, surface, layout = get_plot_instance_in_3d(plotunit)
fig = go.Figure(data=[trace, surface], layout=layout)
fig.write_html(f"{dirname}/pole_{index}_imag.html")
if poles.magnitude:
plotunit = GeometryPlot3dUnit(
title="Magnitude",
datas=[GeometryPlot3dDataUnit(x=p.geometry_1, y=p.geometry_2, z=np.abs(p.poles[index])) for p in poles.datas],
x_label=poles.geometry_1_label,
y_label=poles.geometry_2_label,
z_label=poles.pole_label,
x_scale=poles.geometry_1_scale,
y_scale=poles.geometry_2_scale,
z_scale=poles.pole_scale,
ridge_alpha=poles.ridge_alpha
)
trace, surface, layout = get_plot_instance_in_3d(plotunit)
fig = go.Figure(data=[trace, surface], layout=layout)
fig.write_html(f"{dirname}/pole_{index}_magnitude.html")
if poles.phase:
plotunit = GeometryPlot3dUnit(
title="Phase",
datas=[GeometryPlot3dDataUnit(x=p.geometry_1, y=p.geometry_2, z=np.float64(np.angle(p.poles[index]))) for p in poles.datas],
x_label=poles.geometry_1_label,
y_label=poles.geometry_2_label,
z_label=poles.pole_label,
x_scale=poles.geometry_1_scale,
y_scale=poles.geometry_2_scale,
z_scale=poles.pole_scale,
ridge_alpha=poles.ridge_alpha
)
trace, surface, layout = get_plot_instance_in_3d(plotunit)
fig = go.Figure(data=[trace, surface], layout=layout)
fig.write_html(f"{dirname}/pole_{index}_phase.html")
if index + 1 < len(poles.datas[0].poles):
if np.isclose(poles.datas[0].poles[index] , np.conj(poles.datas[0].poles[index + 1])):
index += 1 # 跳过共轭点
index += 1

View File

@@ -1,6 +1,26 @@
import numpy as np
from dataclasses import dataclass
from typing import List, Dict, Tuple
from sklearn.linear_model import Ridge
from sklearn.preprocessing import PolynomialFeatures
def _generate_ridge_plane(points:List[tuple[np.float64,np.float64,np.float64]],poly:PolynomialFeatures, alpha:float=1.0) -> tuple[Ridge,PolynomialFeatures]:
# 使用Ridge回归拟合平面
X_poly = poly.fit_transform(np.array([[d[0], d[1]] for d in points]))
model = Ridge(alpha=alpha)
model.fit(X_poly, np.array([d[2] for d in points]))
return model,poly
def grey_to_rgb(value):
"""Convert a grey value (0-255) to an RGB tuple."""
if not (0 <= value <= 255):
raise ValueError("Value must be between 0 and 255")
# color refers to rainbow scale
ratio = value / 255
b = int(max(0, 255 * (1 - 2 * ratio)))
r = int(max(0, 255 * (2 * ratio - 1)))
g = 255 - b - r
return (r, g, b)
def cond_row_inf(A, use_pinv=True):
"""行条件数 κ∞(A) = ||A||∞ * ||A^{-1}||∞;矩形阵用广义逆。"""

View File

@@ -0,0 +1,36 @@
from dataclasses import dataclass
from typing import List, Literal
from typing import Dict, Union
@dataclass
class GeometryPlot3dDataUnit:
x: float
y: float
z: float
@dataclass
class GeometryPlot3dUnit:
title: str
datas: List[GeometryPlot3dDataUnit]
ridge_alpha: float = 1.0 # Ridge回归的正则化参数
ridge_degree: int = 2 # Ridge回归的多项式特征的度
x_label: str = "x"
y_label: str = "y"
z_label: str = "z"
x_scale: Literal["linear", "log"] = "linear"
y_scale: Literal["linear", "log"] = "linear"
z_scale: Literal["linear", "log"] = "linear"
@dataclass
class GeometryPlot2dComplexDataUnit:
x: List[float]
y: List[float]
geometries: Dict[str,Union[float,int,str]]
@dataclass
class GeometryPlot2dUnit:
title: str
datas: List[GeometryPlot2dComplexDataUnit]
x_label: str = "x"
y_label: str = "y"

View File

@@ -0,0 +1,30 @@
from dataclasses import dataclass
from typing import List, Literal, Union
import numpy as np
@dataclass
class PolesPlot3dDataUnit:
poles: List[Union[complex,np.complex128]]
geometry_1: float
geometry_2: float
@dataclass
class PolesPlot3dUnit:
title: str
datas: List[PolesPlot3dDataUnit]
x_scale: Literal["linear", "log"] = "linear"
y_scale: Literal["linear", "log"] = "linear"
z_scale: Literal["linear", "log"] = "linear"
ridge_degree: int = 2 # Ridge回归的多项式特征的度
ridge_alpha: float = 1.0 # Ridge回归的正则化参数
real_part: bool = True # 是否显示实部
imag_part: bool = True # 是否显示虚部
magnitude: bool = True # 是否显示幅值
phase: bool = False # 是否显示相位
pole_label: str = "pole"
geometry_1_label: str = "geometry_1"
geometry_2_label: str = "geometry_2"
pole_scale: Literal["linear", "log"] = "linear"
geometry_1_scale: Literal["linear", "log"] = "linear"
geometry_2_scale: Literal["linear", "log"] = "linear"