feat: 多端口权重改善和修改了采集函数
This commit is contained in:
@@ -8,7 +8,7 @@ import matplotlib.pyplot as plt
|
||||
import random as rnd
|
||||
|
||||
class MultiplePortQR:
|
||||
def __init__(self,H,freqs,poles,weights=None,passivity=True,dc_enforce=True,fit_constant=True,fit_proportional=False):
|
||||
def __init__(self,H,freqs,poles,weights=None,passivity=True,dc_enforce=False,fit_constant=True,fit_proportional=False):
|
||||
self.least_squares_rms_error = None
|
||||
self.least_squares_condition = None
|
||||
self.eigenval_condition = None
|
||||
@@ -37,7 +37,7 @@ class MultiplePortQR:
|
||||
self.Cr = None
|
||||
|
||||
z = np.linalg.eigvals(self.A - self.B @ self.Cw)
|
||||
p_next = -z
|
||||
p_next = z
|
||||
|
||||
if passivity:
|
||||
self.next_poles = self.passivity_enforce(p_next)
|
||||
@@ -196,21 +196,28 @@ class MultiplePortQR:
|
||||
A_blocks = [A_re, A_im]
|
||||
|
||||
if self.fit_constant:
|
||||
Hk_kp = None
|
||||
Hk_sum = []
|
||||
for i in range(self.ports):
|
||||
Hk_sum.append([])
|
||||
for j in range(self.ports):
|
||||
Hk_kp0 = H[:,i,j][keep]
|
||||
Hk_kp = Hk_kp0 if Hk_kp is None else np.hstack([Hk_kp, Hk_kp0])
|
||||
assert Hk_kp is not None
|
||||
Hk_sum = np.sum(np.abs(Hk_kp)**2)
|
||||
beta = float(np.sqrt(Hk_sum))
|
||||
mean_row = (beta / weights_kp.shape[0]) * np.sum(Phi_w, axis=0)
|
||||
A_w0 = np.concatenate([np.zeros(N*self.ports**2, float),
|
||||
np.real(mean_row).astype(float)]
|
||||
).reshape(1, -1)
|
||||
b_w0 = np.array([beta], float)
|
||||
Hk_sum[i].append(np.sum(np.abs(Hk_kp0)**2))
|
||||
# Hk_kp = Hk_kp0 if Hk_kp is None else np.hstack([Hk_kp, Hk_kp0])
|
||||
K_keep = int(np.count_nonzero(keep))
|
||||
A_w0 = []
|
||||
b_w0 = []
|
||||
# Hk_sum = np.sum(np.abs(Hk_kp)**2)
|
||||
for i in range(self.ports):
|
||||
for j in range(self.ports):
|
||||
beta_ij = float(np.sqrt(Hk_sum[i][j]))
|
||||
mean_row = (beta_ij / K_keep) * np.sum(Phi_w[keep, :], axis=0)
|
||||
A_w0.append(np.concatenate([np.zeros(N*self.ports**2, float),
|
||||
np.real(mean_row).astype(float)]
|
||||
).reshape(1, -1))
|
||||
b_w0.append(np.array([beta_ij], float))
|
||||
b_w0 = np.asarray(b_w0).ravel()
|
||||
|
||||
A_blocks += [A_w0]
|
||||
A_blocks += A_w0
|
||||
m = A_re.shape[0] + A_im.shape[0]
|
||||
b = np.zeros(m, float)
|
||||
b = np.concatenate([b, b_w0])
|
||||
@@ -307,16 +314,18 @@ def noise(n:complex,coeff:float=0.05):
|
||||
|
||||
if __name__ == "__main__":
|
||||
start_point = 0
|
||||
network = rf.Network("/tmp/paramer/simulation/3500/3500.s2p")
|
||||
id = 3000
|
||||
network = rf.Network(f"/tmp/paramer/simulation/{id}/{id}.s2p")
|
||||
# network = rf.data.ring_slot
|
||||
ports = network.nports
|
||||
K = 10
|
||||
K = 5
|
||||
|
||||
full_freqences = network.f[start_point:]
|
||||
noised_sampled_points = network.y[start_point:,:,:]
|
||||
sampled_points = network.y[start_point:,:,:]
|
||||
noised_sampled_points = network.y[start_point:,:,:].reshape(-1,ports,ports)
|
||||
sampled_points = network.y[start_point:,:,:].reshape(-1,ports,ports)
|
||||
|
||||
# noised_sampled_points = - network.y[start_point:,0,1].reshape(-1,1,1)
|
||||
# sampled_points = network.y[start_point:,1,1].reshape(-1,1,1)
|
||||
# noised_sampled_points = network.y[start_point:,0,0].reshape(-1,1,1)
|
||||
# sampled_points = network.y[start_point:,0,0].reshape(-1,1,1)
|
||||
|
||||
H,freqs = auto_select_multple_ports(noised_sampled_points,full_freqences,max_points=20)
|
||||
poles = generate_starting_poles(2,beta_min=1e4,beta_max=freqs[-1]*1.1)
|
||||
|
||||
280
core/freqency.py
280
core/freqency.py
@@ -1,172 +1,37 @@
|
||||
import numpy as np
|
||||
# def auto_select(H, freq,
|
||||
# n_baseline=64, # log-spaced backbone points
|
||||
# peak_prominence=0.05, # fraction of |H| dB dynamic range for peak detection
|
||||
# peak_window=5, # take ±peak_window samples around each peak
|
||||
# topgrad_q=0.98, # keep top 2% largest slope/phase-change points
|
||||
# max_points=25, # final cap on selected samples (None = no cap)
|
||||
# ensure_ends=True):
|
||||
# """
|
||||
# Select several significant sample points for vector fitting.
|
||||
|
||||
# Strategy:
|
||||
# 1) Always keep endpoints (optional).
|
||||
# 2) Add a log-spaced baseline over the band.
|
||||
# 3) Detect resonance peaks in |H| (on a log scale) and keep small windows around them.
|
||||
# 4) Add points with the largest magnitude slope and phase-change (w.r.t log-f).
|
||||
# 5) De-duplicate, sort, and optionally thin to 'max_points' with priority
|
||||
# to endpoints and detected peaks.
|
||||
|
||||
# Parameters
|
||||
# ----------
|
||||
# H : (N,) complex array
|
||||
# Frequency response samples.
|
||||
# freq : (N,) float array
|
||||
# Frequency axis [Hz], strictly increasing.
|
||||
# n_baseline : int
|
||||
# Count of log-spaced baseline samples across the band.
|
||||
# peak_prominence : float
|
||||
# Peak prominence threshold as a fraction of the dynamic range in log|H|.
|
||||
# 0.05 ≈ keep peaks ≥ 5% of the range.
|
||||
# peak_window : int
|
||||
# Number of neighbor indices to include on each side of every detected peak.
|
||||
# topgrad_q : float in (0,1)
|
||||
# Quantile for selecting strong slope/phase points.
|
||||
# 0.98 ⇒ keep the top 2% largest derivatives.
|
||||
# max_points : int or None
|
||||
# If not None, cap the total number of selected indices to this value.
|
||||
# ensure_ends : bool
|
||||
# Always include the first and last samples.
|
||||
|
||||
# Returns
|
||||
# -------
|
||||
# H_sel : (K,) complex array
|
||||
# freq_sel : (K,) float array
|
||||
# """
|
||||
# H = np.asarray(H).reshape(-1)
|
||||
# f = np.asarray(freq).reshape(-1)
|
||||
# if H.size != f.size:
|
||||
# raise ValueError("H and freq must have the same length.")
|
||||
# N = f.size
|
||||
# if N < 4:
|
||||
# return H.copy(), f.copy()
|
||||
|
||||
# eps = 1e-16
|
||||
# mag = np.abs(H)
|
||||
# logmag = np.log10(mag + eps)
|
||||
# phase = np.unwrap(np.angle(H))
|
||||
|
||||
# # log-frequency axis (scale-invariant derivatives)
|
||||
# # keep it linear if any non-positive freq sneaks in
|
||||
# if np.all(f > 0):
|
||||
# lf = np.log(f)
|
||||
# else:
|
||||
# lf = f.copy()
|
||||
|
||||
# dlf = np.gradient(lf)
|
||||
# d_logmag = np.gradient(logmag) / (dlf + 1e-16)
|
||||
# d_phase = np.gradient(phase) / (dlf + 1e-16)
|
||||
|
||||
# idx = set()
|
||||
# if ensure_ends:
|
||||
# idx.update([0, N-1])
|
||||
|
||||
# # 1) log-spaced baseline
|
||||
# if n_baseline > 0:
|
||||
# # map a log grid to nearest indices
|
||||
# grid = np.linspace(lf.min(), lf.max(), n_baseline)
|
||||
# base_idx = np.clip(np.searchsorted(lf, grid), 0, N-1)
|
||||
# idx.update(np.unique(base_idx).tolist())
|
||||
|
||||
# # 2) peaks in |H|
|
||||
# try:
|
||||
# from scipy.signal import find_peaks
|
||||
# dyn = logmag.max() - logmag.min()
|
||||
# prom = peak_prominence * (dyn + 1e-12)
|
||||
# peaks, _ = find_peaks(logmag, prominence=prom)
|
||||
# except Exception:
|
||||
# # simple fallback: strict local maxima
|
||||
# peaks = np.where((mag[1:-1] > mag[:-2]) & (mag[1:-1] > mag[2:]))[0] + 1
|
||||
|
||||
# for p in peaks:
|
||||
# lo = max(0, p - peak_window)
|
||||
# hi = min(N, p + peak_window + 1)
|
||||
# idx.update(range(lo, hi))
|
||||
|
||||
# # 3) strongest slope / phase-change points
|
||||
# thr_slope = np.quantile(np.abs(d_logmag), topgrad_q)
|
||||
# thr_phase = np.quantile(np.abs(d_phase), topgrad_q)
|
||||
# idx.update(np.where(np.abs(d_logmag) >= thr_slope)[0].tolist())
|
||||
# idx.update(np.where(np.abs(d_phase) >= thr_phase)[0].tolist())
|
||||
|
||||
# # 4) finalize set
|
||||
# sel = np.array(sorted(idx), dtype=int)
|
||||
|
||||
# # 5) optional thinning with priority to endpoints and peaks
|
||||
# if max_points is not None and sel.size > max_points:
|
||||
# priority = np.zeros(sel.size, dtype=int)
|
||||
# if ensure_ends:
|
||||
# priority[(sel == 0) | (sel == N-1)] = 3
|
||||
# if peaks.size:
|
||||
# priority[np.isin(sel, peaks)] = np.maximum(priority[np.isin(sel, peaks)], 2)
|
||||
|
||||
# keep = []
|
||||
# budget = max_points
|
||||
# # keep highest-priority first
|
||||
# for lev in (3, 2, 1, 0):
|
||||
# cand = sel[priority == lev]
|
||||
# if cand.size == 0:
|
||||
# continue
|
||||
# if cand.size <= budget:
|
||||
# keep.extend(cand.tolist())
|
||||
# budget -= cand.size
|
||||
# else:
|
||||
# step = max(1, int(np.ceil(cand.size / budget)))
|
||||
# keep.extend(cand[::step][:budget].tolist())
|
||||
# budget = 0
|
||||
# if budget == 0:
|
||||
# break
|
||||
# sel = np.array(sorted(set(keep)), dtype=int)
|
||||
|
||||
# return H[sel], f[sel]
|
||||
|
||||
def auto_select(H, freq,
|
||||
n_baseline=64, # log-spaced backbone points
|
||||
peak_prominence=0.05, # fraction of |H| dB dynamic range for peak detection
|
||||
peak_window=5, # take ±peak_window samples around each peak
|
||||
topgrad_q=0.98, # keep top 2% largest slope/phase-change points
|
||||
max_points=25, # final cap on selected samples (None = no cap)
|
||||
ensure_ends=True):
|
||||
H = np.asarray(H).reshape(-1)
|
||||
f = np.asarray(freq).reshape(-1)
|
||||
def _auto_select_indices(H, freq,
|
||||
n_baseline=64,
|
||||
peak_prominence=0.05,
|
||||
peak_window=5,
|
||||
topgrad_q=0.98,
|
||||
max_points=25,
|
||||
ensure_ends=True):
|
||||
"""返回选中的全局索引,避免直接切片导致多端口不对齐。"""
|
||||
H = np.asarray(H).astype(np.complex128).reshape(-1)
|
||||
f = np.asarray(freq).astype(float).reshape(-1)
|
||||
if H.size != f.size:
|
||||
raise ValueError("H and freq must have the same length.")
|
||||
N = f.size
|
||||
if N < 4 or max_points is None or max_points >= N:
|
||||
# 直接返回所有点
|
||||
return H.copy(), f.copy()
|
||||
return np.arange(N, dtype=int)
|
||||
|
||||
eps = 1e-16
|
||||
mag = np.abs(H)
|
||||
logmag = np.log10(mag + eps)
|
||||
phase = np.unwrap(np.angle(H))
|
||||
|
||||
if np.all(f > 0):
|
||||
lf = np.log(f)
|
||||
else:
|
||||
lf = f.copy()
|
||||
|
||||
lf = np.log(f) if np.all(f > 0) else f.copy()
|
||||
dlf = np.gradient(lf)
|
||||
d_logmag = np.gradient(logmag) / (dlf + 1e-16)
|
||||
d_phase = np.gradient(phase) / (dlf + 1e-16)
|
||||
d_phase = np.gradient(phase) / (dlf + 1e-16)
|
||||
|
||||
idx = set()
|
||||
if ensure_ends:
|
||||
idx.update([0, N-1])
|
||||
idx.update([0, N - 1])
|
||||
|
||||
if n_baseline > 0:
|
||||
grid = np.linspace(lf.min(), lf.max(), n_baseline)
|
||||
base_idx = np.clip(np.searchsorted(lf, grid), 0, N-1)
|
||||
base_idx = np.clip(np.searchsorted(lf, grid), 0, N - 1)
|
||||
idx.update(np.unique(base_idx).tolist())
|
||||
|
||||
try:
|
||||
@@ -177,24 +42,25 @@ def auto_select(H, freq,
|
||||
except Exception:
|
||||
peaks = np.where((mag[1:-1] > mag[:-2]) & (mag[1:-1] > mag[2:]))[0] + 1
|
||||
|
||||
for p in peaks:
|
||||
lo = max(0, p - peak_window)
|
||||
hi = min(N, p + peak_window + 1)
|
||||
for p in np.atleast_1d(peaks):
|
||||
lo = max(0, int(p) - peak_window)
|
||||
hi = min(N, int(p) + peak_window + 1)
|
||||
idx.update(range(lo, hi))
|
||||
|
||||
thr_slope = np.quantile(np.abs(d_logmag), topgrad_q)
|
||||
thr_phase = np.quantile(np.abs(d_phase), topgrad_q)
|
||||
thr_phase = np.quantile(np.abs(d_phase), topgrad_q)
|
||||
idx.update(np.where(np.abs(d_logmag) >= thr_slope)[0].tolist())
|
||||
idx.update(np.where(np.abs(d_phase) >= thr_phase)[0].tolist())
|
||||
idx.update(np.where(np.abs(d_phase) >= thr_phase)[0].tolist())
|
||||
|
||||
sel = np.array(sorted(idx), dtype=int)
|
||||
|
||||
if sel.size > max_points:
|
||||
priority = np.zeros(sel.size, dtype=int)
|
||||
if ensure_ends:
|
||||
priority[(sel == 0) | (sel == N-1)] = 3
|
||||
if peaks.size:
|
||||
priority[np.isin(sel, peaks)] = np.maximum(priority[np.isin(sel, peaks)], 2)
|
||||
priority[(sel == 0) | (sel == N - 1)] = 3
|
||||
if np.size(peaks):
|
||||
mask = np.isin(sel, np.atleast_1d(peaks))
|
||||
priority[mask] = np.maximum(priority[mask], 2)
|
||||
|
||||
keep = []
|
||||
budget = max_points
|
||||
@@ -206,7 +72,7 @@ def auto_select(H, freq,
|
||||
keep.extend(cand.tolist())
|
||||
budget -= cand.size
|
||||
else:
|
||||
step = max(1, int(np.ceil(cand.size / budget)))
|
||||
step = max(1, int(np.ceil(cand.size / max(budget, 1))))
|
||||
keep.extend(cand[::step][:budget].tolist())
|
||||
budget = 0
|
||||
if budget == 0:
|
||||
@@ -218,16 +84,36 @@ def auto_select(H, freq,
|
||||
missing = list(sorted(all_idx - set(sel)))
|
||||
n_missing = max_points - sel.size
|
||||
if n_missing > 0 and missing:
|
||||
extra = np.linspace(0, len(missing)-1, n_missing, dtype=int)
|
||||
extra = np.linspace(0, len(missing) - 1, n_missing, dtype=int)
|
||||
sel = np.concatenate([sel, np.array(missing)[extra]])
|
||||
sel = np.array(sorted(set(sel)), dtype=int)
|
||||
if sel.size < max_points:
|
||||
left = list(sorted(all_idx - set(sel)))
|
||||
if left:
|
||||
sel = np.concatenate([sel, np.random.choice(left, max_points-sel.size, replace=False)])
|
||||
add = min(max_points - sel.size, len(left))
|
||||
sel = np.concatenate([sel, np.random.choice(left, add, replace=False)])
|
||||
sel = np.array(sorted(set(sel)), dtype=int)
|
||||
sel = sel[:max_points]
|
||||
|
||||
return sel
|
||||
|
||||
|
||||
def auto_select(H, freq,
|
||||
n_baseline=64, # log-spaced backbone points
|
||||
peak_prominence=0.05, # fraction of |H| dB dynamic range for peak detection
|
||||
peak_window=5, # take ±peak_window samples around each peak
|
||||
topgrad_q=0.98, # keep top 2% largest slope/phase-change points
|
||||
max_points=25, # final cap on selected samples (None = no cap)
|
||||
ensure_ends=True):
|
||||
sel = _auto_select_indices(H, freq,
|
||||
n_baseline=n_baseline,
|
||||
peak_prominence=peak_prominence,
|
||||
peak_window=peak_window,
|
||||
topgrad_q=topgrad_q,
|
||||
max_points=max_points,
|
||||
ensure_ends=ensure_ends)
|
||||
H = np.asarray(H).astype(np.complex128).reshape(-1)
|
||||
f = np.asarray(freq).astype(float).reshape(-1)
|
||||
return H[sel], f[sel]
|
||||
|
||||
def auto_select_multple_ports(H, freq,
|
||||
@@ -235,14 +121,66 @@ def auto_select_multple_ports(H, freq,
|
||||
peak_prominence=0.05, # fraction of |H| dB dynamic range for peak detection
|
||||
peak_window=5, # take ±peak_window samples around each peak
|
||||
topgrad_q=0.98, # keep top 2% largest slope/phase-change points
|
||||
max_points=25, # final cap on selected samples (None = no cap)
|
||||
max_points=25, # final cap on selected samples (None = no cap)
|
||||
ensure_ends=True):
|
||||
ports = H.shape[1]
|
||||
H_selected = np.zeros((max_points,ports,ports),dtype=complex)
|
||||
for i in range(ports):
|
||||
for j in range(ports):
|
||||
H_selected[:,i,j], freq_selected = auto_select(H[:,i,j], freq,
|
||||
n_baseline=n_baseline, peak_prominence=peak_prominence,
|
||||
peak_window=peak_window, topgrad_q=topgrad_q,
|
||||
max_points=max_points, ensure_ends=ensure_ends)
|
||||
return H_selected, freq_selected
|
||||
"""
|
||||
多端口统一选点:为每个(i,j)先各自产出候选索引,然后做并集并按出现次数优先级裁剪到同一套索引,
|
||||
确保返回的 H_selected 与 freq_selected 全端口严格对齐。
|
||||
输入:
|
||||
H: (N, P, P) 复数频响
|
||||
freq: (N,) 频率
|
||||
返回:
|
||||
H_selected: (K, P, P)
|
||||
freq_selected: (K,)
|
||||
其中 K == max_points(或当样本不足时为 N)。
|
||||
"""
|
||||
H = np.asarray(H)
|
||||
f = np.asarray(freq).astype(float).reshape(-1)
|
||||
if H.ndim != 3:
|
||||
raise ValueError("H must have shape (N, P, P)")
|
||||
N, P1, P2 = H.shape
|
||||
if f.size != N:
|
||||
raise ValueError("H and freq must have the same first dimension length.")
|
||||
if P1 != P2:
|
||||
raise ValueError("H must be square on ports (P x P).")
|
||||
|
||||
# 边界:样本太少或不需裁剪,直接返回全量且对齐
|
||||
if N < 4 or max_points is None or max_points >= N:
|
||||
return H.copy(), f.copy()
|
||||
|
||||
# 每个(i,j)各自选索引
|
||||
counts = {}
|
||||
all_sel_sets = []
|
||||
for i in range(P1):
|
||||
for j in range(P2):
|
||||
sel = _auto_select_indices(H[:, i, j], f,
|
||||
n_baseline=n_baseline,
|
||||
peak_prominence=peak_prominence,
|
||||
peak_window=peak_window,
|
||||
topgrad_q=topgrad_q,
|
||||
max_points=max_points,
|
||||
ensure_ends=ensure_ends)
|
||||
all_sel_sets.append(sel)
|
||||
for idx in sel.tolist():
|
||||
counts[idx] = counts.get(idx, 0) + 1
|
||||
|
||||
# 并集 + 频次优先裁剪到 max_points
|
||||
union_idx = sorted(set(np.concatenate(all_sel_sets)) )
|
||||
|
||||
# 如果并集不超过预算,必要时补点至 max_points(均匀抽取未选样本)
|
||||
if len(union_idx) <= max_points:
|
||||
sel_final = union_idx
|
||||
if len(sel_final) < max_points:
|
||||
remaining = sorted(set(range(N)) - set(sel_final))
|
||||
if remaining:
|
||||
need = max_points - len(sel_final)
|
||||
step = max(1, int(np.ceil(len(remaining) / need)))
|
||||
sel_final.extend(remaining[::step][:need])
|
||||
sel_final = sorted(set(sel_final))[:max_points]
|
||||
else:
|
||||
# 过多则按出现次数从高到低选,出现次数相同按索引位置靠前优先
|
||||
sorted_by_score = sorted(union_idx, key=lambda k: (-counts.get(k, 0), k))
|
||||
sel_final = sorted(sorted_by_score[:max_points])
|
||||
|
||||
sel_final = np.array(sel_final, dtype=int)
|
||||
return H[sel_final], f[sel_final]
|
||||
|
||||
@@ -258,11 +258,11 @@ def noise(n:complex,coeff:float=0.05):
|
||||
if __name__ == "__main__":
|
||||
start_point = 0
|
||||
network = rf.Network("/tmp/paramer/simulation/3000/3000.s2p")
|
||||
K = 50
|
||||
K = 10
|
||||
|
||||
full_freqences = network.f[start_point:]
|
||||
noised_sampled_points = [(network.y[i][0][0]) for i in range(start_point,len(network.y))]
|
||||
sampled_points = [network.y[i][0][0] for i in range(start_point,len(network.y))]
|
||||
noised_sampled_points = [(network.y[i][1][1]) for i in range(start_point,len(network.y))]
|
||||
sampled_points = [network.y[i][1][1] for i in range(start_point,len(network.y))]
|
||||
|
||||
H11,freqs = auto_select(noised_sampled_points,full_freqences,max_points=20)
|
||||
poles = generate_starting_poles(2,beta_min=1e4,beta_max=freqs[-1]*1.1)
|
||||
@@ -324,9 +324,9 @@ if __name__ == "__main__":
|
||||
ax01 = axes[0][1]
|
||||
ax01.set_title("Response i=0, j=0")
|
||||
ax01.set_ylabel("Phase (deg)")
|
||||
ax01.plot(network.f[start_point:], np.angle([network.y[i][0][0] for i in range(start_point,len(network.y))],deg=True), 'o', ms=4, color='red', label='Samples')
|
||||
ax01.plot(network.f[start_point:], np.angle(H11_evaluated,deg=True), '-', lw=2, color='k', label='Fit')
|
||||
ax01.plot(freqs, np.angle(H11,deg=True), 'x', ms=4, color='blue', label='Input Samples')
|
||||
ax01.plot(full_freqences, np.angle(sampled_points,deg=True), 'o', ms=4, color='red', label='Samples')
|
||||
ax01.plot(full_freqences, np.angle(fitted_points,deg=True), '-', lw=2, color='k', label='Fit')
|
||||
ax01.plot(sliced_freqences, np.angle(input_points,deg=True), 'x', ms=4, color='blue', label='Input Samples')
|
||||
ax01.legend(loc="best")
|
||||
|
||||
ax10 = axes[1][0]
|
||||
|
||||
909
core/util.py
Normal file
909
core/util.py
Normal file
@@ -0,0 +1,909 @@
|
||||
"""
|
||||
|
||||
.. currentmodule:: skrf.util
|
||||
========================================
|
||||
util (:mod:`skrf.util`)
|
||||
========================================
|
||||
|
||||
Holds utilities that are general conveniences.
|
||||
|
||||
|
||||
Time-related utilities
|
||||
----------------------
|
||||
.. autosummary::
|
||||
:toctree: generated/
|
||||
|
||||
now_string
|
||||
now_string_2_dt
|
||||
|
||||
ProgressBar
|
||||
|
||||
Array-related functions
|
||||
-----------------------
|
||||
.. autosummary::
|
||||
:toctree: generated/
|
||||
|
||||
find_nearest
|
||||
find_nearest_index
|
||||
has_duplicate_value
|
||||
smooth
|
||||
|
||||
File-related functions
|
||||
----------------------
|
||||
.. autosummary::
|
||||
:toctree: generated/
|
||||
|
||||
get_fid
|
||||
get_extn
|
||||
basename_noext
|
||||
git_version
|
||||
unique_name
|
||||
findReplace
|
||||
dict_2_recarray
|
||||
|
||||
General Purpose Objects
|
||||
-----------------------
|
||||
.. autosummary::
|
||||
:toctree: generated/
|
||||
|
||||
HomoList
|
||||
HomoDict
|
||||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import collections
|
||||
import contextlib
|
||||
import fnmatch
|
||||
import os
|
||||
import pprint
|
||||
import re
|
||||
import sys
|
||||
import warnings
|
||||
from datetime import datetime
|
||||
from functools import wraps
|
||||
from pathlib import Path
|
||||
from subprocess import PIPE, Popen
|
||||
from typing import Any, Callable, Iterable, TypeVar
|
||||
|
||||
import numpy as np
|
||||
|
||||
from skrf.constants import Number
|
||||
|
||||
try:
|
||||
import matplotlib.pyplot as plt
|
||||
from matplotlib.axes import Axes
|
||||
from matplotlib.figure import Figure
|
||||
except ImportError:
|
||||
Figure = TypeVar("Figure")
|
||||
Axes = TypeVar("Axes")
|
||||
pass
|
||||
|
||||
def plotting_available() -> bool:
|
||||
return "matplotlib" in sys.modules
|
||||
|
||||
def partial_with_docs(func, *args1, **kwargs1):
|
||||
@wraps(func)
|
||||
def method(self, *args2, **kwargs2):
|
||||
return func(self, *args1, *args2, **kwargs1, **kwargs2)
|
||||
return method
|
||||
|
||||
def axes_kwarg(func):
|
||||
"""
|
||||
This decorator checks if a :class:`matplotlib.axes.Axes` object is passed,
|
||||
if not the current axis will be gathered through :func:`plt.gca`.
|
||||
|
||||
Raises
|
||||
------
|
||||
RuntimeError
|
||||
When trying to run the decorated function without matplotlib
|
||||
"""
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
ax = kwargs.pop('ax', None)
|
||||
try:
|
||||
if ax is None:
|
||||
ax = plt.gca()
|
||||
except NameError as err:
|
||||
raise RuntimeError("Plotting is not available") from err
|
||||
func(*args, ax=ax, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
def copy_doc(copy_func: Callable) -> Callable:
|
||||
"""Use Example: copy_doc(self.copy_func)(self.func) or used as deco"""
|
||||
def wrapper(func: Callable) -> Callable:
|
||||
func.__doc__ = copy_func.__doc__
|
||||
return func
|
||||
return wrapper
|
||||
|
||||
def figure(*args, **kwargs) -> Figure:
|
||||
"""
|
||||
Wraps the matplotlib figure call and raises if not available.
|
||||
|
||||
Raises
|
||||
------
|
||||
RuntimeError
|
||||
When trying to get subplots without matplotlib installed.
|
||||
"""
|
||||
|
||||
try:
|
||||
return plt.figure(*args, **kwargs)
|
||||
except NameError as err:
|
||||
raise RuntimeError("Plotting is not available") from err
|
||||
|
||||
def subplots(*args, **kwargs) -> tuple[Figure, np.ndarray]:
|
||||
"""
|
||||
Wraps the matplotlib subplots call and raises if not available.
|
||||
|
||||
Raises
|
||||
------
|
||||
RuntimeError
|
||||
When trying to get subplots without matplotlib installed.
|
||||
"""
|
||||
|
||||
try:
|
||||
return plt.subplots(*args, **kwargs)
|
||||
except NameError as err:
|
||||
raise RuntimeError("Plotting is not available") from err
|
||||
|
||||
def now_string() -> str:
|
||||
"""
|
||||
Return a unique sortable string, representing the current time.
|
||||
|
||||
Nice for generating date-time stamps to be used in file-names,
|
||||
the companion function :func:`now_string_2_dt` can be used
|
||||
to read these string back into datetime objects.
|
||||
|
||||
Returns
|
||||
-------
|
||||
now : string
|
||||
curent date-time stamps.
|
||||
|
||||
See Also
|
||||
--------
|
||||
now_string_2_dt
|
||||
|
||||
"""
|
||||
return datetime.now().__str__().replace('-','.').replace(':','.').replace(' ','.')
|
||||
|
||||
|
||||
def now_string_2_dt(s: str) -> datetime:
|
||||
"""
|
||||
Converts the output of :func:`now_string` to a datetime object.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
s : str
|
||||
date-time stamps string as generated by :func:`now_string`
|
||||
|
||||
Returns
|
||||
-------
|
||||
dt : datetime
|
||||
date-time stamps
|
||||
|
||||
See Also
|
||||
--------
|
||||
now_string
|
||||
|
||||
"""
|
||||
return datetime(*[int(k) for k in s.split('.')])
|
||||
|
||||
|
||||
def find_nearest(array: np.ndarray, value: Number) -> Number:
|
||||
"""
|
||||
Find the nearest value in array.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
array : np.ndarray
|
||||
array we are searching for a value in
|
||||
value : element of the array
|
||||
value to search for
|
||||
|
||||
Returns
|
||||
--------
|
||||
found_value : an element of the array
|
||||
the value that is numerically closest to `value`
|
||||
|
||||
"""
|
||||
idx = find_nearest_index(array, value)
|
||||
return array[idx]
|
||||
|
||||
|
||||
def find_nearest_index(array: np.ndarray, value: Number) -> int:
|
||||
"""
|
||||
Find the nearest index for a value in array.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
array : np.ndarray
|
||||
array we are searching for a value in
|
||||
value : element of the array
|
||||
value to search for
|
||||
|
||||
Returns
|
||||
--------
|
||||
found_index : int
|
||||
the index at which the numerically closest element to `value`
|
||||
was found at
|
||||
|
||||
References
|
||||
----------
|
||||
taken from http://stackoverflow.com/questions/2566412/find-nearest-value-in-numpy-array
|
||||
|
||||
"""
|
||||
return (np.abs(array-value)).argmin()
|
||||
|
||||
|
||||
def slice_domain(x: np.ndarray, domain: tuple):
|
||||
"""
|
||||
Returns a slice object closest to the `domain` of `x`
|
||||
|
||||
domain = x[slice_domain(x, (start, stop))]
|
||||
|
||||
Parameters
|
||||
----------
|
||||
vector : np.ndarray
|
||||
an array of values
|
||||
domain : tuple
|
||||
tuple of (start,stop) values defining the domain over
|
||||
which to slice
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> x = linspace(0,10,101)
|
||||
>>> idx = slice_domain(x, (2,6))
|
||||
>>> x[idx]
|
||||
|
||||
"""
|
||||
start = find_nearest_index(x, domain[0])
|
||||
stop = find_nearest_index(x, domain[1])
|
||||
return slice(start, stop+1)
|
||||
|
||||
# file IO
|
||||
|
||||
|
||||
def get_fid(file, *args, **kwargs):
|
||||
r"""
|
||||
Return a file object, given a filename or file object.
|
||||
|
||||
Useful when you want to allow the arguments of a function to
|
||||
be either files or filenames
|
||||
|
||||
Parameters
|
||||
----------
|
||||
file : str/unicode, Path, or file-object
|
||||
file to open
|
||||
\*args, \*\*kwargs : arguments and keyword arguments to `open()`
|
||||
|
||||
Returns
|
||||
-------
|
||||
fid : file object
|
||||
|
||||
"""
|
||||
if isinstance(file, (str, Path)):
|
||||
return open(file, *args, **kwargs)
|
||||
else:
|
||||
return file
|
||||
|
||||
|
||||
def get_extn(filename: str | Path) -> str:
|
||||
"""
|
||||
Get the extension from a filename.
|
||||
|
||||
The extension is defined as everything passed the last '.'.
|
||||
Returns None if it ain't got one
|
||||
|
||||
Parameters
|
||||
----------
|
||||
filename : string or Path
|
||||
the filename
|
||||
|
||||
Returns
|
||||
-------
|
||||
ext : string, None
|
||||
either the extension (not including '.') or None if there
|
||||
isn't one
|
||||
|
||||
"""
|
||||
|
||||
if isinstance(filename, Path):
|
||||
return filename.suffix.strip('.') or None
|
||||
|
||||
ext = os.path.splitext(filename)[-1]
|
||||
if len(ext) == 0:
|
||||
return None
|
||||
else:
|
||||
return ext[1:]
|
||||
|
||||
|
||||
def basename_noext(filename: str) -> str:
|
||||
"""
|
||||
Get the basename and strips extension.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
filename : string
|
||||
the filename
|
||||
|
||||
Returns
|
||||
-------
|
||||
basename : str
|
||||
file basename (ie. without extension)
|
||||
|
||||
"""
|
||||
return os.path.splitext(os.path.basename(filename))[0]
|
||||
|
||||
|
||||
# git
|
||||
def git_version(modname: str) -> str:
|
||||
"""
|
||||
Return output 'git describe', executed in a module's root directory.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
modname : str
|
||||
module name
|
||||
|
||||
Returns
|
||||
-------
|
||||
out : str
|
||||
output of 'git describe'
|
||||
|
||||
"""
|
||||
mod = __import__(modname)
|
||||
mod_dir = os.path.split(mod.__file__)[0]
|
||||
p = Popen(['git', 'describe'], stdout=PIPE, stderr=PIPE, cwd=mod_dir)
|
||||
|
||||
try:
|
||||
out, er = p.communicate()
|
||||
except(OSError):
|
||||
return None
|
||||
|
||||
out = out.strip('\n')
|
||||
if out == '':
|
||||
return None
|
||||
return out
|
||||
|
||||
|
||||
def dict_2_recarray(d: dict, delim: str, dtype: list[tuple]) -> np.ndarray:
|
||||
"""
|
||||
Turn a dictionary of structured keys to a record array of objects.
|
||||
|
||||
This is useful if you save data-base like meta-data in the form
|
||||
or file-naming conventions, aka 'the poor-mans database'
|
||||
|
||||
Parameters
|
||||
----------
|
||||
d : dict
|
||||
dictionnary of structured keys
|
||||
delim : str
|
||||
delimiter string
|
||||
dtype : list of tuple
|
||||
list of type, where a type is tuple like ('type_name', type)
|
||||
|
||||
Returns
|
||||
-------
|
||||
ra : numpy.array
|
||||
|
||||
Examples
|
||||
--------
|
||||
Given a directory of networks like:
|
||||
|
||||
>>> ls
|
||||
a1,0.0,0.0.s1p a1,3.0,3.0.s1p a2,3.0,-3.0.s1p b1,-3.0,3.0.s1p
|
||||
...
|
||||
|
||||
you can sort based on the values or each field, after defining their
|
||||
type with `dtype`. The `values` field accesses the objects.
|
||||
|
||||
>>> d = rf.read_all_networks('/tmp/')
|
||||
>>> delim = ','
|
||||
>>> dtype = [('name', object), ('voltage', float), ('current', float)]
|
||||
>>> ra = dict_2_recarray(d=rf.ran(dir), delim=delim, dtype =dtype)
|
||||
|
||||
then you can sift like you do with numpy arrays
|
||||
|
||||
>>> ra[ra['voltage'] < 3]['values']
|
||||
array([1-Port Network: 'a2,0.0,-3.0', 450-800 GHz, 101 pts, z0=[ 50.+0.j],
|
||||
1-Port Network: 'b1,0.0,3.0', 450-800 GHz, 101 pts, z0=[ 50.+0.j],
|
||||
1-Port Network: 'a1,0.0,-3.0', 450-800 GHz, 101 pts, z0=[ 50.+0.j],
|
||||
"""
|
||||
|
||||
split_keys = [tuple(k.split(delim)+[d[k]]) for k in d.keys()]
|
||||
x = np.array(split_keys, dtype=dtype+[('values',object)])
|
||||
return x
|
||||
|
||||
|
||||
def findReplace(directory: str, find: str, replace: str, file_pattern: str):
|
||||
r"""
|
||||
Find/replace some txt in all files in a directory, recursively.
|
||||
|
||||
This was found in [1]_ .
|
||||
|
||||
Parameters
|
||||
----------
|
||||
directory : str
|
||||
path of a directory
|
||||
find : str
|
||||
pattern to search for
|
||||
replace : str
|
||||
string to replace with
|
||||
file_pattern : str
|
||||
file pattern for filtering. Ex: '\*.txt'.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> rf.findReplace('some_dir', 'find this', 'replace with this', '*.txt')
|
||||
|
||||
References
|
||||
----------
|
||||
.. [1] http://stackoverflow.com/questions/4205854/python-way-to-recursively-find-and-replace-string-in-text-files
|
||||
"""
|
||||
for path, _dirs, files in os.walk(os.path.abspath(directory)):
|
||||
for filename in fnmatch.filter(files, file_pattern):
|
||||
filepath = os.path.join(path, filename)
|
||||
with open(filepath) as f:
|
||||
s = f.read()
|
||||
s = s.replace(find, replace)
|
||||
with open(filepath, "w") as f:
|
||||
f.write(s)
|
||||
|
||||
|
||||
# general purpose objects
|
||||
|
||||
class HomoList(collections.abc.Sequence):
|
||||
"""
|
||||
A Homogeneous Sequence.
|
||||
|
||||
Provides a class for a list-like object which contains
|
||||
homogeneous values. Attributes of the values can be accessed through
|
||||
the attributes of HomoList. Searching is done like numpy arrays.
|
||||
|
||||
Initialized from a list of all the same type
|
||||
|
||||
>>> h = HomoDict([Foo(...), Foo(...)])
|
||||
|
||||
The individual values of `h` can be access in identical fashion to
|
||||
Lists.
|
||||
|
||||
>>> h[0]
|
||||
|
||||
Assuming that `Foo` has property `prop` and function `func` ...
|
||||
|
||||
Access elements' properties:
|
||||
|
||||
>>> h.prop
|
||||
|
||||
Access elements' functions:
|
||||
|
||||
>>> h.func()
|
||||
|
||||
Searching:
|
||||
|
||||
>>> h[h.prop == value]
|
||||
>>> h[h.prop < value]
|
||||
|
||||
Multiple search:
|
||||
|
||||
>>> h[set(h.prop==value1) & set( h.prop2==value2)]
|
||||
|
||||
Combos:
|
||||
|
||||
>>> h[h.prop==value].func()
|
||||
"""
|
||||
|
||||
|
||||
def __init__(self, list_):
|
||||
self.store = list(list_)
|
||||
|
||||
def __eq__(self, value):
|
||||
return [k for k in range(len(self)) if self.store[k] == value ]
|
||||
|
||||
def __ne__(self, value):
|
||||
return [k for k in range(len(self)) if self.store[k] != value ]
|
||||
|
||||
def __gt__(self, value):
|
||||
return [k for k in range(len(self)) if self.store[k] > value ]
|
||||
|
||||
def __ge__(self, value):
|
||||
return [k for k in range(len(self)) if self.store[k] >= value ]
|
||||
|
||||
def __lt__(self, value):
|
||||
return [k for k in range(len(self)) if self.store[k] < value ]
|
||||
|
||||
def __le__(self, value):
|
||||
return [k for k in range(len(self)) if self.store[k] <= value ]
|
||||
|
||||
def __getattr__(self, name):
|
||||
return self.__class__(
|
||||
[k.__getattribute__(name) for k in self.store])
|
||||
|
||||
def __getitem__(self, idx):
|
||||
try:
|
||||
return self.store[idx]
|
||||
except(TypeError):
|
||||
return self.__class__([self.store[k] for k in idx])
|
||||
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
return self.__class__(
|
||||
[k(*args,**kwargs) for k in self.store])
|
||||
|
||||
def __setitem__(self, idx, value):
|
||||
self.store[idx] = value
|
||||
|
||||
def __delitem__(self, idx):
|
||||
del self.store[idx]
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.store)
|
||||
|
||||
def __len__(self):
|
||||
return len(self.store)
|
||||
|
||||
def __str__(self):
|
||||
return pprint.pformat(self.store)
|
||||
|
||||
def __repr__(self):
|
||||
return pprint.pformat(self.store)
|
||||
|
||||
|
||||
class HomoDict(collections.abc.MutableMapping):
|
||||
"""
|
||||
A Homogeneous Mutable Mapping.
|
||||
|
||||
Provides a class for a dictionary-like object which contains
|
||||
homogeneous values. Attributes of the values can be accessed through
|
||||
the attributes of HomoDict. Searching is done like numpy arrays.
|
||||
|
||||
Initialized from a dictionary containing values of all the same type
|
||||
|
||||
>>> h = HomoDict({'a':Foo(...),'b': Foo(...), 'c':Foo(..)})
|
||||
|
||||
The individual values of `h` can be access in identical fashion to
|
||||
Dictionaries.
|
||||
|
||||
>>> h['key']
|
||||
|
||||
Assuming that `Foo` has property `prop` and function `func` ...
|
||||
|
||||
Access elements' properties:
|
||||
|
||||
>>> h.prop
|
||||
|
||||
Access elements' functions:
|
||||
|
||||
>>> h.func()
|
||||
|
||||
Searching:
|
||||
|
||||
>>> h[h.prop == value]
|
||||
>>> h[h.prop < value]
|
||||
|
||||
Multiple search:
|
||||
|
||||
>>> h[set(h.prop==value1) & set( h.prop2==value2)]
|
||||
|
||||
Combos:
|
||||
|
||||
>>> h[h.prop==value].func()
|
||||
"""
|
||||
def __init__(self, dict_):
|
||||
self.store = dict(dict_)
|
||||
|
||||
def __eq__(self, value):
|
||||
return [k for k in self.store if self.store[k] == value ]
|
||||
|
||||
def __ne__(self, value):
|
||||
return [k for k in self.store if self.store[k] != value ]
|
||||
|
||||
def __gt__(self, value):
|
||||
return [k for k in self.store if self.store[k] > value ]
|
||||
|
||||
def __ge__(self, value):
|
||||
return [k for k in self.store if self.store[k] >= value ]
|
||||
|
||||
def __lt__(self, value):
|
||||
return [k for k in self.store if self.store[k] < value ]
|
||||
|
||||
def __le__(self, value):
|
||||
return [k for k in self.store if self.store[k] <= value ]
|
||||
|
||||
def __getattr__(self, name):
|
||||
return self.__class__(
|
||||
{k: getattr(self.store[k],name) for k in self.store})
|
||||
|
||||
def __getitem__(self, key):
|
||||
if isinstance(key, str):
|
||||
return self.store[key]
|
||||
else:
|
||||
c = self.__class__({k:self.store[k] for k in key})
|
||||
return c
|
||||
#if len(c) == 1:
|
||||
# return c.store.values()[0]
|
||||
#else:
|
||||
# return c
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
return self.__class__(
|
||||
{k: self.store[k](*args, **kwargs) for k in self.store})
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self.store[key] = value
|
||||
|
||||
def __delitem__(self, key):
|
||||
del self.store[key]
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.store)
|
||||
|
||||
def __len__(self):
|
||||
return len(self.store)
|
||||
|
||||
def __str__(self):
|
||||
return pprint.pformat(self.store)
|
||||
|
||||
def __repr__(self):
|
||||
return pprint.pformat(self.store)
|
||||
|
||||
|
||||
def copy(self):
|
||||
return HomoDict(self.store)
|
||||
|
||||
|
||||
def filter_nones(self):
|
||||
self.store = {k:self.store[k] for k in self.store \
|
||||
if self.store[k] is not None}
|
||||
|
||||
def filter(self, **kwargs):
|
||||
"""
|
||||
Filter self based on kwargs
|
||||
|
||||
This is equivalent to:
|
||||
|
||||
>>> h = HomoDict(...)
|
||||
>>> for k in kwargs:
|
||||
>>> h = h[k ==kwargs[k]]
|
||||
>>> return h
|
||||
|
||||
prefixing the kwarg value with a '!' causes a not equal test (!=)
|
||||
|
||||
Examples
|
||||
----------
|
||||
>>> h = HomoDict(...)
|
||||
>>> h.filter(name='jean', age = '18', gender ='!female')
|
||||
|
||||
"""
|
||||
a = self
|
||||
for k in kwargs:
|
||||
if kwargs[k][0] == '!':
|
||||
a = a[a.__getattr__(k) != kwargs[k][1:]]
|
||||
else:
|
||||
a = a[a.__getattr__(k) == kwargs[k]]
|
||||
return a
|
||||
|
||||
|
||||
def has_duplicate_value(value: Any, values: Iterable, index: int) -> bool | int:
|
||||
"""
|
||||
Check if there is another value of the current index in the list.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
value : Any
|
||||
any value in a list
|
||||
values : Iterable
|
||||
the iterable containing the values
|
||||
index : int
|
||||
the index of the current item we are checking for.
|
||||
|
||||
Returns
|
||||
-------
|
||||
index : bool or int
|
||||
returns None if no duplicate found, or the index of the first found duplicate
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> rf.has_duplicate_value(0, [1, 2, 0, 3, 0], -1) # -> 2
|
||||
>>> rf.has_duplicate_value(0, [1, 2, 0, 3, 0], 2) # -> 4
|
||||
>>> rf.has_duplicate_value(3, [1, 2, 0, 3, 0], 0) # -> 3
|
||||
>>> rf.has_duplicate_value(3, [1, 2, 0, 3, 0], 3) # -> False
|
||||
"""
|
||||
|
||||
for i, val in enumerate(values):
|
||||
if i == index:
|
||||
continue
|
||||
if value == val:
|
||||
return i
|
||||
return False
|
||||
|
||||
|
||||
def unique_name(name: str, names: list, exclude: int = -1) -> str:
|
||||
"""
|
||||
Pass in a name and a list of names, and increment with _## as necessary to ensure a unique name.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
name : str
|
||||
the chosen name, to be modified if necessary
|
||||
names : list
|
||||
list of names (str)
|
||||
exclude : int, optional
|
||||
the index of an item to be excluded from the search. Default is -1.
|
||||
|
||||
Returns
|
||||
-------
|
||||
unique_name : str
|
||||
|
||||
"""
|
||||
if not has_duplicate_value(name, names, exclude):
|
||||
return name
|
||||
else:
|
||||
if re.match(r"_\d\d", name[-3:]):
|
||||
name_base = name[:-3]
|
||||
suffix = int(name[-2:])
|
||||
else:
|
||||
name_base = name
|
||||
suffix = 1
|
||||
|
||||
for num in range(suffix, 100, 1):
|
||||
name = f"{name_base:s}_{num:02d}"
|
||||
if not has_duplicate_value(name, names, exclude):
|
||||
break
|
||||
return name
|
||||
|
||||
|
||||
def smooth(x: np.ndarray, window_len: int = 11, window: str = 'flat') -> np.ndarray:
|
||||
"""
|
||||
Smooth the data using a window with requested size.
|
||||
|
||||
Based on the function from the scipy cookbook [#]_
|
||||
|
||||
This method is based on the convolution of a scaled window with the signal.
|
||||
The signal is prepared by introducing reflected copies of the signal
|
||||
(with the window size) in both ends so that transient parts are minimized
|
||||
in the beginning and end part of the output signal.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
x : numpy.array
|
||||
the input signal
|
||||
window_len : int, optional
|
||||
the dimension of the smoothing window; should be an odd integer.
|
||||
Default is 11.
|
||||
window : str, optional
|
||||
the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
|
||||
flat window will produce a moving average smoothing. Default is 'flat'
|
||||
|
||||
Returns
|
||||
-------
|
||||
y : numpy.array
|
||||
The smoothed signal
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> t = linspace(-2, 2, 0.1)
|
||||
>>> x = sin(t) + randn(len(t))*0.1
|
||||
>>> y = smooth(x)
|
||||
|
||||
See Also
|
||||
--------
|
||||
numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve
|
||||
scipy.signal.lfilter
|
||||
|
||||
Note
|
||||
----
|
||||
`length(output) != length(input)`.
|
||||
To correct this: `return y[(window_len/2-1):-(window_len/2)]` instead of just `y`.
|
||||
|
||||
References
|
||||
----------
|
||||
.. [#] http://scipy-cookbook.readthedocs.io/items/SignalSmooth.html
|
||||
|
||||
"""
|
||||
|
||||
if x.ndim != 1:
|
||||
raise ValueError("smooth only accepts 1 dimension arrays.")
|
||||
|
||||
if x.size < window_len:
|
||||
raise ValueError("Input vector needs to be bigger than window size.")
|
||||
|
||||
if window_len < 3:
|
||||
return x
|
||||
|
||||
if window not in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
|
||||
raise ValueError("Window is one of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'")
|
||||
|
||||
s = np.r_[x[window_len - 1:0:-1], x, x[-2:-window_len - 1:-1]]
|
||||
if window == 'flat': # moving average
|
||||
w = np.ones(window_len, 'd')
|
||||
else:
|
||||
w = eval('np.' + window + '(window_len)')
|
||||
y = np.convolve(w / w.sum(), s, mode='same')
|
||||
return y[window_len-1:-(window_len-1)]
|
||||
|
||||
|
||||
class ProgressBar:
|
||||
"""
|
||||
A progress bar based off of the notebook/ipython progress bar from PyMC.
|
||||
|
||||
Useful when waiting for long operations such as taking a large number
|
||||
of VNA measurements that may take a few minutes.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> from time import sleep
|
||||
>>> pb = rf.ProgressBar(10)
|
||||
>>> for idx in range(10):
|
||||
>>> sleep(1)
|
||||
>>> pb.animate(idx)
|
||||
|
||||
"""
|
||||
def __init__(self, iterations: int, label: str = "iterations"):
|
||||
"""
|
||||
Progress bar constructor.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
iterations : int
|
||||
Number of expected iterations
|
||||
label : str, optional
|
||||
Progress bar label, by default "iterations"
|
||||
"""
|
||||
self.iterations = iterations
|
||||
self.label = label
|
||||
self.prog_bar = '[]'
|
||||
self.fill_char = '*'
|
||||
self.width = 50
|
||||
self.__update_amount(0)
|
||||
|
||||
def animate(self, iteration: int):
|
||||
"""
|
||||
Animate the progress bar.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
iteration : int
|
||||
current iteration
|
||||
"""
|
||||
print('\r', self, end='')
|
||||
sys.stdout.flush()
|
||||
self.update_iteration(iteration + 1)
|
||||
|
||||
def update_iteration(self, elapsed_iter: int):
|
||||
self.__update_amount((elapsed_iter / float(self.iterations)) * 100.0)
|
||||
self.prog_bar += ' %d of %s %s complete' % (elapsed_iter, self.iterations, self.label)
|
||||
|
||||
def __update_amount(self, new_amount: int):
|
||||
percent_done = int(round((new_amount / 100.0) * 100.0))
|
||||
all_full = self.width - 2
|
||||
num_hashes = int(round((percent_done / 100.0) * all_full))
|
||||
self.prog_bar = '[' + self.fill_char * num_hashes + ' ' * (all_full - num_hashes) + ']'
|
||||
pct_place = (len(self.prog_bar) // 2) - len(str(percent_done))
|
||||
pct_string = '%d%%' % percent_done
|
||||
self.prog_bar = self.prog_bar[0:pct_place] + \
|
||||
(pct_string + self.prog_bar[pct_place + len(pct_string):])
|
||||
|
||||
def __str__(self):
|
||||
return str(self.prog_bar)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def suppress_numpy_warnings(**kw):
|
||||
olderr = np.seterr(**kw)
|
||||
yield
|
||||
np.seterr(**olderr)
|
||||
|
||||
|
||||
def suppress_warning_decorator(msg):
|
||||
def suppress_warnings_decorated(func):
|
||||
@wraps(func)
|
||||
def suppressed_func(*k, **kw):
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings("ignore", message=f"{msg}.*")
|
||||
res = func(*k, **kw)
|
||||
return res
|
||||
return suppressed_func
|
||||
return suppress_warnings_decorated
|
||||
2617
core/vectorFitting.py
Normal file
2617
core/vectorFitting.py
Normal file
File diff suppressed because it is too large
Load Diff
60
vector_fitting.py
Normal file
60
vector_fitting.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import matplotlib.pyplot as plt
|
||||
from skrf import Network
|
||||
from core.freqency import auto_select_multple_ports
|
||||
from core.freqency import auto_select
|
||||
from core.vectorFitting import VectorFitting
|
||||
import numpy as np
|
||||
from typing import Literal
|
||||
import skrf as rf
|
||||
|
||||
def vector_fitting_info(vf:VectorFitting,sampled_freqs,sampled_responses,title:str='vf',parameter_type:Literal['y','s','z']='y'):
|
||||
for i in range(vf.network.nports):
|
||||
for j in range(vf.network.nports):
|
||||
rms_error = vf.get_rms_error(i,j,parameter_type=parameter_type)
|
||||
print(f'RMS Error Port {i+1} to Port {j+1}: {rms_error}')
|
||||
fitted_points = vf.get_model_response(i,j,sampled_freqs)
|
||||
if parameter_type=='y':
|
||||
input_points = vf.network.y[:,i,j]
|
||||
elif parameter_type=='s':
|
||||
input_points = vf.network.s[:,i,j]
|
||||
elif parameter_type=='z':
|
||||
input_points = vf.network.z[:,i,j]
|
||||
plt.figure(figsize=(20,12))
|
||||
plt.suptitle(f'{title} Port {i+1} to Port {j+1}')
|
||||
plt.subplot(2,2,1)
|
||||
plt.title('Magnitude')
|
||||
plt.plot(sampled_freqs,np.abs(sampled_responses[:,i,j]),'o', ms=4, color='red', label='Samples')
|
||||
plt.plot(vf.network.f,np.abs(input_points),'x', ms=4, color='blue', label='Input Samples')
|
||||
plt.plot(sampled_freqs,np.abs(fitted_points),'-', lw=2, color='k', label='Fit')
|
||||
plt.ylabel('Magnitude (dB)')
|
||||
plt.grid()
|
||||
plt.legend()
|
||||
plt.subplot(2,2,2)
|
||||
plt.title('Phase')
|
||||
plt.plot(sampled_freqs,np.angle(sampled_responses[:,i,j]),'o', ms=4, color='red', label='Samples')
|
||||
plt.plot(vf.network.f,np.angle(input_points),'x', ms=4, color='blue', label='Input Samples')
|
||||
plt.plot(sampled_freqs,np.angle(fitted_points),'-', lw=2, color='k', label='Fit')
|
||||
plt.ylabel('Phase (deg)')
|
||||
plt.xlabel('Frequency (GHz)')
|
||||
plt.grid()
|
||||
plt.legend()
|
||||
plt.subplot(2,2,3)
|
||||
plt.title('RMS_Error')
|
||||
plt.plot(rms_error, 'b', label='RMS Error')
|
||||
plt.ylabel('RMS Error (dB)')
|
||||
plt.grid()
|
||||
plt.legend()
|
||||
plt.savefig(f"outputs/{title.replace(' ','_')}_{i+1}_{j+1}.png")
|
||||
|
||||
|
||||
original_network = Network('/tmp/paramer/simulation/3000/3000.s2p')
|
||||
# original_network = rf.data.ring_slot
|
||||
|
||||
H,freqs = auto_select_multple_ports(original_network.y,original_network.f,max_points=20)
|
||||
fitted_network = Network(y=H,f=freqs)
|
||||
vf = VectorFitting(fitted_network)
|
||||
vf.vector_fit(n_poles_cmplx=2,n_poles_real=0,parameter_type='y')
|
||||
# vf.auto_fit(parameter_type='y')
|
||||
|
||||
vector_fitting_info(vf,original_network.f,original_network.y,parameter_type='y')
|
||||
|
||||
Reference in New Issue
Block a user