From 4e25538a4ab40d299345af504c792593442e46df Mon Sep 17 00:00:00 2001 From: mayge Date: Tue, 30 Sep 2025 10:37:58 -0400 Subject: [PATCH] feat: Add GVF module --- ovf/core/GVFManager.py | 123 ++++++++++++++++++++ ovf/core/VFManager.py | 49 +++++++- ovf/core/basis/MultiPortOrthonormalBasis.py | 7 +- ovf/core/geometry/plot2d.py | 32 +++++ ovf/core/geometry/plot3d.py | 72 ++++++++++++ ovf/core/geometry/plot_poles.py | 88 ++++++++++++++ ovf/core/utils.py | 20 ++++ ovf/schemas/geometry/basic.py | 36 ++++++ ovf/schemas/geometry/poles.py | 30 +++++ 9 files changed, 452 insertions(+), 5 deletions(-) create mode 100644 ovf/core/GVFManager.py create mode 100644 ovf/core/geometry/plot2d.py create mode 100644 ovf/core/geometry/plot3d.py create mode 100644 ovf/core/geometry/plot_poles.py create mode 100644 ovf/schemas/geometry/basic.py create mode 100644 ovf/schemas/geometry/poles.py diff --git a/ovf/core/GVFManager.py b/ovf/core/GVFManager.py new file mode 100644 index 0000000..addcb41 --- /dev/null +++ b/ovf/core/GVFManager.py @@ -0,0 +1,123 @@ +from ovf.core.VFManager import VFManager +from dataclasses import dataclass +from typing import List,Dict,Literal +import numpy as np +import json +import skrf as rf +import os +from ovf.core.sample import auto_select_multple_ports +from ovf.core.basis.MultiPortOrthonormalBasis import MultiPortOrthonormalBasis +from ovf.core.geometry.plot_poles import plot_poles_in_3d +from ovf.schemas.geometry.poles import PolesPlot3dUnit,PolesPlot3dDataUnit +import pickle + +@dataclass +class GVFDataUnit: + geometries: Dict[str,float] + vf_manager:VFManager + enabled:bool=True + +class GVFManager: + def __init__(self): + self.parameter_type: Literal["s","y","z"] = "s" + self.datasets: List[GVFDataUnit] = [] + + def save(self,filepath:str): + os.makedirs(os.path.dirname(filepath),exist_ok=True) + with open(filepath,"wb") as f: + pickle.dump(self,f) + + @classmethod + def load(cls,filepath:str): + with open(filepath,"rb") as f: + obj = pickle.load(f) + assert isinstance(obj,GVFManager),"The loaded object is not a GVFManager instance." + return obj + + def load_from_datasets(self,jsonfile:str,npoles_cplx:int=2,max_iterations:int=5,max_points:int|None=20,basis:type=MultiPortOrthonormalBasis,parameter_type:Literal["s","y","z"]="s"): + self.parameter_type = parameter_type + with open(jsonfile,"r") as f: + datas = json.load(f) + + for i in range(len(datas)): + print(f"Loading dataset {i+1}/{len(datas)+1}") + network = rf.Network(f"{os.path.dirname(jsonfile)}/{datas[i]['result_dir']}") + ports = network.nports + K = max_iterations + + full_freqences = network.f + + if parameter_type == "s": + sampled_points = network.s.reshape(-1,ports,ports) + elif parameter_type == "y": + sampled_points = network.y.reshape(-1,ports,ports) + elif parameter_type == "z": + sampled_points = network.z.reshape(-1,ports,ports) + + if max_points: + H,freqs = auto_select_multple_ports(sampled_points,full_freqences,max_points=max_points) + else: + H,freqs = sampled_points,full_freqences + vf = VFManager(npoles_cplx=npoles_cplx,freqs=freqs,H=H,model=basis,iterations=K,verbose=False) + vf.fit() + + geometries = datas[i]["parameters"] + self.datasets.append(GVFDataUnit(geometries=geometries, vf_manager=vf)) + + def plot_poles(self,save_path,degree:int,geometry_1:str,geometry_2:str): + unit = PolesPlot3dUnit( + title="Poles Distribution", + datas=[ + PolesPlot3dDataUnit( + poles=ds.vf_manager.poles, + geometry_1=ds.geometries[geometry_1], + geometry_2=ds.geometries[geometry_2] + ) for ds in self.datasets + ], + x_scale="linear", + y_scale="linear", + z_scale="linear", + ridge_degree=degree, + ridge_alpha=1.0, + real_part=True, + imag_part=True, + magnitude=True, + phase=False, + pole_label="pole", + geometry_1_label=geometry_1, + geometry_2_label=geometry_2, + pole_scale="linear", + geometry_1_scale="linear", + geometry_2_scale="linear" + ) + + plot_poles_in_3d(unit, save_path) + + def plot_vf_responses_with_index(self,save_dir:str,index:int): + ds = self.datasets[index] + id = f"{index}" + os.makedirs(save_dir,exist_ok=True) + vf = ds.vf_manager + vf.plot_metrics(show=False,save_path=f"{save_dir}/{id}") + full_freqences = vf.freqs + model_responses = vf.get_model_responses(full_freqences) + vf.plot_model_responses(show=False,save_path=f"{save_dir}/{id}") + + def plot_all_vf_responses(self,save_dir:str): + for index,ds in enumerate(self.datasets): + id = f"{index}" + os.makedirs(save_dir,exist_ok=True) + vf = ds.vf_manager + vf.plot_metrics(show=False,save_path=f"{save_dir}/{id}") + full_freqences = vf.freqs + model_responses = vf.get_model_responses(full_freqences) + vf.plot_model_responses(show=False,save_path=f"{save_dir}/{id}") + + def evaluate_dataset(self): + for ds in self.datasets: + rmse = ds.vf_manager.rms_error + condition = ds.vf_manager.condition + row_condition = ds.vf_manager.row_condition + col_condition = ds.vf_manager.col_condition + + diff --git a/ovf/core/VFManager.py b/ovf/core/VFManager.py index 692cf11..d9b3840 100644 --- a/ovf/core/VFManager.py +++ b/ovf/core/VFManager.py @@ -48,6 +48,47 @@ class VFManager(): self.model_responses_H = None self.model=model + + @property + def residuals(self): + assert self.model_instance is not None ,"Please run levi() and sk_iteration() first." + return self.model_instance.residuals + + @property + def constant(self): + assert self.model_instance is not None ,"Please run levi() and sk_iteration() first." + return self.model_instance.D + + @property + def proportional(self): + assert self.model_instance is not None ,"Please run levi() and sk_iteration() first." + return self.model_instance.proportional + + @property + def poles(self): + assert self.model_instance is not None ,"Please run levi() and sk_iteration() first." + return list(self.model_instance.poles) + + @property + def rms_error(self): + assert self.model_instance is not None ,"Please run levi() and sk_iteration() first." + return self.eigenval_rms_error[-1] + + @property + def condition(self): + assert self.model_instance is not None ,"Please run levi() and sk_iteration() first." + return self.eigenval_condition[-1] + + @property + def row_condition(self): + assert self.model_instance is not None ,"Please run levi() and sk_iteration() first." + return self.eigenval_row_condition[-1] + + @property + def col_condition(self): + assert self.model_instance is not None ,"Please run levi() and sk_iteration() first." + return self.eigenval_col_condition[-1] + @classmethod def load(cls,dirname): @@ -146,11 +187,11 @@ class VFManager(): return self.model def levi(self): - self.poles = generate_starting_poles(self.npoles_cplx,beta_min=max(self.freqs[0],1e4),beta_max=self.freqs[-1]*1.1,verbose=self.verbose) + self._poles = generate_starting_poles(self.npoles_cplx,beta_min=max(self.freqs[0],1e4),beta_max=self.freqs[-1]*1.1,verbose=self.verbose) self.model_instance=self.model( H=self.H, freqs=self.freqs, - pre_poles=self.poles, + pre_poles=self._poles, fit_constant=self.fit_constant, fit_proportional=self.fit_proportional, dc_enforce=self.dc_enforce, @@ -161,11 +202,11 @@ class VFManager(): def sk_iteration(self): for i in range(self.iterations): assert self.model_instance is not None ,"Please run levi() first." - self.poles = self.model_instance.poles + self._poles = self.model_instance.poles self.model_instance = self.model( H=self.H, freqs=self.freqs, - pre_poles=self.poles, + pre_poles=self._poles, fit_constant=self.fit_constant, fit_proportional=self.fit_proportional, dc_enforce=self.dc_enforce, diff --git a/ovf/core/basis/MultiPortOrthonormalBasis.py b/ovf/core/basis/MultiPortOrthonormalBasis.py index 3dc8b58..6275083 100644 --- a/ovf/core/basis/MultiPortOrthonormalBasis.py +++ b/ovf/core/basis/MultiPortOrthonormalBasis.py @@ -96,7 +96,12 @@ class MultiPortOrthonormalBasis: """Return condition number and RMS error of eigenvalues of A-BC.""" z = self.poles cond = np.linalg.cond(self.A - self.B @ self.C) - rms = np.sqrt(np.mean(np.abs(np.real(z) - np.real(self.pre_poles))**2 + np.abs(np.imag(z) - np.imag(self.pre_poles))**2)) + # rms = np.sqrt(np.mean(np.abs(np.real(z) - np.real(self.pre_poles))**2 + np.abs(np.imag(z) - np.imag(self.pre_poles))**2)) + + Hk = self.H + Hk_fitted = self.get_model_responses(self.freqs) + + rms = np.sqrt(np.mean(np.abs(Hk - Hk_fitted)**2)) row_cond = cond_row_inf(self.A - self.B @ self.C) col_cond = cond_col_one(self.A - self.B @ self.C) diff --git a/ovf/core/geometry/plot2d.py b/ovf/core/geometry/plot2d.py new file mode 100644 index 0000000..4d825f5 --- /dev/null +++ b/ovf/core/geometry/plot2d.py @@ -0,0 +1,32 @@ +from plotly import graph_objects as go +from plotly.subplots import make_subplots +import numpy as np +from sklearn.linear_model import Ridge +import os +from ovf.schemas.geometry.basic import GeometryPlot2dUnit, GeometryPlot2dComplexDataUnit + +def plot_2d(unit: GeometryPlot2dUnit, dirname: str): + os.makedirs(dirname, exist_ok=True) + + fig = make_subplots(rows=1, cols=1) + + for index in range(len(unit.datas[0].x)): + x = [unit.datas[i].x[index] for i in range(len(unit.datas))] + y = [unit.datas[i].y[index] for i in range(len(unit.datas))] + fig.add_trace( + go.Scatter( + x=x, + y=y, + mode='markers', + name=data.label + ), + row=1, col=1 + ) + + fig.update_layout( + title=unit.title, + xaxis_title=unit.x_label, + yaxis_title=unit.y_label, + ) + + fig.write_html(f"{dirname}/plot_2d.html") \ No newline at end of file diff --git a/ovf/core/geometry/plot3d.py b/ovf/core/geometry/plot3d.py new file mode 100644 index 0000000..0dc1aae --- /dev/null +++ b/ovf/core/geometry/plot3d.py @@ -0,0 +1,72 @@ +from ovf.schemas.geometry.basic import GeometryPlot3dDataUnit +from ovf.schemas.geometry.basic import GeometryPlot3dUnit +import numpy as np +import plotly.graph_objs as go +from sklearn.linear_model import Ridge +from sklearn.preprocessing import PolynomialFeatures +from ovf.core.utils import _generate_ridge_plane + + +def get_plot_instance_in_3d(datas:GeometryPlot3dUnit): + # 显示所有pole所在的点和拟合得到的平面 + def predict(ridge:Ridge, x, y, poly): + features = poly.transform(np.array([[x, y]])) + result = float(ridge.predict(features)[0]) + return result + + poly = PolynomialFeatures(degree=datas.ridge_degree, include_bias=False) + + if datas.x_scale == "log": + datas.x_label = "log10(" + datas.x_label + ")" + if datas.y_scale == "log": + datas.y_label = "log10(" + datas.y_label + ")" + if datas.z_scale == "log": + datas.z_label = "log10(" + datas.z_label + ")" + + pt = [( + np.float64((np.log10(d.x) if d.x > 0 else 0) if datas.x_scale == "log" else np.float64(d.x)), + np.float64((np.log10(d.y) if d.y > 0 else 0) if datas.y_scale == "log" else np.float64(d.y)), + np.float64((np.log10(d.z) if d.z > 0 else 0) if datas.z_scale == "log" else np.float64(d.z)) + ) for d in datas.datas] + pt = np.array(pt) + + model,poly = _generate_ridge_plane([tuple(row) for row in pt], alpha=datas.ridge_alpha, poly=poly) + + # z轴取log10 + trace = go.Scatter3d( + x=pt[:,0], + y=pt[:,1], + z=pt[:,2], + mode='markers', + marker=dict(size=6, colorscale='Viridis', opacity=0.8), + text=[f'{datas.x_label}:{pt[p, 0]}
{datas.y_label}:{pt[p, 1]}
{datas.z_label}:{pt[p, 2]}' for p in range(len(pt))], + hoverinfo='text' + ) + # --- 拟合平面 --- + + X_range = np.linspace(min(pt[:,0]), max(pt[:,0]), 30) + Y_range = np.linspace(min(pt[:,1]), max(pt[:,1]), 30) + Z_range = np.linspace(min(pt[:,2]), max(pt[:,2]), 30) + X_grid, Y_grid = np.meshgrid(X_range, Y_range) + X_flat = X_grid.ravel() + Y_flat = Y_grid.ravel() + Z_flat = np.array([predict(model,x,y,poly) for x, y in zip(X_flat, Y_flat)]) + Z_grid = Z_flat.reshape(X_grid.shape) + surface = go.Surface( + x=X_range, y=Y_range, z=Z_grid, + opacity=0.5, colorscale='Reds', showscale=False, + name='Fitted Plane' + ) + + layout = go.Layout( + title=datas.title, + scene=dict( + xaxis=dict(title=datas.x_label), + yaxis=dict(title=datas.y_label), + zaxis=dict(title=datas.z_label), + ), + showlegend=False + ) + return trace, surface, layout + # fig = go.Figure(data=[trace, surface], layout=layout) + # plot(fig, filename=filename) \ No newline at end of file diff --git a/ovf/core/geometry/plot_poles.py b/ovf/core/geometry/plot_poles.py new file mode 100644 index 0000000..0d6c57c --- /dev/null +++ b/ovf/core/geometry/plot_poles.py @@ -0,0 +1,88 @@ +import plotly as px +import plotly.graph_objs as go +from plotly.offline import plot +import numpy as np +from sklearn.linear_model import Ridge +from ovf.schemas.geometry.poles import PolesPlot3dUnit, PolesPlot3dDataUnit +from ovf.schemas.geometry.basic import GeometryPlot3dDataUnit, GeometryPlot3dUnit +from ovf.core.geometry.plot3d import get_plot_instance_in_3d +from plotly.subplots import make_subplots +import os + +def plot_poles_in_3d(poles:PolesPlot3dUnit,dirname:str): + os.makedirs(dirname,exist_ok=True) + + index = 0 + + while index < len(poles.datas[0].poles): + if poles.real_part: + plotunit = GeometryPlot3dUnit( + title="Real Part", + datas=[GeometryPlot3dDataUnit(x=p.geometry_1, y=p.geometry_2, z=np.real(p.poles[index])) for p in poles.datas], + x_label=poles.geometry_1_label, + y_label=poles.geometry_2_label, + z_label=poles.pole_label, + x_scale=poles.geometry_1_scale, + y_scale=poles.geometry_2_scale, + z_scale=poles.pole_scale, + ridge_alpha=poles.ridge_alpha, + ridge_degree=poles.ridge_degree + ) + trace, surface, layout = get_plot_instance_in_3d(plotunit) + fig = go.Figure(data=[trace, surface], layout=layout) + fig.write_html(f"{dirname}/pole_{index}_real.html") + + if poles.imag_part: + plotunit = GeometryPlot3dUnit( + title="Imaginary Part", + datas=[GeometryPlot3dDataUnit(x=p.geometry_1, y=p.geometry_2, z=np.imag(p.poles[index])) for p in poles.datas], + x_label=poles.geometry_1_label, + y_label=poles.geometry_2_label, + z_label=poles.pole_label, + x_scale=poles.geometry_1_scale, + y_scale=poles.geometry_2_scale, + z_scale=poles.pole_scale, + ridge_alpha=poles.ridge_alpha + ) + trace, surface, layout = get_plot_instance_in_3d(plotunit) + fig = go.Figure(data=[trace, surface], layout=layout) + fig.write_html(f"{dirname}/pole_{index}_imag.html") + if poles.magnitude: + plotunit = GeometryPlot3dUnit( + title="Magnitude", + datas=[GeometryPlot3dDataUnit(x=p.geometry_1, y=p.geometry_2, z=np.abs(p.poles[index])) for p in poles.datas], + x_label=poles.geometry_1_label, + y_label=poles.geometry_2_label, + z_label=poles.pole_label, + x_scale=poles.geometry_1_scale, + y_scale=poles.geometry_2_scale, + z_scale=poles.pole_scale, + ridge_alpha=poles.ridge_alpha + ) + trace, surface, layout = get_plot_instance_in_3d(plotunit) + fig = go.Figure(data=[trace, surface], layout=layout) + fig.write_html(f"{dirname}/pole_{index}_magnitude.html") + if poles.phase: + plotunit = GeometryPlot3dUnit( + title="Phase", + datas=[GeometryPlot3dDataUnit(x=p.geometry_1, y=p.geometry_2, z=np.float64(np.angle(p.poles[index]))) for p in poles.datas], + x_label=poles.geometry_1_label, + y_label=poles.geometry_2_label, + z_label=poles.pole_label, + x_scale=poles.geometry_1_scale, + y_scale=poles.geometry_2_scale, + z_scale=poles.pole_scale, + ridge_alpha=poles.ridge_alpha + ) + trace, surface, layout = get_plot_instance_in_3d(plotunit) + fig = go.Figure(data=[trace, surface], layout=layout) + fig.write_html(f"{dirname}/pole_{index}_phase.html") + + + if index + 1 < len(poles.datas[0].poles): + if np.isclose(poles.datas[0].poles[index] , np.conj(poles.datas[0].poles[index + 1])): + index += 1 # 跳过共轭点 + + index += 1 + + \ No newline at end of file diff --git a/ovf/core/utils.py b/ovf/core/utils.py index 4859462..defc2d5 100644 --- a/ovf/core/utils.py +++ b/ovf/core/utils.py @@ -1,6 +1,26 @@ import numpy as np from dataclasses import dataclass from typing import List, Dict, Tuple +from sklearn.linear_model import Ridge +from sklearn.preprocessing import PolynomialFeatures + +def _generate_ridge_plane(points:List[tuple[np.float64,np.float64,np.float64]],poly:PolynomialFeatures, alpha:float=1.0) -> tuple[Ridge,PolynomialFeatures]: + # 使用Ridge回归拟合平面 + X_poly = poly.fit_transform(np.array([[d[0], d[1]] for d in points])) + model = Ridge(alpha=alpha) + model.fit(X_poly, np.array([d[2] for d in points])) + return model,poly + +def grey_to_rgb(value): + """Convert a grey value (0-255) to an RGB tuple.""" + if not (0 <= value <= 255): + raise ValueError("Value must be between 0 and 255") + # color refers to rainbow scale + ratio = value / 255 + b = int(max(0, 255 * (1 - 2 * ratio))) + r = int(max(0, 255 * (2 * ratio - 1))) + g = 255 - b - r + return (r, g, b) def cond_row_inf(A, use_pinv=True): """行条件数 κ∞(A) = ||A||∞ * ||A^{-1}||∞;矩形阵用广义逆。""" diff --git a/ovf/schemas/geometry/basic.py b/ovf/schemas/geometry/basic.py new file mode 100644 index 0000000..d53a0a8 --- /dev/null +++ b/ovf/schemas/geometry/basic.py @@ -0,0 +1,36 @@ +from dataclasses import dataclass +from typing import List, Literal +from typing import Dict, Union + +@dataclass +class GeometryPlot3dDataUnit: + x: float + y: float + z: float + +@dataclass +class GeometryPlot3dUnit: + title: str + datas: List[GeometryPlot3dDataUnit] + ridge_alpha: float = 1.0 # Ridge回归的正则化参数 + ridge_degree: int = 2 # Ridge回归的多项式特征的度 + x_label: str = "x" + y_label: str = "y" + z_label: str = "z" + x_scale: Literal["linear", "log"] = "linear" + y_scale: Literal["linear", "log"] = "linear" + z_scale: Literal["linear", "log"] = "linear" + +@dataclass +class GeometryPlot2dComplexDataUnit: + x: List[float] + y: List[float] + geometries: Dict[str,Union[float,int,str]] + + +@dataclass +class GeometryPlot2dUnit: + title: str + datas: List[GeometryPlot2dComplexDataUnit] + x_label: str = "x" + y_label: str = "y" diff --git a/ovf/schemas/geometry/poles.py b/ovf/schemas/geometry/poles.py new file mode 100644 index 0000000..3301596 --- /dev/null +++ b/ovf/schemas/geometry/poles.py @@ -0,0 +1,30 @@ +from dataclasses import dataclass +from typing import List, Literal, Union +import numpy as np + +@dataclass +class PolesPlot3dDataUnit: + poles: List[Union[complex,np.complex128]] + geometry_1: float + geometry_2: float + + +@dataclass +class PolesPlot3dUnit: + title: str + datas: List[PolesPlot3dDataUnit] + x_scale: Literal["linear", "log"] = "linear" + y_scale: Literal["linear", "log"] = "linear" + z_scale: Literal["linear", "log"] = "linear" + ridge_degree: int = 2 # Ridge回归的多项式特征的度 + ridge_alpha: float = 1.0 # Ridge回归的正则化参数 + real_part: bool = True # 是否显示实部 + imag_part: bool = True # 是否显示虚部 + magnitude: bool = True # 是否显示幅值 + phase: bool = False # 是否显示相位 + pole_label: str = "pole" + geometry_1_label: str = "geometry_1" + geometry_2_label: str = "geometry_2" + pole_scale: Literal["linear", "log"] = "linear" + geometry_1_scale: Literal["linear", "log"] = "linear" + geometry_2_scale: Literal["linear", "log"] = "linear" \ No newline at end of file