Files
ovf/core/freqency.py

187 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import numpy as np
def _auto_select_indices(H, freq,
n_baseline=64,
peak_prominence=0.05,
peak_window=5,
topgrad_q=0.98,
max_points=25,
ensure_ends=True):
"""返回选中的全局索引,避免直接切片导致多端口不对齐。"""
H = np.asarray(H).astype(np.complex128).reshape(-1)
f = np.asarray(freq).astype(float).reshape(-1)
if H.size != f.size:
raise ValueError("H and freq must have the same length.")
N = f.size
if N < 4 or max_points is None or max_points >= N:
return np.arange(N, dtype=int)
eps = 1e-16
mag = np.abs(H)
logmag = np.log10(mag + eps)
phase = np.unwrap(np.angle(H))
lf = np.log(f) if np.all(f > 0) else f.copy()
dlf = np.gradient(lf)
d_logmag = np.gradient(logmag) / (dlf + 1e-16)
d_phase = np.gradient(phase) / (dlf + 1e-16)
idx = set()
if ensure_ends:
idx.update([0, N - 1])
if n_baseline > 0:
grid = np.linspace(lf.min(), lf.max(), n_baseline)
base_idx = np.clip(np.searchsorted(lf, grid), 0, N - 1)
idx.update(np.unique(base_idx).tolist())
try:
from scipy.signal import find_peaks
dyn = logmag.max() - logmag.min()
prom = peak_prominence * (dyn + 1e-12)
peaks, _ = find_peaks(logmag, prominence=prom)
except Exception:
peaks = np.where((mag[1:-1] > mag[:-2]) & (mag[1:-1] > mag[2:]))[0] + 1
for p in np.atleast_1d(peaks):
lo = max(0, int(p) - peak_window)
hi = min(N, int(p) + peak_window + 1)
idx.update(range(lo, hi))
thr_slope = np.quantile(np.abs(d_logmag), topgrad_q)
thr_phase = np.quantile(np.abs(d_phase), topgrad_q)
idx.update(np.where(np.abs(d_logmag) >= thr_slope)[0].tolist())
idx.update(np.where(np.abs(d_phase) >= thr_phase)[0].tolist())
sel = np.array(sorted(idx), dtype=int)
if sel.size > max_points:
priority = np.zeros(sel.size, dtype=int)
if ensure_ends:
priority[(sel == 0) | (sel == N - 1)] = 3
if np.size(peaks):
mask = np.isin(sel, np.atleast_1d(peaks))
priority[mask] = np.maximum(priority[mask], 2)
keep = []
budget = max_points
for lev in (3, 2, 1, 0):
cand = sel[priority == lev]
if cand.size == 0:
continue
if cand.size <= budget:
keep.extend(cand.tolist())
budget -= cand.size
else:
step = max(1, int(np.ceil(cand.size / max(budget, 1))))
keep.extend(cand[::step][:budget].tolist())
budget = 0
if budget == 0:
break
sel = np.array(sorted(set(keep)), dtype=int)
if sel.size < max_points:
all_idx = set(range(N))
missing = list(sorted(all_idx - set(sel)))
n_missing = max_points - sel.size
if n_missing > 0 and missing:
extra = np.linspace(0, len(missing) - 1, n_missing, dtype=int)
sel = np.concatenate([sel, np.array(missing)[extra]])
sel = np.array(sorted(set(sel)), dtype=int)
if sel.size < max_points:
left = list(sorted(all_idx - set(sel)))
if left:
add = min(max_points - sel.size, len(left))
sel = np.concatenate([sel, np.random.choice(left, add, replace=False)])
sel = np.array(sorted(set(sel)), dtype=int)
sel = sel[:max_points]
return sel
def auto_select(H, freq,
n_baseline=64, # log-spaced backbone points
peak_prominence=0.05, # fraction of |H| dB dynamic range for peak detection
peak_window=5, # take ±peak_window samples around each peak
topgrad_q=0.98, # keep top 2% largest slope/phase-change points
max_points=25, # final cap on selected samples (None = no cap)
ensure_ends=True):
sel = _auto_select_indices(H, freq,
n_baseline=n_baseline,
peak_prominence=peak_prominence,
peak_window=peak_window,
topgrad_q=topgrad_q,
max_points=max_points,
ensure_ends=ensure_ends)
H = np.asarray(H).astype(np.complex128).reshape(-1)
f = np.asarray(freq).astype(float).reshape(-1)
return H[sel], f[sel]
def auto_select_multple_ports(H, freq,
n_baseline=64, # log-spaced backbone points
peak_prominence=0.05, # fraction of |H| dB dynamic range for peak detection
peak_window=5, # take ±peak_window samples around each peak
topgrad_q=0.98, # keep top 2% largest slope/phase-change points
max_points=25, # final cap on selected samples (None = no cap)
ensure_ends=True):
"""
多端口统一选点:为每个(i,j)先各自产出候选索引,然后做并集并按出现次数优先级裁剪到同一套索引,
确保返回的 H_selected 与 freq_selected 全端口严格对齐。
输入:
H: (N, P, P) 复数频响
freq: (N,) 频率
返回:
H_selected: (K, P, P)
freq_selected: (K,)
其中 K == max_points或当样本不足时为 N
"""
H = np.asarray(H)
f = np.asarray(freq).astype(float).reshape(-1)
if H.ndim != 3:
raise ValueError("H must have shape (N, P, P)")
N, P1, P2 = H.shape
if f.size != N:
raise ValueError("H and freq must have the same first dimension length.")
if P1 != P2:
raise ValueError("H must be square on ports (P x P).")
# 边界:样本太少或不需裁剪,直接返回全量且对齐
if N < 4 or max_points is None or max_points >= N:
return H.copy(), f.copy()
# 每个(i,j)各自选索引
counts = {}
all_sel_sets = []
for i in range(P1):
for j in range(P2):
sel = _auto_select_indices(H[:, i, j], f,
n_baseline=n_baseline,
peak_prominence=peak_prominence,
peak_window=peak_window,
topgrad_q=topgrad_q,
max_points=max_points,
ensure_ends=ensure_ends)
all_sel_sets.append(sel)
for idx in sel.tolist():
counts[idx] = counts.get(idx, 0) + 1
# 并集 + 频次优先裁剪到 max_points
union_idx = sorted(set(np.concatenate(all_sel_sets)) )
# 如果并集不超过预算,必要时补点至 max_points均匀抽取未选样本
if len(union_idx) <= max_points:
sel_final = union_idx
if len(sel_final) < max_points:
remaining = sorted(set(range(N)) - set(sel_final))
if remaining:
need = max_points - len(sel_final)
step = max(1, int(np.ceil(len(remaining) / need)))
sel_final.extend(remaining[::step][:need])
sel_final = sorted(set(sel_final))[:max_points]
else:
# 过多则按出现次数从高到低选,出现次数相同按索引位置靠前优先
sorted_by_score = sorted(union_idx, key=lambda k: (-counts.get(k, 0), k))
sel_final = sorted(sorted_by_score[:max_points])
sel_final = np.array(sel_final, dtype=int)
return H[sel_final], f[sel_final]