gain-viz/gain_viz/app.py

634 lines
18 KiB
Python

from flask import Flask, render_template, send_file, request, jsonify
from flask_socketio import SocketIO
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import os
import threading
import time
import serial
import json
import base64
import io
import traceback
import socket
from collections import deque
# Flask application and Socket.IO server used to push rendered plots to clients.
app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")
# Latest rendered plot; written by _save_and_emit() and served by the /plot route.
PLOT_PATH = os.path.join(os.getcwd(), "plot.png")
# ----------------- Shared Config -----------------
# Runtime settings and live statistics shared between the HTTP handlers and
# the worker threads. Access it while holding config_lock.
config = {
    "usrp_tx_gain": 60,
    "usrp_rx_gain": 30,
    "scm_tx_gain": 30,
    "scm_rx_gain": 30,
    "sample_rate": 23.04e6,  # samples per second (multiplied by window_ms / 1000 to get a sample count)
    "window_ms": 20,  # length of the plotted window in milliseconds
    "center_freq": 3.415e9,  # Hz (the spectrogram axis divides by 1e9 and labels it GHz)
    "NFFT": 1024,  # FFT size passed to specgram()
    "iq_port": 5588,  # UDP port the receiver thread binds to
    "streaming": False,
    "packets_received": 0,
    "iq_bandwidth_mbps": 0.0,
}
config_lock = threading.Lock()
# Module-level mirrors of the gain settings; gain_update() compares against
# these to decide which devices need to be reprogrammed.
usrp_tx_gain = config["usrp_tx_gain"]
usrp_rx_gain = config["usrp_rx_gain"]
scm_tx_gain = config["scm_tx_gain"]
scm_rx_gain = config["scm_rx_gain"]
# Worker threads, created on demand by start_plotting().
plot_thread = None
rx_thread = None
# stop_event ends both worker loops; pause_event only idles the plotting loop.
stop_event = threading.Event()
pause_event = threading.Event()
# Most recently received packet of IQ samples (set by the receiver thread).
latest_iq_data = None
latest_data_lock = threading.Lock()
# Rolling window of IQ samples. The initial maxlen of 1 is a placeholder:
# start_plotting() replaces the deque with one sized to the configured window.
iq_buffer = deque(maxlen=1)
iq_buffer_lock = threading.Lock()
# Socket currently owned by the receiver thread; stop_plotting() closes it to
# unblock the receive loop.
udp_sock = None
udp_sock_lock = threading.Lock()
# Upper bound on the number of points drawn in the time-domain plot.
MAX_TIME_PLOT_POINTS = 5000
# Delay between plot refreshes, in seconds.
PLOT_REFRESH_SEC = 0.25
def connect_serial(port, baudrate=115200, timeout=1):
    """Open *port* as a serial link with 8 data bits, even parity, 1 stop bit.

    Returns the open ``serial.Serial`` handle, or ``None`` (after printing
    the error) when the port cannot be opened.
    """
    settings = {
        "port": port,
        "baudrate": baudrate,
        "timeout": timeout,
        "bytesize": serial.EIGHTBITS,
        "parity": serial.PARITY_EVEN,
        "stopbits": serial.STOPBITS_ONE,
    }
    try:
        link = serial.Serial(**settings)
    except serial.SerialException as e:
        print(f"Error connecting to {port}: {e}")
        return None
    return link
def send_command(ser, command):
    """Write *command*, UTF-8 encoded, to *ser*.

    Does nothing when *ser* is missing or its port is not open.
    """
    if not ser or not ser.is_open:
        return
    encoded = command.encode("utf-8")
    ser.write(encoded)
def receive_feedback(ser):
    """Read the pending reply from *ser* and return its last CR-terminated line.

    The reply is decoded as UTF-8 (undecodable bytes are replaced) and the
    text preceding the final carriage return is returned with surrounding
    whitespace removed, e.g. ``"OK"`` for ``b"OK\\r\\n"``. If the reply
    contains no carriage return, the whole stripped reply is returned.

    Returns ``""`` when the port is missing or closed, when nothing was
    read, or when the serial layer raises ``SerialTimeoutException``.
    """
    if not ser or not ser.is_open:
        return ""
    try:
        ser.flush()
        raw_response = ser.readlines()
    except serial.SerialTimeoutException:
        return ""
    if not raw_response:
        return ""

    # Decode the bytes properly instead of slicing their repr() text, which
    # only worked for single-line replies and raised IndexError without a CR.
    text = "".join(
        chunk.decode("utf-8", errors="replace")
        if isinstance(chunk, (bytes, bytearray))
        else str(chunk)
        for chunk in raw_response
    )
    segments = text.split("\r")
    # segments[-1] is whatever follows the final CR (normally just "\n").
    last_line = segments[-2] if len(segments) >= 2 else segments[-1]
    return last_line.strip()
def scm_conf(port, baudrate, rx_cmd, tx_cmd):
    """Send the RX and then the TX gain command to the device on *port*.

    Each command is terminated with a carriage return and retried up to
    five times until the device answers ``"OK"``. A command that is never
    acknowledged is reported on stdout.

    Returns ``True`` when the port could be opened (whether or not the
    commands were acknowledged) and ``False`` when it could not.
    """
    ser = connect_serial(port, baudrate)
    if not ser:
        return False
    try:
        for cmd in (rx_cmd, tx_cmd):
            for _attempt in range(5):
                send_command(ser, cmd + "\r")
                if receive_feedback(ser) == "OK":
                    break
            else:
                # All retries used without an acknowledgement.
                print(f"No OK from {port} for command {cmd!r}")
    finally:
        # Release the port even if a write or read raised.
        ser.close()
    return True
def _send_ran_command(command):
    """Type *command* followed by Enter into the tmux target named ``ran``."""
    import subprocess

    try:
        # An argument list avoids building a shell command line from values.
        subprocess.run(
            ["tmux", "send-keys", "-t", "ran", command, "C-m"],
            check=False,
        )
    except OSError as e:
        # os.system() never raised when tmux was unavailable; keep that
        # best-effort behavior but make the failure visible.
        print(f"Failed to run tmux: {e}")


def gain_update(usrp_tx, usrp_rx, scm_tx, scm_rx):
    """Apply any gain values that differ from the current settings.

    Changed USRP gains are sent as ``tx_gain``/``rx_gain`` commands to the
    ``ran`` tmux target. If either SCM gain changed, both SCM commands are
    sent to ``/dev/ttyUSB0`` and ``/dev/ttyUSB1``. The module-level gain
    variables and the shared ``config`` are updated to match.

    Always returns ``True``.
    """
    global usrp_tx_gain, usrp_rx_gain, scm_tx_gain, scm_rx_gain

    if usrp_tx != usrp_tx_gain:
        usrp_tx_gain = usrp_tx
        _send_ran_command(f"tx_gain 0 {usrp_tx_gain} ")
    if usrp_rx != usrp_rx_gain:
        usrp_rx_gain = usrp_rx
        _send_ran_command(f"rx_gain 0 {usrp_rx_gain} ")

    scm_change = scm_tx != scm_tx_gain or scm_rx != scm_rx_gain
    scm_tx_gain = scm_tx
    scm_rx_gain = scm_rx
    if scm_change:
        t_cmd = f"HW:GAIN 0 TX 0 {scm_tx_gain}"
        r_cmd = f"HW:GAIN 1 RX 0 {scm_rx_gain}"
        # Both serial devices receive the same pair of commands.
        scm_conf("/dev/ttyUSB0", 115200, r_cmd, t_cmd)
        scm_conf("/dev/ttyUSB1", 115200, r_cmd, t_cmd)
        with config_lock:
            config["scm_tx_gain"] = scm_tx_gain
            config["scm_rx_gain"] = scm_rx_gain

    with config_lock:
        config["usrp_tx_gain"] = usrp_tx_gain
        config["usrp_rx_gain"] = usrp_rx_gain
    return True
def parse_iq_payload(payload):
    """Decode a UDP payload into an array of complex64 samples.

    The first two bytes are skipped and any trailing bytes that do not
    fill a whole 8-byte sample are dropped. Returns ``None`` when no
    complete sample remains.
    """
    header_len = 2
    sample_size = 8
    n_samples = (len(payload) - header_len) // sample_size
    if n_samples <= 0:
        return None
    body = payload[header_len:header_len + n_samples * sample_size]
    return np.frombuffer(body, dtype=np.complex64)
def compute_power_db(iq):
    """Return the mean power of *iq* in dB as a plain Python ``float``.

    Returns ``None`` for ``None`` or empty input and ``-120.0`` when the
    mean power is not positive. The result is converted to a built-in
    ``float`` so that it can be JSON-serialized regardless of which NumPy
    scalar type the computation produces.
    """
    if iq is None or len(iq) == 0:
        return None
    power = float(np.mean(np.abs(iq) ** 2))
    if power <= 0:
        return -120.0
    # The 1e-12 offset keeps the result finite for vanishingly small power.
    return float(10 * np.log10(power + 1e-12))
def data_receiver_thread():
    """Receive IQ packets over UDP until ``stop_event`` is set.

    Binds a UDP socket to the configured ``iq_port``, publishes it in
    ``udp_sock`` so that another thread can close it, and appends every
    decoded packet to ``iq_buffer``. Roughly once per second the packet
    count and the received bandwidth are written to ``config``; this also
    happens while no packets arrive, so the bandwidth drops to zero when
    the sender goes quiet.

    The loop ends when ``stop_event`` is set or the socket raises
    ``OSError`` (for example after being closed). The socket is always
    closed on exit.
    """
    global latest_iq_data, udp_sock
    with config_lock:
        iq_port = config["iq_port"]

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 * 1024 * 1024)
        sock.bind(("0.0.0.0", iq_port))
        sock.settimeout(0.1)
    except OSError as e:
        # Do not leak the descriptor when the port cannot be opened.
        print(f"Failed to open UDP port {iq_port}: {e}")
        sock.close()
        return

    with udp_sock_lock:
        udp_sock = sock
    print(f"Listening for IQ samples on UDP port {iq_port}")

    total_packets = 0
    total_bytes = 0
    last_stat_time = time.time()
    try:
        while not stop_event.is_set():
            try:
                try:
                    payload, _addr = sock.recvfrom(65535)
                except socket.timeout:
                    payload = None  # idle tick: only refresh the statistics

                if payload is not None:
                    total_packets += 1
                    total_bytes += len(payload)
                    iq = parse_iq_payload(payload)
                    if iq is not None and len(iq) > 0:
                        with latest_data_lock:
                            latest_iq_data = iq
                        with iq_buffer_lock:
                            iq_buffer.extend(iq)

                now = time.time()
                dt = now - last_stat_time
                if dt >= 1.0:
                    bandwidth_mbps = (total_bytes * 8) / dt / 1e6
                    with config_lock:
                        config["packets_received"] = total_packets
                        config["iq_bandwidth_mbps"] = bandwidth_mbps
                    total_bytes = 0
                    last_stat_time = now
            except OSError:
                # Raised when the socket is closed from another thread.
                break
            except Exception as e:
                print(f"UDP receive error: {e}")
                traceback.print_exc()
                time.sleep(0.1)
    finally:
        try:
            sock.close()
        except OSError:
            pass
        with udp_sock_lock:
            # Only clear the shared reference if it still points at our socket.
            if udp_sock is sock:
                udp_sock = None
        print("Data receiver thread stopped")
def generate_spectrum_plot():
    """Render the IQ time series and spectrogram until ``stop_event`` is set.

    Each cycle reads the plot settings from ``config``, snapshots
    ``iq_buffer``, builds either a placeholder figure (no samples yet) or
    the two-panel plot, and hands it to ``_save_and_emit``. The loop idles
    while ``pause_event`` is set or streaming is off. Rendering errors are
    printed and the loop continues; a figure left over from a failed cycle
    is closed so it does not accumulate in pyplot.
    """
    while not stop_event.is_set():
        if pause_event.is_set():
            time.sleep(0.1)
            continue
        with config_lock:
            sample_rate = config["sample_rate"]
            window_ms = config["window_ms"]
            center_freq = config["center_freq"]
            NFFT = config["NFFT"]
            streaming = config["streaming"]
            iq_port = config["iq_port"]
        if not streaming:
            time.sleep(0.1)
            continue
        with iq_buffer_lock:
            current_iq = np.array(iq_buffer, dtype=np.complex64)

        fig = None
        try:
            if len(current_iq) == 0:
                fig = _build_waiting_figure(iq_port)
            else:
                fig = _build_iq_figure(
                    current_iq, sample_rate, window_ms, center_freq, NFFT
                )
            _save_and_emit(fig)
        except Exception as e:
            print(f"Plot generation error: {e}")
            traceback.print_exc()
            if fig is not None:
                # Closing an already-closed figure is harmless.
                plt.close(fig)
        time.sleep(PLOT_REFRESH_SEC)
    print("Plotting thread stopped")


def _build_waiting_figure(iq_port):
    """Return the placeholder figure shown before any IQ samples arrive."""
    fig, axes = plt.subplots(2, 1, figsize=(14, 7))
    try:
        fig.patch.set_facecolor('#1a1a2e')
        for ax in axes:
            ax.set_facecolor('#1a1a2e')
            ax.set_xticks([])
            ax.set_yticks([])
            for spine in ax.spines.values():
                spine.set_visible(False)
        axes[0].text(0.5, 0.5, f"Waiting for IQ samples on UDP port {iq_port} ...",
                     ha="center", va="center", transform=axes[0].transAxes,
                     fontsize=18, color="#00d4ff")
    except Exception:
        plt.close(fig)
        raise
    return fig


def _build_iq_figure(current_iq, sample_rate, window_ms, center_freq, NFFT):
    """Return a figure with the IQ time series (top) and spectrogram (bottom).

    *current_iq* is trimmed to the most recent window, or left-padded with
    zeros when it is shorter than the window.
    """
    # At least one sample, so that the [-window_samples:] slice is well defined.
    window_samples = max(1, int(sample_rate * window_ms / 1000))
    if len(current_iq) > window_samples:
        current_iq = current_iq[-window_samples:]
    elif len(current_iq) < window_samples:
        current_iq = np.pad(current_iq, (window_samples - len(current_iq), 0), mode="constant")

    total_duration_s = len(current_iq) / sample_rate
    total_duration_ms = total_duration_s * 1000.0
    freq_low = center_freq - sample_rate / 2.0
    freq_high = center_freq + sample_rate / 2.0
    power_db = compute_power_db(current_iq)

    # Decimate the time-domain trace so that the line plot stays cheap.
    if len(current_iq) > MAX_TIME_PLOT_POINTS:
        step = max(1, len(current_iq) // MAX_TIME_PLOT_POINTS)
        plot_iq = current_iq[::step]
    else:
        plot_iq = current_iq
    times_ms = np.linspace(0, total_duration_ms, len(plot_iq), endpoint=False)
    real_part = np.real(plot_iq)
    imag_part = np.imag(plot_iq)

    fig, axes = plt.subplots(
        2, 1,
        figsize=(14, 7),
        gridspec_kw={"height_ratios": [1, 1]},
    )
    try:
        fig.patch.set_facecolor('#1a1a2e')
        fig.subplots_adjust(left=0.07, right=0.98, top=0.94, bottom=0.08, hspace=0.32)
        ax_time = axes[0]
        ax_spec = axes[1]
        for ax in axes:
            ax.set_facecolor('#0f0f23')
            ax.tick_params(colors='#aaa', labelsize=8)
            for spine in ax.spines.values():
                spine.set_color('#333')

        # --- time-domain panel ---
        ax_time.plot(times_ms, real_part, label="I", color="#00d4ff", linewidth=0.4, alpha=0.9)
        ax_time.plot(times_ms, imag_part, label="Q", color="#ff6b6b", linewidth=0.4, alpha=0.9)
        ax_time.set_xlim(0, total_duration_ms)
        y_min = min(np.min(real_part), np.min(imag_part))
        y_max = max(np.max(real_part), np.max(imag_part))
        y_pad = max((y_max - y_min) * 0.03, 0.001)
        ax_time.set_ylim(y_min - y_pad, y_max + y_pad)
        ax_time.margins(x=0, y=0)
        ax_time.set_xlabel("Time (ms)", color='#aaa', fontsize=9)
        ax_time.set_ylabel("Amplitude", color='#aaa', fontsize=9)
        ax_time.set_title(
            f"IQ Time Series | Power: {power_db:.1f} dB | Samples: {len(current_iq):,}",
            fontsize=10, fontweight="bold", color="#00d4ff", pad=8,
        )
        ax_time.grid(True, linestyle='--', linewidth=0.3, alpha=0.4, color='#444')
        ax_time.legend(loc="upper right", fontsize=7, framealpha=0.6,
                       facecolor='#1a1a2e', edgecolor='#333', labelcolor='#ccc')

        # --- spectrogram panel ---
        noverlap = min(NFFT - 1, int(NFFT * 0.5))
        ax_spec.specgram(
            current_iq,
            Fs=sample_rate,
            Fc=center_freq,
            NFFT=NFFT,
            noverlap=noverlap,
            cmap="twilight",
            mode="magnitude",
            scale="dB",
        )
        ax_spec.set_xlim(0, total_duration_s)
        ax_spec.set_ylim(freq_low, freq_high)
        ax_spec.margins(x=0, y=0)
        # The axis is in seconds and Hz; the labels show ms and GHz.
        ax_spec.xaxis.set_major_formatter(
            ticker.FuncFormatter(lambda v, _: f"{v * 1e3:.1f}")
        )
        ax_spec.xaxis.set_minor_locator(ticker.AutoMinorLocator())
        ax_spec.yaxis.set_major_formatter(
            ticker.FuncFormatter(lambda v, _: f"{v / 1e9:.4f}")
        )
        ax_spec.set_xlabel("Time (ms)", color='#aaa', fontsize=9)
        ax_spec.set_ylabel("Frequency (GHz)", color='#aaa', fontsize=9)
        ax_spec.set_title("Spectrogram", fontsize=10, color="#aaa", pad=6)
        ax_spec.grid(False)
    except Exception:
        # Do not leave a half-built figure registered with pyplot.
        plt.close(fig)
        raise
    return fig
def _save_and_emit(fig):
    """Render *fig* to PNG, store it at ``PLOT_PATH`` and push it to clients.

    The figure is always closed, even if rendering fails. The file is
    written to a temporary name and then renamed over ``PLOT_PATH`` so a
    concurrent reader never sees a partially written image. The Socket.IO
    broadcast is best-effort: a failure is printed and otherwise ignored.
    """
    buf = io.BytesIO()
    try:
        fig.savefig(
            buf,
            format='png',
            dpi=100,
            facecolor=fig.get_facecolor(),
            edgecolor='none',
            pad_inches=0.05
        )
    finally:
        plt.close(fig)
    png_bytes = buf.getvalue()
    buf.close()

    # A per-thread temporary name keeps concurrent callers from colliding.
    tmp_path = f"{PLOT_PATH}.{threading.get_ident()}.tmp"
    with open(tmp_path, "wb") as f:
        f.write(png_bytes)
    os.replace(tmp_path, PLOT_PATH)

    try:
        socketio.emit('plot_update', {'image': base64.b64encode(png_bytes).decode('utf-8')})
    except Exception as e:
        print(f"plot_update emit failed: {e}")
def start_plotting():
    """Reset the streaming state and make sure both worker threads run.

    Clears the stop and pause flags, discards the last packet, zeroes the
    statistics, replaces ``iq_buffer`` with an empty deque sized to one
    plot window, and starts the receiver and plotting threads unless they
    are already alive. Always returns ``True``.
    """
    global plot_thread, rx_thread, latest_iq_data, iq_buffer

    for flag in (stop_event, pause_event):
        flag.clear()

    with latest_data_lock:
        latest_iq_data = None

    with config_lock:
        config.update(streaming=True, packets_received=0, iq_bandwidth_mbps=0.0)
        window_len = int(config["sample_rate"] * config["window_ms"] / 1000)
        max_samples = max(1, window_len)

    with iq_buffer_lock:
        iq_buffer = deque(maxlen=max_samples)

    receiver_alive = rx_thread is not None and rx_thread.is_alive()
    if not receiver_alive:
        rx_thread = threading.Thread(target=data_receiver_thread, daemon=True)
        rx_thread.start()
        print("UDP receiver thread started")

    plotter_alive = plot_thread is not None and plot_thread.is_alive()
    if not plotter_alive:
        plot_thread = threading.Thread(target=generate_spectrum_plot, daemon=True)
        plot_thread.start()
        print("Plotting thread started")

    return True
def stop_plotting():
    """Stop streaming, wait briefly for the workers, and publish a placeholder.

    Marks streaming as off, sets ``stop_event``, closes the receiver's
    socket if one is published, joins the receiver (up to 2 s) and the
    plotter (up to 3 s), then emits a "Streaming Stopped" image.
    Always returns ``True``.
    """
    global plot_thread, rx_thread, udp_sock

    with config_lock:
        config["streaming"] = False
    stop_event.set()

    with udp_sock_lock:
        open_sock = udp_sock
        if open_sock is not None:
            try:
                open_sock.close()
            except Exception:
                pass

    for worker, wait_s in ((rx_thread, 2.0), (plot_thread, 3.0)):
        if worker and worker.is_alive():
            worker.join(timeout=wait_s)

    background = '#1a1a2e'
    figure, axis = plt.subplots(figsize=(14, 7))
    figure.patch.set_facecolor(background)
    axis.set_facecolor(background)
    axis.text(
        0.5, 0.5, "Streaming Stopped\nClick Start to begin",
        ha="center", va="center", transform=axis.transAxes,
        fontsize=18, color="#00d4ff",
    )
    axis.set_xticks([])
    axis.set_yticks([])
    for edge in axis.spines.values():
        edge.set_visible(False)
    _save_and_emit(figure)
    return True
def pause_plotting():
    """Toggle ``pause_event`` and return the new state.

    Returns ``"paused"`` after setting the flag and ``"resumed"`` after
    clearing it.
    """
    if not pause_event.is_set():
        pause_event.set()
        return "paused"
    pause_event.clear()
    return "resumed"
@app.route("/")
def index():
    """Serve the ``index.html`` template."""
    page = render_template("index.html")
    return page
@app.route("/plot")
def plot():
    """Serve the most recently saved plot image as a PNG."""
    response = send_file(PLOT_PATH, mimetype="image/png")
    return response
@app.route("/start_stream", methods=["POST"])
def start_stream():
    """Start streaming; reply with a JSON status (HTTP 500 on failure)."""
    try:
        start_plotting()
        reply = jsonify(status="success", message="Streaming started")
    except Exception as err:
        reply = jsonify(status="error", message=str(err)), 500
    return reply
@app.route("/stop_stream", methods=["POST"])
def stop_stream():
    """Stop streaming; reply with a JSON status (HTTP 500 on failure)."""
    try:
        stop_plotting()
        reply = jsonify(status="success", message="Streaming stopped")
    except Exception as err:
        reply = jsonify(status="error", message=str(err)), 500
    return reply
@app.route("/pause_stream", methods=["POST"])
def pause_stream():
    """Toggle pause; reply with the new state (HTTP 500 on failure)."""
    try:
        new_state = pause_plotting()
        reply = jsonify(
            status="success",
            message=f"Streaming {new_state}",
            state=new_state,
        )
    except Exception as err:
        reply = jsonify(status="error", message=str(err)), 500
    return reply
@app.route("/get_stream_state")
def get_stream_state():
    """Report the stream state: ``running``, ``paused`` or ``stopped``."""
    with config_lock:
        is_streaming = config["streaming"]
    is_paused = pause_event.is_set()

    # Not streaming wins over the pause flag, which may still be set.
    if not is_streaming:
        state = "stopped"
    elif is_paused:
        state = "paused"
    else:
        state = "running"
    return jsonify(state=state)
@app.route("/update_params", methods=["POST"])
def update_params():
    """Update the plot parameters from the submitted form.

    Recognized fields are ``center_freq``, ``sample_rate``, ``fft_size``
    and ``window_ms``. Missing, unparsable or zero fields are left
    unchanged. A negative or non-finite value is rejected with HTTP 400
    and nothing is changed. After a successful update the IQ buffer is
    resized to the new window length (keeping the most recent samples)
    and the configuration is saved. Unexpected errors yield HTTP 500.
    """
    global iq_buffer
    try:
        # (form field, config key, parser)
        fields = (
            ("center_freq", "center_freq", float),
            ("sample_rate", "sample_rate", float),
            ("fft_size", "NFFT", int),
            ("window_ms", "window_ms", float),
        )
        updates = {}
        rejected = []
        for field, key, parser in fields:
            value = request.form.get(field, type=parser)
            if not value:
                continue  # absent, unparsable or zero: keep the current setting
            if value > 0 and np.isfinite(value):
                updates[key] = value
            else:
                rejected.append(field)
        if rejected:
            return jsonify(
                status="error",
                message=f"Parameters must be positive and finite: {', '.join(rejected)}",
            ), 400

        with config_lock:
            config.update(updates)
            max_samples = max(1, int(config["sample_rate"] * config["window_ms"] / 1000))

        # Keep the buffer capacity in step with the window; otherwise a
        # larger window would be zero-padded until the stream is restarted.
        with iq_buffer_lock:
            if iq_buffer.maxlen != max_samples:
                iq_buffer = deque(iq_buffer, maxlen=max_samples)

        # save_config() takes config_lock itself, so call it outside the lock.
        save_config()
        return jsonify(status="success", message="Parameters updated")
    except Exception as e:
        return jsonify(status="error", message=str(e)), 500
@app.route("/update_gains", methods=["POST"])
def update_gains():
    """Apply the gain values submitted in the form.

    Each of ``usrp_tx_gain``, ``usrp_rx_gain``, ``scm_tx_gain`` and
    ``scm_rx_gain`` falls back to its current value only when the field is
    missing or unparsable, so an explicit gain of ``0`` is honored.
    Replies with a JSON status (HTTP 500 on failure).
    """
    try:
        def requested(field, current):
            value = request.form.get(field, type=float)
            # Compare with None: ``value or current`` would discard 0.0.
            return current if value is None else value

        usrp_tx = requested("usrp_tx_gain", usrp_tx_gain)
        usrp_rx = requested("usrp_rx_gain", usrp_rx_gain)
        scm_tx = requested("scm_tx_gain", scm_tx_gain)
        scm_rx = requested("scm_rx_gain", scm_rx_gain)
        ok = gain_update(usrp_tx, usrp_rx, scm_tx, scm_rx)
        if ok:
            return jsonify(status="success", message="Gains updated")
        return jsonify(status="error", message="Failed"), 500
    except Exception as e:
        return jsonify(status="error", message=str(e)), 500
@app.route("/get_gains")
def get_gains():
    """Return the four current gain settings as JSON."""
    current = {
        "usrp_tx_gain": usrp_tx_gain,
        "usrp_rx_gain": usrp_rx_gain,
        "scm_tx_gain": scm_tx_gain,
        "scm_rx_gain": scm_rx_gain,
    }
    return jsonify(**current)
@app.route("/get_stats")
def get_stats():
    """Return receiver statistics and the current signal power as JSON.

    ``packets_received`` and ``iq_bandwidth_mbps`` come from ``config``;
    ``power_db`` is computed from the buffered samples and is ``null``
    when the buffer is empty or the power is not a finite number. The
    remaining fields are fixed placeholder values.
    """
    with config_lock:
        packets_received = config.get("packets_received", 0)
        iq_bandwidth_mbps = config.get("iq_bandwidth_mbps", 0.0)
    with iq_buffer_lock:
        current_iq = np.array(iq_buffer, dtype=np.complex64)

    power_db = compute_power_db(current_iq) if len(current_iq) > 0 else None
    if power_db is not None:
        # A NumPy scalar such as float32 is not JSON-serializable, and NaN
        # or infinity would produce invalid JSON, so normalize both here.
        power_db = float(power_db)
        power_db = round(power_db, 1) if np.isfinite(power_db) else None

    return jsonify(
        current_slot="--",
        packets_received=packets_received,
        slots_correlated=0,
        correlation_rate=0,
        slot_rate=0,
        iq_bandwidth_mbps=iq_bandwidth_mbps,
        metadata_received=0,
        slots_without_metadata=0,
        avg_packets_per_slot=0,
        power_db=power_db,
        allocated_rbs=0,
        direction="--",
    )
def save_config():
    """Write a snapshot of ``config`` to ``gain_viz.json`` in the working directory.

    The snapshot is taken under ``config_lock``; the file is written after
    the lock is released. Values that JSON cannot encode are stringified.
    Any failure is printed and otherwise ignored.
    """
    with config_lock:
        snapshot = dict(config)
    try:
        target = os.path.join(os.getcwd(), "gain_viz.json")
        with open(target, "w") as out:
            json.dump(snapshot, out, indent=2, default=str)
    except Exception as e:
        print(f"Error saving config: {e}")
def main():
    """Create the initial placeholder plot if needed and run the web server."""
    if not os.path.exists(PLOT_PATH):
        background = '#1a1a2e'
        figure, axis = plt.subplots(figsize=(14, 7))
        figure.patch.set_facecolor(background)
        axis.set_facecolor(background)
        axis.text(0.5, 0.5, "Click Start to begin streaming",
                  ha="center", va="center", fontsize=18, color="#00d4ff")
        axis.set_xticks([])
        axis.set_yticks([])
        for edge in axis.spines.values():
            edge.set_visible(False)
        figure.savefig(PLOT_PATH, facecolor=figure.get_facecolor())
        plt.close(figure)

    banner_rule = "=" * 60
    print(banner_rule)
    print(" IQ Spectrum Analyzer (UDP IQ only, optimized)")
    print(banner_rule)

    # NOTE(review): debug=True while listening on 0.0.0.0 exposes the
    # Werkzeug debugger to the network; confirm this is intended.
    server_options = {
        "host": "0.0.0.0",
        "port": 5000,
        "debug": True,
        "use_reloader": False,
        "allow_unsafe_werkzeug": True,
    }
    socketio.run(app, **server_options)


if __name__ == "__main__":
    main()