chore: 分离了sweep和vf部分,vf部分准备写为包
This commit is contained in:
186
core/sample.py
Normal file
186
core/sample.py
Normal file
@@ -0,0 +1,186 @@
|
||||
import numpy as np
|
||||
def _auto_select_indices(H, freq,
|
||||
n_baseline=64,
|
||||
peak_prominence=0.05,
|
||||
peak_window=5,
|
||||
topgrad_q=0.98,
|
||||
max_points=25,
|
||||
ensure_ends=True):
|
||||
"""返回选中的全局索引,避免直接切片导致多端口不对齐。"""
|
||||
H = np.asarray(H).astype(np.complex128).reshape(-1)
|
||||
f = np.asarray(freq).astype(float).reshape(-1)
|
||||
if H.size != f.size:
|
||||
raise ValueError("H and freq must have the same length.")
|
||||
N = f.size
|
||||
if N < 4 or max_points is None or max_points >= N:
|
||||
return np.arange(N, dtype=int)
|
||||
|
||||
eps = 1e-16
|
||||
mag = np.abs(H)
|
||||
logmag = np.log10(mag + eps)
|
||||
phase = np.unwrap(np.angle(H))
|
||||
|
||||
lf = np.log(f) if np.all(f > 0) else f.copy()
|
||||
dlf = np.gradient(lf)
|
||||
d_logmag = np.gradient(logmag) / (dlf + 1e-16)
|
||||
d_phase = np.gradient(phase) / (dlf + 1e-16)
|
||||
|
||||
idx = set()
|
||||
if ensure_ends:
|
||||
idx.update([0, N - 1])
|
||||
|
||||
if n_baseline > 0:
|
||||
grid = np.linspace(lf.min(), lf.max(), n_baseline)
|
||||
base_idx = np.clip(np.searchsorted(lf, grid), 0, N - 1)
|
||||
idx.update(np.unique(base_idx).tolist())
|
||||
|
||||
try:
|
||||
from scipy.signal import find_peaks
|
||||
dyn = logmag.max() - logmag.min()
|
||||
prom = peak_prominence * (dyn + 1e-12)
|
||||
peaks, _ = find_peaks(logmag, prominence=prom)
|
||||
except Exception:
|
||||
peaks = np.where((mag[1:-1] > mag[:-2]) & (mag[1:-1] > mag[2:]))[0] + 1
|
||||
|
||||
for p in np.atleast_1d(peaks):
|
||||
lo = max(0, int(p) - peak_window)
|
||||
hi = min(N, int(p) + peak_window + 1)
|
||||
idx.update(range(lo, hi))
|
||||
|
||||
thr_slope = np.quantile(np.abs(d_logmag), topgrad_q)
|
||||
thr_phase = np.quantile(np.abs(d_phase), topgrad_q)
|
||||
idx.update(np.where(np.abs(d_logmag) >= thr_slope)[0].tolist())
|
||||
idx.update(np.where(np.abs(d_phase) >= thr_phase)[0].tolist())
|
||||
|
||||
sel = np.array(sorted(idx), dtype=int)
|
||||
|
||||
if sel.size > max_points:
|
||||
priority = np.zeros(sel.size, dtype=int)
|
||||
if ensure_ends:
|
||||
priority[(sel == 0) | (sel == N - 1)] = 3
|
||||
if np.size(peaks):
|
||||
mask = np.isin(sel, np.atleast_1d(peaks))
|
||||
priority[mask] = np.maximum(priority[mask], 2)
|
||||
|
||||
keep = []
|
||||
budget = max_points
|
||||
for lev in (3, 2, 1, 0):
|
||||
cand = sel[priority == lev]
|
||||
if cand.size == 0:
|
||||
continue
|
||||
if cand.size <= budget:
|
||||
keep.extend(cand.tolist())
|
||||
budget -= cand.size
|
||||
else:
|
||||
step = max(1, int(np.ceil(cand.size / max(budget, 1))))
|
||||
keep.extend(cand[::step][:budget].tolist())
|
||||
budget = 0
|
||||
if budget == 0:
|
||||
break
|
||||
sel = np.array(sorted(set(keep)), dtype=int)
|
||||
|
||||
if sel.size < max_points:
|
||||
all_idx = set(range(N))
|
||||
missing = list(sorted(all_idx - set(sel)))
|
||||
n_missing = max_points - sel.size
|
||||
if n_missing > 0 and missing:
|
||||
extra = np.linspace(0, len(missing) - 1, n_missing, dtype=int)
|
||||
sel = np.concatenate([sel, np.array(missing)[extra]])
|
||||
sel = np.array(sorted(set(sel)), dtype=int)
|
||||
if sel.size < max_points:
|
||||
left = list(sorted(all_idx - set(sel)))
|
||||
if left:
|
||||
add = min(max_points - sel.size, len(left))
|
||||
sel = np.concatenate([sel, np.random.choice(left, add, replace=False)])
|
||||
sel = np.array(sorted(set(sel)), dtype=int)
|
||||
sel = sel[:max_points]
|
||||
|
||||
return sel
|
||||
|
||||
|
||||
def auto_select(H, freq,
|
||||
n_baseline=64, # log-spaced backbone points
|
||||
peak_prominence=0.05, # fraction of |H| dB dynamic range for peak detection
|
||||
peak_window=5, # take ±peak_window samples around each peak
|
||||
topgrad_q=0.98, # keep top 2% largest slope/phase-change points
|
||||
max_points=25, # final cap on selected samples (None = no cap)
|
||||
ensure_ends=True):
|
||||
sel = _auto_select_indices(H, freq,
|
||||
n_baseline=n_baseline,
|
||||
peak_prominence=peak_prominence,
|
||||
peak_window=peak_window,
|
||||
topgrad_q=topgrad_q,
|
||||
max_points=max_points,
|
||||
ensure_ends=ensure_ends)
|
||||
H = np.asarray(H).astype(np.complex128).reshape(-1)
|
||||
f = np.asarray(freq).astype(float).reshape(-1)
|
||||
return H[sel], f[sel]
|
||||
|
||||
def auto_select_multple_ports(H, freq,
|
||||
n_baseline=64, # log-spaced backbone points
|
||||
peak_prominence=0.05, # fraction of |H| dB dynamic range for peak detection
|
||||
peak_window=5, # take ±peak_window samples around each peak
|
||||
topgrad_q=0.98, # keep top 2% largest slope/phase-change points
|
||||
max_points=25, # final cap on selected samples (None = no cap)
|
||||
ensure_ends=True):
|
||||
"""
|
||||
多端口统一选点:为每个(i,j)先各自产出候选索引,然后做并集并按出现次数优先级裁剪到同一套索引,
|
||||
确保返回的 H_selected 与 freq_selected 全端口严格对齐。
|
||||
输入:
|
||||
H: (N, P, P) 复数频响
|
||||
freq: (N,) 频率
|
||||
返回:
|
||||
H_selected: (K, P, P)
|
||||
freq_selected: (K,)
|
||||
其中 K == max_points(或当样本不足时为 N)。
|
||||
"""
|
||||
H = np.asarray(H)
|
||||
f = np.asarray(freq).astype(float).reshape(-1)
|
||||
if H.ndim != 3:
|
||||
raise ValueError("H must have shape (N, P, P)")
|
||||
N, P1, P2 = H.shape
|
||||
if f.size != N:
|
||||
raise ValueError("H and freq must have the same first dimension length.")
|
||||
if P1 != P2:
|
||||
raise ValueError("H must be square on ports (P x P).")
|
||||
|
||||
# 边界:样本太少或不需裁剪,直接返回全量且对齐
|
||||
if N < 4 or max_points is None or max_points >= N:
|
||||
return H.copy(), f.copy()
|
||||
|
||||
# 每个(i,j)各自选索引
|
||||
counts = {}
|
||||
all_sel_sets = []
|
||||
for i in range(P1):
|
||||
for j in range(P2):
|
||||
sel = _auto_select_indices(H[:, i, j], f,
|
||||
n_baseline=n_baseline,
|
||||
peak_prominence=peak_prominence,
|
||||
peak_window=peak_window,
|
||||
topgrad_q=topgrad_q,
|
||||
max_points=max_points,
|
||||
ensure_ends=ensure_ends)
|
||||
all_sel_sets.append(sel)
|
||||
for idx in sel.tolist():
|
||||
counts[idx] = counts.get(idx, 0) + 1
|
||||
|
||||
# 并集 + 频次优先裁剪到 max_points
|
||||
union_idx = sorted(set(np.concatenate(all_sel_sets)) )
|
||||
|
||||
# 如果并集不超过预算,必要时补点至 max_points(均匀抽取未选样本)
|
||||
if len(union_idx) <= max_points:
|
||||
sel_final = union_idx
|
||||
if len(sel_final) < max_points:
|
||||
remaining = sorted(set(range(N)) - set(sel_final))
|
||||
if remaining:
|
||||
need = max_points - len(sel_final)
|
||||
step = max(1, int(np.ceil(len(remaining) / need)))
|
||||
sel_final.extend(remaining[::step][:need])
|
||||
sel_final = sorted(set(sel_final))[:max_points]
|
||||
else:
|
||||
# 过多则按出现次数从高到低选,出现次数相同按索引位置靠前优先
|
||||
sorted_by_score = sorted(union_idx, key=lambda k: (-counts.get(k, 0), k))
|
||||
sel_final = sorted(sorted_by_score[:max_points])
|
||||
|
||||
sel_final = np.array(sel_final, dtype=int)
|
||||
return H[sel_final], f[sel_final]
|
||||
Reference in New Issue
Block a user