Files
ovf/core/freqency.py
2025-09-22 22:21:43 -04:00

249 lines
9.4 KiB
Python

import numpy as np
# def auto_select(H, freq,
# n_baseline=64, # log-spaced backbone points
# peak_prominence=0.05, # fraction of |H| dB dynamic range for peak detection
# peak_window=5, # take ±peak_window samples around each peak
# topgrad_q=0.98, # keep top 2% largest slope/phase-change points
# max_points=25, # final cap on selected samples (None = no cap)
# ensure_ends=True):
# """
# Select several significant sample points for vector fitting.
# Strategy:
# 1) Always keep endpoints (optional).
# 2) Add a log-spaced baseline over the band.
# 3) Detect resonance peaks in |H| (on a log scale) and keep small windows around them.
# 4) Add points with the largest magnitude slope and phase-change (w.r.t log-f).
# 5) De-duplicate, sort, and optionally thin to 'max_points' with priority
# to endpoints and detected peaks.
# Parameters
# ----------
# H : (N,) complex array
# Frequency response samples.
# freq : (N,) float array
# Frequency axis [Hz], strictly increasing.
# n_baseline : int
# Count of log-spaced baseline samples across the band.
# peak_prominence : float
# Peak prominence threshold as a fraction of the dynamic range in log|H|.
# 0.05 ≈ keep peaks ≥ 5% of the range.
# peak_window : int
# Number of neighbor indices to include on each side of every detected peak.
# topgrad_q : float in (0,1)
# Quantile for selecting strong slope/phase points.
# 0.98 ⇒ keep the top 2% largest derivatives.
# max_points : int or None
# If not None, cap the total number of selected indices to this value.
# ensure_ends : bool
# Always include the first and last samples.
# Returns
# -------
# H_sel : (K,) complex array
# freq_sel : (K,) float array
# """
# H = np.asarray(H).reshape(-1)
# f = np.asarray(freq).reshape(-1)
# if H.size != f.size:
# raise ValueError("H and freq must have the same length.")
# N = f.size
# if N < 4:
# return H.copy(), f.copy()
# eps = 1e-16
# mag = np.abs(H)
# logmag = np.log10(mag + eps)
# phase = np.unwrap(np.angle(H))
# # log-frequency axis (scale-invariant derivatives)
# # keep it linear if any non-positive freq sneaks in
# if np.all(f > 0):
# lf = np.log(f)
# else:
# lf = f.copy()
# dlf = np.gradient(lf)
# d_logmag = np.gradient(logmag) / (dlf + 1e-16)
# d_phase = np.gradient(phase) / (dlf + 1e-16)
# idx = set()
# if ensure_ends:
# idx.update([0, N-1])
# # 1) log-spaced baseline
# if n_baseline > 0:
# # map a log grid to nearest indices
# grid = np.linspace(lf.min(), lf.max(), n_baseline)
# base_idx = np.clip(np.searchsorted(lf, grid), 0, N-1)
# idx.update(np.unique(base_idx).tolist())
# # 2) peaks in |H|
# try:
# from scipy.signal import find_peaks
# dyn = logmag.max() - logmag.min()
# prom = peak_prominence * (dyn + 1e-12)
# peaks, _ = find_peaks(logmag, prominence=prom)
# except Exception:
# # simple fallback: strict local maxima
# peaks = np.where((mag[1:-1] > mag[:-2]) & (mag[1:-1] > mag[2:]))[0] + 1
# for p in peaks:
# lo = max(0, p - peak_window)
# hi = min(N, p + peak_window + 1)
# idx.update(range(lo, hi))
# # 3) strongest slope / phase-change points
# thr_slope = np.quantile(np.abs(d_logmag), topgrad_q)
# thr_phase = np.quantile(np.abs(d_phase), topgrad_q)
# idx.update(np.where(np.abs(d_logmag) >= thr_slope)[0].tolist())
# idx.update(np.where(np.abs(d_phase) >= thr_phase)[0].tolist())
# # 4) finalize set
# sel = np.array(sorted(idx), dtype=int)
# # 5) optional thinning with priority to endpoints and peaks
# if max_points is not None and sel.size > max_points:
# priority = np.zeros(sel.size, dtype=int)
# if ensure_ends:
# priority[(sel == 0) | (sel == N-1)] = 3
# if peaks.size:
# priority[np.isin(sel, peaks)] = np.maximum(priority[np.isin(sel, peaks)], 2)
# keep = []
# budget = max_points
# # keep highest-priority first
# for lev in (3, 2, 1, 0):
# cand = sel[priority == lev]
# if cand.size == 0:
# continue
# if cand.size <= budget:
# keep.extend(cand.tolist())
# budget -= cand.size
# else:
# step = max(1, int(np.ceil(cand.size / budget)))
# keep.extend(cand[::step][:budget].tolist())
# budget = 0
# if budget == 0:
# break
# sel = np.array(sorted(set(keep)), dtype=int)
# return H[sel], f[sel]
def auto_select(H, freq,
n_baseline=64, # log-spaced backbone points
peak_prominence=0.05, # fraction of |H| dB dynamic range for peak detection
peak_window=5, # take ±peak_window samples around each peak
topgrad_q=0.98, # keep top 2% largest slope/phase-change points
max_points=25, # final cap on selected samples (None = no cap)
ensure_ends=True):
H = np.asarray(H).reshape(-1)
f = np.asarray(freq).reshape(-1)
if H.size != f.size:
raise ValueError("H and freq must have the same length.")
N = f.size
if N < 4 or max_points is None or max_points >= N:
# 直接返回所有点
return H.copy(), f.copy()
eps = 1e-16
mag = np.abs(H)
logmag = np.log10(mag + eps)
phase = np.unwrap(np.angle(H))
if np.all(f > 0):
lf = np.log(f)
else:
lf = f.copy()
dlf = np.gradient(lf)
d_logmag = np.gradient(logmag) / (dlf + 1e-16)
d_phase = np.gradient(phase) / (dlf + 1e-16)
idx = set()
if ensure_ends:
idx.update([0, N-1])
if n_baseline > 0:
grid = np.linspace(lf.min(), lf.max(), n_baseline)
base_idx = np.clip(np.searchsorted(lf, grid), 0, N-1)
idx.update(np.unique(base_idx).tolist())
try:
from scipy.signal import find_peaks
dyn = logmag.max() - logmag.min()
prom = peak_prominence * (dyn + 1e-12)
peaks, _ = find_peaks(logmag, prominence=prom)
except Exception:
peaks = np.where((mag[1:-1] > mag[:-2]) & (mag[1:-1] > mag[2:]))[0] + 1
for p in peaks:
lo = max(0, p - peak_window)
hi = min(N, p + peak_window + 1)
idx.update(range(lo, hi))
thr_slope = np.quantile(np.abs(d_logmag), topgrad_q)
thr_phase = np.quantile(np.abs(d_phase), topgrad_q)
idx.update(np.where(np.abs(d_logmag) >= thr_slope)[0].tolist())
idx.update(np.where(np.abs(d_phase) >= thr_phase)[0].tolist())
sel = np.array(sorted(idx), dtype=int)
if sel.size > max_points:
priority = np.zeros(sel.size, dtype=int)
if ensure_ends:
priority[(sel == 0) | (sel == N-1)] = 3
if peaks.size:
priority[np.isin(sel, peaks)] = np.maximum(priority[np.isin(sel, peaks)], 2)
keep = []
budget = max_points
for lev in (3, 2, 1, 0):
cand = sel[priority == lev]
if cand.size == 0:
continue
if cand.size <= budget:
keep.extend(cand.tolist())
budget -= cand.size
else:
step = max(1, int(np.ceil(cand.size / budget)))
keep.extend(cand[::step][:budget].tolist())
budget = 0
if budget == 0:
break
sel = np.array(sorted(set(keep)), dtype=int)
if sel.size < max_points:
all_idx = set(range(N))
missing = list(sorted(all_idx - set(sel)))
n_missing = max_points - sel.size
if n_missing > 0 and missing:
extra = np.linspace(0, len(missing)-1, n_missing, dtype=int)
sel = np.concatenate([sel, np.array(missing)[extra]])
sel = np.array(sorted(set(sel)), dtype=int)
if sel.size < max_points:
left = list(sorted(all_idx - set(sel)))
if left:
sel = np.concatenate([sel, np.random.choice(left, max_points-sel.size, replace=False)])
sel = np.array(sorted(set(sel)), dtype=int)
sel = sel[:max_points]
return H[sel], f[sel]
def auto_select_multple_ports(H, freq,
n_baseline=64, # log-spaced backbone points
peak_prominence=0.05, # fraction of |H| dB dynamic range for peak detection
peak_window=5, # take ±peak_window samples around each peak
topgrad_q=0.98, # keep top 2% largest slope/phase-change points
max_points=25, # final cap on selected samples (None = no cap)
ensure_ends=True):
ports = H.shape[1]
H_selected = np.zeros((max_points,ports,ports),dtype=complex)
for i in range(ports):
for j in range(ports):
H_selected[:,i,j], freq_selected = auto_select(H[:,i,j], freq,
n_baseline=n_baseline, peak_prominence=peak_prominence,
peak_window=peak_window, topgrad_q=topgrad_q,
max_points=max_points, ensure_ends=ensure_ends)
return H_selected, freq_selected