diff --git a/dist/gain_viz-0.1.0-py3-none-any.whl b/dist/gain_viz-0.1.0-py3-none-any.whl new file mode 100644 index 0000000..b4ef93c Binary files /dev/null and b/dist/gain_viz-0.1.0-py3-none-any.whl differ diff --git a/dist/gain_viz-0.1.0.tar.gz b/dist/gain_viz-0.1.0.tar.gz new file mode 100644 index 0000000..3d32817 Binary files /dev/null and b/dist/gain_viz-0.1.0.tar.gz differ diff --git a/gain_viz.egg-info/PKG-INFO b/gain_viz.egg-info/PKG-INFO new file mode 100644 index 0000000..757c799 --- /dev/null +++ b/gain_viz.egg-info/PKG-INFO @@ -0,0 +1,73 @@ +Metadata-Version: 2.4 +Name: gain_viz +Version: 0.1.0 +Summary: Interactive srsRAN_Project gnb gain control and spectrum visualization tool +Author-email: "Qoherent Inc." +Maintainer-email: Gael Kamga , Ashkan Beigi +Keywords: radio,rf,sdr,software-defined radio,5G,gnb,gNodeB,srsRAN_Project,SCM,SignalCraft Conditioning Mpdule,USRP +Requires-Python: >=3.8 +Description-Content-Type: text/markdown +Requires-Dist: flask +Requires-Dist: matplotlib +Requires-Dist: numpy +Requires-Dist: pyzmq +Requires-Dist: pyserial +Requires-Dist: flask_socketio + +# gain_viz + +![Python](https://img.shields.io/badge/python-3.8%2B-blue) +![Flask](https://img.shields.io/badge/flask-2.x-orange) + +# gain_viz + +**gain_viz** is a Python-based web application for adjusting RF gain settings and visualizing their effect in real-time. It integrates with USRP and SCM devices, providing live IQ time-series and spectrum visualization. + +--- + +## Features + +- Adjust **USRP Tx/Rx gains** and **SCM Tx/Rx gains** from a web interface. +- Live IQ **time-series plot** in milliseconds. +- Live **spectrum visualization** (waterfall / spectrogram). +- Fast refresh for near real-time feedback. +- Responsive and clean web interface built with HTML/CSS/JS. + + + +## Installation + +Make sure you have **Python 3.8+** installed. + +1. Clone the repository: + +```bash +git clone https://riahub.ai/gael/gain-viz.git +cd gain-viz +``` + +2. Build and install + +```bash +pip install --upgrade build +python3 -m build +pip install dist/gain_viz-0.1.0-py3-none-any.whl +export PATH=$PATH:~/.local/bin +source ~/.bashrc + + +``` + +## Usage + +Run the application +```bash +gain_viz +``` + +Open your browser at http://localhost:5000 + + +- Toggle the gain switches to enable input fields. +- Enter new gain values and press Update Gains. +- Observe the effect on the time-domain IQ plot and spectrum. diff --git a/gain_viz.egg-info/SOURCES.txt b/gain_viz.egg-info/SOURCES.txt new file mode 100644 index 0000000..47db683 --- /dev/null +++ b/gain_viz.egg-info/SOURCES.txt @@ -0,0 +1,14 @@ +README.md +pyproject.toml +gain_viz/__init__.py +gain_viz/app.py +gain_viz/iq_metadata_interface.py +gain_viz.egg-info/PKG-INFO +gain_viz.egg-info/SOURCES.txt +gain_viz.egg-info/dependency_links.txt +gain_viz.egg-info/entry_points.txt +gain_viz.egg-info/requires.txt +gain_viz.egg-info/top_level.txt +gain_viz/static/plot.png +gain_viz/templates/ind.html +gain_viz/templates/index.html \ No newline at end of file diff --git a/gain_viz.egg-info/dependency_links.txt b/gain_viz.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/gain_viz.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/gain_viz.egg-info/entry_points.txt b/gain_viz.egg-info/entry_points.txt new file mode 100644 index 0000000..609cae1 --- /dev/null +++ b/gain_viz.egg-info/entry_points.txt @@ -0,0 +1,2 @@ +[console_scripts] +gain_viz = gain_viz.app:main diff --git a/gain_viz.egg-info/requires.txt b/gain_viz.egg-info/requires.txt new file mode 100644 index 0000000..15fcb3b --- /dev/null +++ b/gain_viz.egg-info/requires.txt @@ -0,0 +1,6 @@ +flask +matplotlib +numpy +pyzmq +pyserial +flask_socketio diff --git a/gain_viz.egg-info/top_level.txt b/gain_viz.egg-info/top_level.txt new file mode 100644 index 0000000..8c59485 --- /dev/null +++ b/gain_viz.egg-info/top_level.txt @@ -0,0 +1 @@ +gain_viz diff --git a/gain_viz.json b/gain_viz.json new file mode 100644 index 0000000..b67266b --- /dev/null +++ b/gain_viz.json @@ -0,0 +1,12 @@ +{ + "usrp_tx_gain": 60.0, + "usrp_rx_gain": 20.0, + "scm_tx_gain": 30.0, + "scm_rx_gain": 20.0, + "sample_rate": 23040000.0, + "window_ms": 20.0, + "center_freq": 3430000000.0, + "NFFT": 1024, + "tcp_port": 5556, + "streaming": true +} \ No newline at end of file diff --git a/gain_viz/__pycache__/iq_metadata_interface.cpython-311.pyc b/gain_viz/__pycache__/iq_metadata_interface.cpython-311.pyc new file mode 100644 index 0000000..af689bf Binary files /dev/null and b/gain_viz/__pycache__/iq_metadata_interface.cpython-311.pyc differ diff --git a/gain_viz/app.py b/gain_viz/app.py index 68bf12a..b246b12 100644 --- a/gain_viz/app.py +++ b/gain_viz/app.py @@ -1,6 +1,8 @@ from flask import Flask, render_template, send_file, request, jsonify -import zmq +from flask_socketio import SocketIO import numpy as np +import matplotlib +matplotlib.use('Agg') import matplotlib.pyplot as plt import matplotlib.ticker as ticker import os @@ -8,8 +10,14 @@ import threading import time import serial import json +import base64 +import io +import traceback +import socket +from collections import deque app = Flask(__name__) +socketio = SocketIO(app, cors_allowed_origins="*") PLOT_PATH = os.path.join(os.getcwd(), "plot.png") # ----------------- Shared Config ----------------- @@ -22,42 +30,55 @@ config = { "window_ms": 20, "center_freq": 3.415e9, "NFFT": 1024, - "tcp_port": 5556, - "streaming": False, # Added streaming state + "iq_port": 5588, + "streaming": False, + "packets_received": 0, + "iq_bandwidth_mbps": 0.0, } config_lock = threading.Lock() -# Global variables usrp_tx_gain = config["usrp_tx_gain"] usrp_rx_gain = config["usrp_rx_gain"] scm_tx_gain = config["scm_tx_gain"] scm_rx_gain = config["scm_rx_gain"] -# Plotting thread control plot_thread = None +rx_thread = None stop_event = threading.Event() pause_event = threading.Event() -# ----------------- Serial / SCM ----------------- +latest_iq_data = None +latest_data_lock = threading.Lock() + +iq_buffer = deque(maxlen=1) +iq_buffer_lock = threading.Lock() + +udp_sock = None +udp_sock_lock = threading.Lock() + +MAX_TIME_PLOT_POINTS = 5000 +PLOT_REFRESH_SEC = 0.25 + + def connect_serial(port, baudrate=115200, timeout=1): - """Connect to a serial port with even parity.""" try: - ser = serial.Serial( + return serial.Serial( port=port, baudrate=baudrate, timeout=timeout, bytesize=serial.EIGHTBITS, parity=serial.PARITY_EVEN, - stopbits=serial.STOPBITS_ONE + stopbits=serial.STOPBITS_ONE, ) - return ser except serial.SerialException as e: print(f"Error connecting to {port}: {e}") return None + def send_command(ser, command): if ser and ser.is_open: - ser.write(command.encode('utf-8')) + ser.write(command.encode("utf-8")) + def receive_feedback(ser): if ser and ser.is_open: @@ -74,13 +95,12 @@ def receive_feedback(ser): return "" return "" + def scm_conf(port, baudrate, rx_cmd, tx_cmd): ser = connect_serial(port, baudrate) - commands = [rx_cmd, tx_cmd] if ser: - for cmd in commands: - feedback = None - attempt = 0 + for cmd in [rx_cmd, tx_cmd]: + feedback, attempt = None, 0 while feedback != "OK" and attempt < 5: send_command(ser, cmd + "\r") feedback = receive_feedback(ser) @@ -89,34 +109,29 @@ def scm_conf(port, baudrate, rx_cmd, tx_cmd): return True return False -# ----------------- Gain Updates ----------------- + def gain_update(usrp_tx, usrp_rx, scm_tx, scm_rx): global usrp_tx_gain, usrp_rx_gain, scm_tx_gain, scm_rx_gain - scm_change = False + if usrp_tx != usrp_tx_gain: usrp_tx_gain = usrp_tx os.system(f"tmux send-keys -t ran 'tx_gain 0 {usrp_tx_gain} ' C-m") - if usrp_rx != usrp_rx_gain: usrp_rx_gain = usrp_rx os.system(f"tmux send-keys -t ran 'rx_gain 0 {usrp_rx_gain} ' C-m") - if scm_tx != scm_tx_gain: scm_tx_gain = scm_tx scm_change = True - if scm_rx != scm_rx_gain: scm_rx_gain = scm_rx scm_change = True - - t_cmd = f"HW:GAIN 0 TX 0 {scm_tx_gain}" - r_cmd = f"HW:GAIN 1 RX 0 {scm_rx_gain}" if scm_change: + t_cmd = f"HW:GAIN 0 TX 0 {scm_tx_gain}" + r_cmd = f"HW:GAIN 1 RX 0 {scm_rx_gain}" scm_conf("/dev/ttyUSB0", 115200, r_cmd, t_cmd) scm_conf("/dev/ttyUSB1", 115200, r_cmd, t_cmd) - with config_lock: config["scm_tx_gain"] = scm_tx_gain config["scm_rx_gain"] = scm_rx_gain @@ -127,331 +142,493 @@ def gain_update(usrp_tx, usrp_rx, scm_tx, scm_rx): return True -# ----------------- Plot Generation ----------------- -def generate_spectrum_plot(): - socket = None - iq_sample = np.zeros(1, dtype=np.complex64) - last_port = None + +def parse_iq_payload(payload): + if len(payload) <= 2: + return None + + iq_bytes = payload[2:] + usable_len = (len(iq_bytes) // 8) * 8 + if usable_len == 0: + return None + + return np.frombuffer(iq_bytes[:usable_len], dtype=np.complex64) + + +def compute_power_db(iq): + if iq is None or len(iq) == 0: + return None + power = np.mean(np.abs(iq) ** 2) + if power <= 0: + return -120.0 + return 10 * np.log10(power + 1e-12) + + +def data_receiver_thread(): + global latest_iq_data, udp_sock + + with config_lock: + iq_port = config["iq_port"] + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 * 1024 * 1024) + sock.bind(("0.0.0.0", iq_port)) + sock.settimeout(0.1) + + with udp_sock_lock: + udp_sock = sock + + print(f"Listening for IQ samples on UDP port {iq_port}") + + total_packets = 0 + total_bytes = 0 + last_stat_time = time.time() while not stop_event.is_set(): - # Check if we're paused + try: + payload, addr = sock.recvfrom(65535) + total_packets += 1 + total_bytes += len(payload) + + iq = parse_iq_payload(payload) + if iq is None or len(iq) == 0: + continue + + with latest_data_lock: + latest_iq_data = iq + + with iq_buffer_lock: + iq_buffer.extend(iq) + + now = time.time() + dt = now - last_stat_time + if dt >= 1.0: + bandwidth_mbps = (total_bytes * 8) / dt / 1e6 + with config_lock: + config["packets_received"] = total_packets + config["iq_bandwidth_mbps"] = bandwidth_mbps + total_bytes = 0 + last_stat_time = now + + except socket.timeout: + continue + except OSError: + break + except Exception as e: + print(f"UDP receive error: {e}") + traceback.print_exc() + time.sleep(0.1) + + try: + sock.close() + except Exception: + pass + + with udp_sock_lock: + udp_sock = None + + print("Data receiver thread stopped") + + +def generate_spectrum_plot(): + while not stop_event.is_set(): if pause_event.is_set(): time.sleep(0.1) continue - + with config_lock: sample_rate = config["sample_rate"] window_ms = config["window_ms"] center_freq = config["center_freq"] NFFT = config["NFFT"] - tcp_port = config["tcp_port"] streaming = config["streaming"] - # Only process if streaming is active if not streaming: time.sleep(0.1) continue - # Reconnect if port changed or socket is None - if socket is None or tcp_port != last_port: - if socket: - socket.close() - try: - context = zmq.Context() - socket = context.socket(zmq.SUB) - socket.setsockopt(zmq.CONFLATE, 1) - socket.setsockopt_string(zmq.SUBSCRIBE, "") - socket.setsockopt(zmq.RCVTIMEO, 1000) - socket.connect(f"tcp://localhost:{tcp_port}") - last_port = tcp_port - print(f"Connected to ZMQ on port {tcp_port}") - except Exception as e: - print(f"ZMQ connection error: {e}") - socket = None - time.sleep(1) - continue + with iq_buffer_lock: + current_iq = np.array(iq_buffer, dtype=np.complex64) - window_samples = int(sample_rate * window_ms / 1000) - if iq_sample.size != window_samples: - iq_sample = np.zeros(window_samples, dtype=np.complex64) + if len(current_iq) == 0: + fig, axes = plt.subplots(2, 1, figsize=(14, 7)) + fig.patch.set_facecolor('#1a1a2e') + for ax in axes: + ax.set_facecolor('#1a1a2e') + ax.set_xticks([]) + ax.set_yticks([]) + for spine in ax.spines.values(): + spine.set_visible(False) + axes[0].text(0.5, 0.5, "Waiting for IQ samples on UDP port 5588 ...", + ha="center", va="center", transform=axes[0].transAxes, + fontsize=18, color="#00d4ff") + _save_and_emit(fig) + time.sleep(PLOT_REFRESH_SEC) + continue try: - msg = socket.recv(zmq.NOBLOCK) - float_data = np.frombuffer(msg, dtype=np.float32) - if float_data.size >= 2: - complex_data = float_data.reshape(-1, 2) - iq_all = complex_data[:, 0] + 1j * complex_data[:, 1] - if len(iq_all) >= window_samples: - iq_sample = iq_all[-window_samples:] - else: - iq_sample = np.pad(iq_all, (window_samples - len(iq_all), 0)) + window_samples = int(sample_rate * window_ms / 1000) + if len(current_iq) > window_samples: + current_iq = current_iq[-window_samples:] + elif len(current_iq) < window_samples: + current_iq = np.pad(current_iq, (window_samples - len(current_iq), 0), mode="constant") - # Create plot - fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 6)) - fig.subplots_adjust(hspace=0.4) + total_duration_s = len(current_iq) / sample_rate + total_duration_ms = total_duration_s * 1000.0 + freq_low = center_freq - sample_rate / 2.0 + freq_high = center_freq + sample_rate / 2.0 + power_db = compute_power_db(current_iq) - # Time-domain plot - times_ms = np.arange(len(iq_sample)) * 1000 / sample_rate - ax1.plot(times_ms, np.real(iq_sample), label="Real", color='b') - ax1.plot(times_ms, np.imag(iq_sample), label="Imag", color='r') - ax1.set_xlim(0, window_ms) - ax1.set_xlabel("Time (ms)") - ax1.set_ylabel("IQ Amplitude") - ax1.grid(True, which='both', linestyle='--', linewidth=0.5) - ax1.legend() + if len(current_iq) > MAX_TIME_PLOT_POINTS: + step = max(1, len(current_iq) // MAX_TIME_PLOT_POINTS) + plot_iq = current_iq[::step] + else: + plot_iq = current_iq - # Spectrogram - cmap = plt.get_cmap('twilight') - ax2.specgram( - iq_sample, + times_ms = np.linspace(0, total_duration_ms, len(plot_iq), endpoint=False) + + fig, axes = plt.subplots( + 2, 1, + figsize=(14, 7), + gridspec_kw={"height_ratios": [1, 1]}, + ) + fig.patch.set_facecolor('#1a1a2e') + fig.subplots_adjust(left=0.07, right=0.98, top=0.94, bottom=0.08, hspace=0.32) + + ax_time = axes[0] + ax_spec = axes[1] + + for ax in axes: + ax.set_facecolor('#0f0f23') + ax.tick_params(colors='#aaa', labelsize=8) + for spine in ax.spines.values(): + spine.set_color('#333') + + ax_time.plot(times_ms, np.real(plot_iq), label="I", color="#00d4ff", linewidth=0.4, alpha=0.9) + ax_time.plot(times_ms, np.imag(plot_iq), label="Q", color="#ff6b6b", linewidth=0.4, alpha=0.9) + + ax_time.set_xlim(0, total_duration_ms) + + real_part = np.real(plot_iq) + imag_part = np.imag(plot_iq) + y_min = min(np.min(real_part), np.min(imag_part)) + y_max = max(np.max(real_part), np.max(imag_part)) + y_pad = max((y_max - y_min) * 0.03, 0.001) + ax_time.set_ylim(y_min - y_pad, y_max + y_pad) + ax_time.margins(x=0, y=0) + + ax_time.set_xlabel("Time (ms)", color='#aaa', fontsize=9) + ax_time.set_ylabel("Amplitude", color='#aaa', fontsize=9) + ax_time.set_title( + f"IQ Time Series | Power: {power_db:.1f} dB | Samples: {len(current_iq):,}", + fontsize=10, fontweight="bold", color="#00d4ff", pad=8, + ) + ax_time.grid(True, linestyle='--', linewidth=0.3, alpha=0.4, color='#444') + ax_time.legend(loc="upper right", fontsize=7, framealpha=0.6, + facecolor='#1a1a2e', edgecolor='#333', labelcolor='#ccc') + + noverlap = min(NFFT - 1, int(NFFT * 0.5)) + + ax_spec.specgram( + current_iq, Fs=sample_rate, Fc=center_freq, NFFT=NFFT, - noverlap=512, - cmap=cmap + noverlap=noverlap, + cmap="twilight", + mode="magnitude", + scale="dB", ) - ax2.set_xlabel("Time (ms)") - ax2.set_ylabel("Frequency (Hz)") - ax2.grid(False) - ax2.set_ylim(center_freq - sample_rate / 2, - center_freq + sample_rate / 2) - ax2.xaxis.set_major_formatter( - ticker.FuncFormatter(lambda t, pos: '{0:g}'.format(t*1e3)) + + ax_spec.set_xlim(0, total_duration_s) + ax_spec.set_ylim(freq_low, freq_high) + ax_spec.margins(x=0, y=0) + + ax_spec.xaxis.set_major_formatter( + ticker.FuncFormatter(lambda v, _: f"{v * 1e3:.1f}") + ) + ax_spec.xaxis.set_minor_locator(ticker.AutoMinorLocator()) + ax_spec.yaxis.set_major_formatter( + ticker.FuncFormatter(lambda v, _: f"{v / 1e9:.4f}") ) - ax2.xaxis.set_minor_locator(ticker.AutoMinorLocator()) - plt.savefig(PLOT_PATH, bbox_inches='tight') - plt.close(fig) + ax_spec.set_xlabel("Time (ms)", color='#aaa', fontsize=9) + ax_spec.set_ylabel("Frequency (GHz)", color='#aaa', fontsize=9) + ax_spec.set_title("Spectrogram", fontsize=10, color="#aaa", pad=6) + ax_spec.grid(False) + + _save_and_emit(fig) - except zmq.Again: - # No new data - fig, ax = plt.subplots(figsize=(12, 6)) - ax.text(0.5, 0.5, "Waiting for data...", - ha='center', va='center', transform=ax.transAxes, fontsize=16) - ax.set_title("Spectrum Analyzer - No Data (Streaming Active)") - plt.savefig(PLOT_PATH, bbox_inches='tight') - plt.close(fig) except Exception as e: print(f"Plot generation error: {e}") - fig, ax = plt.subplots(figsize=(12, 6)) - ax.text(0.5, 0.5, f"Error: {str(e)}", - ha='center', va='center', transform=ax.transAxes, fontsize=12) - ax.set_title("Spectrum Analyzer - Error") - plt.savefig(PLOT_PATH, bbox_inches='tight') - plt.close(fig) + traceback.print_exc() - time.sleep(0.1) + time.sleep(PLOT_REFRESH_SEC) - # Cleanup when stopping - if socket: - socket.close() print("Plotting thread stopped") + +def _save_and_emit(fig): + buf = io.BytesIO() + fig.savefig( + buf, + format='png', + dpi=100, + facecolor=fig.get_facecolor(), + edgecolor='none', + pad_inches=0.05 + ) + buf.seek(0) + png_bytes = buf.read() + buf.close() + plt.close(fig) + + with open(PLOT_PATH, "wb") as f: + f.write(png_bytes) + + try: + socketio.emit('plot_update', {'image': base64.b64encode(png_bytes).decode('utf-8')}) + except Exception: + pass + + def start_plotting(): - """Start the plotting thread""" - global plot_thread, stop_event, pause_event - + global plot_thread, rx_thread, latest_iq_data, iq_buffer + stop_event.clear() pause_event.clear() - + + with latest_data_lock: + latest_iq_data = None + with config_lock: config["streaming"] = True - + config["packets_received"] = 0 + config["iq_bandwidth_mbps"] = 0.0 + max_samples = max(1, int(config["sample_rate"] * config["window_ms"] / 1000)) + + with iq_buffer_lock: + iq_buffer = deque(maxlen=max_samples) + + if rx_thread is None or not rx_thread.is_alive(): + rx_thread = threading.Thread(target=data_receiver_thread, daemon=True) + rx_thread.start() + print("UDP receiver thread started") + if plot_thread is None or not plot_thread.is_alive(): plot_thread = threading.Thread(target=generate_spectrum_plot, daemon=True) plot_thread.start() print("Plotting thread started") - + return True + def stop_plotting(): - """Stop the plotting thread""" - global plot_thread, stop_event - + global plot_thread, rx_thread, udp_sock + with config_lock: config["streaming"] = False - + stop_event.set() - + + with udp_sock_lock: + if udp_sock is not None: + try: + udp_sock.close() + except Exception: + pass + + if rx_thread and rx_thread.is_alive(): + rx_thread.join(timeout=2.0) + if plot_thread and plot_thread.is_alive(): - plot_thread.join(timeout=2.0) - - # Create stopped message plot - fig, ax = plt.subplots(figsize=(12, 6)) - ax.text(0.5, 0.5, "Streaming Stopped\nClick Start to begin", - ha='center', va='center', transform=ax.transAxes, fontsize=16) - ax.set_title("Spectrum Analyzer - Stopped") - plt.savefig(PLOT_PATH, bbox_inches='tight') - plt.close(fig) - - print("Plotting thread stopped") + plot_thread.join(timeout=3.0) + + fig, ax = plt.subplots(figsize=(14, 7)) + fig.patch.set_facecolor('#1a1a2e') + ax.set_facecolor('#1a1a2e') + ax.text(0.5, 0.5, "Streaming Stopped\nClick Start to begin", + ha="center", va="center", transform=ax.transAxes, + fontsize=18, color="#00d4ff") + ax.set_xticks([]) + ax.set_yticks([]) + for spine in ax.spines.values(): + spine.set_visible(False) + _save_and_emit(fig) return True + def pause_plotting(): - """Pause the plotting updates""" - global pause_event - if pause_event.is_set(): pause_event.clear() - print("Plotting resumed") return "resumed" else: pause_event.set() - print("Plotting paused") return "paused" -# ----------------- Flask Routes ----------------- -@app.route('/') + +@app.route("/") def index(): - return render_template('index.html') + return render_template("index.html") -@app.route('/update_gains', methods=['POST']) -def update_gains(): - global usrp_tx_gain, usrp_rx_gain, scm_tx_gain, scm_rx_gain - - try: - usrp_tx = request.form.get('usrp_tx_gain', type=float) - usrp_rx = request.form.get('usrp_rx_gain', type=float) - scm_tx = request.form.get('scm_tx_gain', type=float) - scm_rx = request.form.get('scm_rx_gain', type=float) - - if usrp_tx is None: - usrp_tx = usrp_tx_gain - if usrp_rx is None: - usrp_rx = usrp_rx_gain - if scm_tx is None: - scm_tx = scm_tx_gain - if scm_rx is None: - scm_rx = scm_rx_gain - success = gain_update(usrp_tx, usrp_rx, scm_tx, scm_rx) - if success: - return jsonify({"status": "success", "message": "Gains updated successfully"}) - else: - return jsonify({"status": "error", "message": "Failed to update gains"}), 500 - - except Exception as e: - return jsonify({"status": "error", "message": f"Error: {str(e)}"}), 500 - -@app.route('/plot') +@app.route("/plot") def plot(): - try: - return send_file(PLOT_PATH, mimetype='image/png') - except Exception as e: - return send_file(PLOT_PATH, mimetype='image/png') + return send_file(PLOT_PATH, mimetype="image/png") -@app.route('/get_gains') -def get_gains(): - return jsonify({ - "usrp_tx_gain": usrp_tx_gain, - "usrp_rx_gain": usrp_rx_gain, - "scm_tx_gain": scm_tx_gain, - "scm_rx_gain": scm_rx_gain - }) -@app.route('/update_params', methods=['POST']) -def update_params(): - try: - center_freq = request.form.get('center_freq', type=float) - sample_rate = request.form.get('sample_rate', type=float) - NFFT = request.form.get('fft_size', type=int) - window_ms = request.form.get('window_ms', type=float) - tcp_port = request.form.get('tcp_port', type=int) - - if not all([center_freq, sample_rate, NFFT, window_ms, tcp_port]): - return jsonify({ - 'status': 'error', - 'message': 'All parameters are required' - }), 400 - - with config_lock: - config["center_freq"] = center_freq - config["sample_rate"] = sample_rate - config["NFFT"] = NFFT - config["window_ms"] = window_ms - config["tcp_port"] = tcp_port - - print(f"Updated params: center_freq={center_freq}, sample_rate={sample_rate}, NFFT={NFFT}, window_ms={window_ms}, tcp_port={tcp_port}") - - save_config() - - return jsonify({ - 'status': 'success', - 'message': 'Parameters updated successfully' - }) - except Exception as e: - print(f"Error updating params: {e}") - return jsonify({ - 'status': 'error', - 'message': str(e) - }), 500 - -@app.route('/start_stream', methods=['POST']) +@app.route("/start_stream", methods=["POST"]) def start_stream(): try: - success = start_plotting() - if success: - return jsonify({"status": "success", "message": "Streaming started"}) - else: - return jsonify({"status": "error", "message": "Failed to start streaming"}), 500 + start_plotting() + return jsonify(status="success", message="Streaming started") except Exception as e: - return jsonify({"status": "error", "message": f"Error: {str(e)}"}), 500 + return jsonify(status="error", message=str(e)), 500 -@app.route('/stop_stream', methods=['POST']) + +@app.route("/stop_stream", methods=["POST"]) def stop_stream(): try: - success = stop_plotting() - if success: - return jsonify({"status": "success", "message": "Streaming stopped"}) - else: - return jsonify({"status": "error", "message": "Failed to stop streaming"}), 500 + stop_plotting() + return jsonify(status="success", message="Streaming stopped") except Exception as e: - return jsonify({"status": "error", "message": f"Error: {str(e)}"}), 500 + return jsonify(status="error", message=str(e)), 500 -@app.route('/pause_stream', methods=['POST']) + +@app.route("/pause_stream", methods=["POST"]) def pause_stream(): try: result = pause_plotting() - return jsonify({"status": "success", "message": f"Streaming {result}", "state": result}) + return jsonify(status="success", message=f"Streaming {result}", state=result) except Exception as e: - return jsonify({"status": "error", "message": f"Error: {str(e)}"}), 500 + return jsonify(status="error", message=str(e)), 500 -@app.route('/get_stream_state', methods=['GET']) + +@app.route("/get_stream_state") def get_stream_state(): with config_lock: streaming = config["streaming"] paused = pause_event.is_set() - - state = "stopped" if streaming and not paused: state = "running" elif streaming and paused: state = "paused" - - return jsonify({"state": state}) + else: + state = "stopped" + return jsonify(state=state) + + +@app.route("/update_params", methods=["POST"]) +def update_params(): + try: + with config_lock: + cf = request.form.get("center_freq", type=float) + sr = request.form.get("sample_rate", type=float) + nf = request.form.get("fft_size", type=int) + wm = request.form.get("window_ms", type=float) + if cf: + config["center_freq"] = cf + if sr: + config["sample_rate"] = sr + if nf: + config["NFFT"] = nf + if wm: + config["window_ms"] = wm + save_config() + return jsonify(status="success", message="Parameters updated") + except Exception as e: + return jsonify(status="error", message=str(e)), 500 + + +@app.route("/update_gains", methods=["POST"]) +def update_gains(): + try: + usrp_tx = request.form.get("usrp_tx_gain", type=float) or usrp_tx_gain + usrp_rx = request.form.get("usrp_rx_gain", type=float) or usrp_rx_gain + scm_tx = request.form.get("scm_tx_gain", type=float) or scm_tx_gain + scm_rx = request.form.get("scm_rx_gain", type=float) or scm_rx_gain + ok = gain_update(usrp_tx, usrp_rx, scm_tx, scm_rx) + if ok: + return jsonify(status="success", message="Gains updated") + return jsonify(status="error", message="Failed"), 500 + except Exception as e: + return jsonify(status="error", message=str(e)), 500 + + +@app.route("/get_gains") +def get_gains(): + return jsonify( + usrp_tx_gain=usrp_tx_gain, + usrp_rx_gain=usrp_rx_gain, + scm_tx_gain=scm_tx_gain, + scm_rx_gain=scm_rx_gain, + ) + + +@app.route("/get_stats") +def get_stats(): + with config_lock: + packets_received = config.get("packets_received", 0) + iq_bandwidth_mbps = config.get("iq_bandwidth_mbps", 0.0) + + with iq_buffer_lock: + current_iq = np.array(iq_buffer, dtype=np.complex64) + + power_db = compute_power_db(current_iq) if len(current_iq) > 0 else None + + return jsonify( + current_slot="--", + packets_received=packets_received, + slots_correlated=0, + correlation_rate=0, + slot_rate=0, + iq_bandwidth_mbps=iq_bandwidth_mbps, + metadata_received=0, + slots_without_metadata=0, + avg_packets_per_slot=0, + power_db=round(power_db, 1) if power_db is not None else None, + allocated_rbs=0, + direction="--", + ) + def save_config(): with config_lock: cfg = dict(config) - try: - with open(os.path.join(os.getcwd(), "gain_viz.json"), 'w') as f: - json.dump(cfg, f, indent=2) + with open(os.path.join(os.getcwd(), "gain_viz.json"), "w") as f: + json.dump(cfg, f, indent=2, default=str) except Exception as e: print(f"Error saving config: {e}") -# ----------------- Main ----------------- + def main(): - # Ensure placeholder image exists if not os.path.exists(PLOT_PATH): - fig, ax = plt.subplots(figsize=(12, 6)) - ax.text(0.5, 0.5, "Click Start to begin streaming", ha='center', va='center', fontsize=16) - ax.set_title("Gain-Viz Spectrum Analyzer - Ready") - plt.savefig(PLOT_PATH) + fig, ax = plt.subplots(figsize=(14, 7)) + fig.patch.set_facecolor('#1a1a2e') + ax.set_facecolor('#1a1a2e') + ax.text(0.5, 0.5, "Click Start to begin streaming", + ha="center", va="center", fontsize=18, color="#00d4ff") + ax.set_xticks([]) + ax.set_yticks([]) + for spine in ax.spines.values(): + spine.set_visible(False) + fig.savefig(PLOT_PATH, facecolor=fig.get_facecolor()) plt.close(fig) - print("Gain-Viz server started. Use the web interface to control streaming.") - app.run(host="0.0.0.0", port=5000, debug=True, use_reloader=False) + print("=" * 60) + print(" IQ Spectrum Analyzer (UDP IQ only, optimized)") + print("=" * 60) -if __name__ == '__main__': + socketio.run( + app, + host="0.0.0.0", + port=5000, + debug=True, + use_reloader=False, + allow_unsafe_werkzeug=True + ) + + +if __name__ == "__main__": main() \ No newline at end of file diff --git a/gain_viz/gain_viz.json b/gain_viz/gain_viz.json new file mode 100644 index 0000000..1234d3b --- /dev/null +++ b/gain_viz/gain_viz.json @@ -0,0 +1,16 @@ +{ + "usrp_tx_gain": 60, + "usrp_rx_gain": 30, + "scm_tx_gain": 30, + "scm_rx_gain": 30, + "sample_rate": 23040000.0, + "window_ms": 5.0, + "center_freq": 3415000000.0, + "NFFT": 512, + "iq_port": 5588, + "metadata_port": 5589, + "streaming": true, + "show_rb_overlay": true, + "num_rbs": 273, + "subcarriers_per_rb": 12 +} \ No newline at end of file diff --git a/gain_viz/iq_metadata_interface.py b/gain_viz/iq_metadata_interface.py new file mode 100644 index 0000000..73f3bad --- /dev/null +++ b/gain_viz/iq_metadata_interface.py @@ -0,0 +1,1066 @@ +#!/usr/bin/env python3 +""" +IQ & Metadata Streaming Interface (IQ-Prioritized with Accumulation) + +Handles multiple IQ packets per slot by accumulating them before correlation. +""" + +import logging +import queue +import socket +import struct +import threading +import time +from collections import OrderedDict +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple + +import numpy as np + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("IQInterface") + + +# ============================================================================= +# Data Classes +# ============================================================================= + + +@dataclass +class Metadata: + """RB allocation metadata from scheduler""" + + slot_num: int + rb_bitmap: bytes + dl_flag: bool = True + timestamp: float = field(default_factory=time.time) + + @property + def rb_bitmap_binary(self) -> str: + return "".join(format(b, "08b") for b in self.rb_bitmap) + + @property + def rb_bitmap_array(self) -> np.ndarray: + bits = [] + for byte in self.rb_bitmap: + for i in range(8): + bits.append((byte >> (7 - i)) & 1) + return np.array(bits, dtype=np.uint8) + + @property + def allocated_rbs(self) -> List[int]: + return [i for i, bit in enumerate(self.rb_bitmap_array) if bit == 1] + + @property + def num_allocated_rbs(self) -> int: + return sum(bin(b).count("1") for b in self.rb_bitmap) + + def to_dict(self) -> Dict[str, Any]: + return { + "slot_num": self.slot_num, + "rb_bitmap_hex": self.rb_bitmap.hex(), + "rb_bitmap_binary": self.rb_bitmap_binary, + "num_allocated_rbs": self.num_allocated_rbs, + "timestamp": self.timestamp, + } + + +@dataclass +class IQSamples: + """IQ samples from radio""" + + slot_num: int + samples: np.ndarray + timestamp: float = field(default_factory=time.time) + + @property + def num_samples(self) -> int: + return len(self.samples) + + @property + def i_samples(self) -> np.ndarray: + return np.real(self.samples) + + @property + def q_samples(self) -> np.ndarray: + return np.imag(self.samples) + + @property + def magnitude(self) -> np.ndarray: + return np.abs(self.samples) + + @property + def phase(self) -> np.ndarray: + return np.angle(self.samples) + + @property + def power_db(self) -> float: + return 10 * np.log10(np.mean(np.abs(self.samples) ** 2) + 1e-10) + + def to_dict(self) -> Dict[str, Any]: + return { + "slot_num": self.slot_num, + "num_samples": self.num_samples, + "power_db": self.power_db, + "timestamp": self.timestamp, + } + + +@dataclass +class CorrelatedData: + """Correlated IQ samples and metadata for the same slot""" + + slot_num: int + metadata: Metadata + iq: IQSamples + correlation_timestamp: float = field(default_factory=time.time) + + @property + def samples(self) -> np.ndarray: + return self.iq.samples + + @property + def rb_bitmap(self) -> bytes: + return self.metadata.rb_bitmap + + def to_dict(self) -> Dict[str, Any]: + return { + "slot_num": self.slot_num, + "metadata": self.metadata.to_dict(), + "iq": self.iq.to_dict(), + "correlation_timestamp": self.correlation_timestamp, + } + + +# ============================================================================= +# Statistics +# ============================================================================= + + +@dataclass +class InterfaceStats: + """Interface statistics""" + + start_time: float = field(default_factory=time.time) + + # Reception counts + metadata_received: int = 0 + metadata_bytes: int = 0 + iq_packets_received: int = 0 + iq_samples_received: int = 0 + iq_bytes: int = 0 + + # Correlation stats + slots_correlated: int = 0 + iq_packets_correlated: int = 0 + slots_without_metadata: int = 0 + metadata_expired: int = 0 + + # Buffer stats + metadata_buffer_hits: int = 0 + metadata_buffer_misses: int = 0 + + # Callback stats + callbacks_invoked: int = 0 + callback_errors: int = 0 + + def reset(self) -> None: + """Reset all statistics to zero""" + self.start_time = time.time() + self.metadata_received = 0 + self.metadata_bytes = 0 + self.iq_packets_received = 0 + self.iq_samples_received = 0 + self.iq_bytes = 0 + self.slots_correlated = 0 + self.iq_packets_correlated = 0 + self.slots_without_metadata = 0 + self.metadata_expired = 0 + self.metadata_buffer_hits = 0 + self.metadata_buffer_misses = 0 + self.callbacks_invoked = 0 + self.callback_errors = 0 + + def snapshot(self) -> "InterfaceStats": + """Create a copy of current stats""" + return InterfaceStats( + start_time=self.start_time, + metadata_received=self.metadata_received, + metadata_bytes=self.metadata_bytes, + iq_packets_received=self.iq_packets_received, + iq_samples_received=self.iq_samples_received, + iq_bytes=self.iq_bytes, + slots_correlated=self.slots_correlated, + iq_packets_correlated=self.iq_packets_correlated, + slots_without_metadata=self.slots_without_metadata, + metadata_expired=self.metadata_expired, + metadata_buffer_hits=self.metadata_buffer_hits, + metadata_buffer_misses=self.metadata_buffer_misses, + callbacks_invoked=self.callbacks_invoked, + callback_errors=self.callback_errors, + ) + + @property + def runtime(self) -> float: + return time.time() - self.start_time + + @property + def metadata_rate(self) -> float: + return self.metadata_received / max(self.runtime, 0.001) + + @property + def iq_packet_rate(self) -> float: + return self.iq_packets_received / max(self.runtime, 0.001) + + @property + def slot_rate(self) -> float: + return self.slots_correlated / max(self.runtime, 0.001) + + @property + def iq_bandwidth_mbps(self) -> float: + return (self.iq_bytes / max(self.runtime, 0.001)) / 1e6 + + @property + def correlation_rate(self) -> float: + """Percentage of completed slots that found metadata""" + total_slots = self.slots_correlated + self.slots_without_metadata + if total_slots == 0: + return 0.0 + return self.slots_correlated / total_slots * 100 + + @property + def avg_packets_per_slot(self) -> float: + """Average IQ packets per correlated slot""" + if self.slots_correlated == 0: + return 0.0 + return self.iq_packets_correlated / self.slots_correlated + + def to_dict(self) -> Dict[str, Any]: + return { + "runtime_s": self.runtime, + "metadata_received": self.metadata_received, + "metadata_rate": self.metadata_rate, + "iq_packets_received": self.iq_packets_received, + "iq_packet_rate": self.iq_packet_rate, + "iq_bandwidth_mbps": self.iq_bandwidth_mbps, + "slots_correlated": self.slots_correlated, + "slot_rate": self.slot_rate, + "correlation_rate_pct": self.correlation_rate, + "avg_packets_per_slot": self.avg_packets_per_slot, + "slots_without_metadata": self.slots_without_metadata, + "metadata_expired": self.metadata_expired, + } + + def __str__(self) -> str: + return ( + f"Runtime: {self.runtime:.1f}s | " + f"Meta: {self.metadata_received} ({self.metadata_rate:.1f}/s) | " + f"IQ pkts: {self.iq_packets_received} ({self.iq_packet_rate:.1f}/s) | " + f"Slots: {self.slots_correlated} ({self.correlation_rate:.1f}%) | " + f"Avg pkts/slot: {self.avg_packets_per_slot:.1f}" + ) + + +# ============================================================================= +# Metadata Buffer +# ============================================================================= + + +class MetadataBuffer: + """Buffer for metadata, indexed by slot number""" + + def __init__(self, max_slots: int = 1000, slot_window: int = 500): + # Key is (slot_num, dl_flag) so that flexible slots can store both + # a DL and a UL metadata entry under the same slot number. + self._buffer: OrderedDict[Tuple[int, bool], Metadata] = OrderedDict() + self._max_slots = max_slots + self._slot_window = slot_window + self._lock = threading.Lock() + self._latest_slot = 0 + + def store(self, metadata: Metadata) -> None: + """Store metadata in buffer""" + with self._lock: + key = (metadata.slot_num, metadata.dl_flag) + self._buffer[key] = metadata + + if self._is_newer_slot(metadata.slot_num, self._latest_slot): + self._latest_slot = metadata.slot_num + + while len(self._buffer) > self._max_slots: + self._buffer.popitem(last=False) + + def lookup(self, slot_num: int) -> Optional[Metadata]: + """Look up first metadata entry for a slot (does NOT remove it)""" + with self._lock: + for (sn, _), meta in self._buffer.items(): + if sn == slot_num: + return meta + return None + + def remove(self, slot_num: int) -> Optional[Metadata]: + """Remove first metadata entry for a slot (backward compat)""" + with self._lock: + for key in list(self._buffer.keys()): + if key[0] == slot_num: + return self._buffer.pop(key) + return None + + def remove_all(self, slot_num: int) -> List[Metadata]: + """Remove ALL metadata entries for a slot (handles flexible slots)""" + with self._lock: + keys = [k for k in self._buffer if k[0] == slot_num] + return [self._buffer.pop(k) for k in keys] + + def cleanup_old( + self, current_slot: int, expired_callback: Optional[Callable] = None + ) -> int: + """Remove metadata older than slot_window""" + with self._lock: + threshold = (current_slot - self._slot_window) & 0xFFFF + + expired_keys = [ + k for k in self._buffer + if self._is_older_slot(k[0], threshold, current_slot) + ] + + for key in expired_keys: + meta = self._buffer.pop(key) + if expired_callback: + expired_callback(meta) + + return len(expired_keys) + + def _is_newer_slot(self, slot_a: int, slot_b: int) -> bool: + diff = (slot_a - slot_b) & 0xFFFF + return diff < 0x8000 + + def _is_older_slot(self, slot: int, threshold: int, current: int) -> bool: + dist_to_current = (current - slot) & 0xFFFF + dist_to_threshold = (current - threshold) & 0xFFFF + return dist_to_current > dist_to_threshold + + @property + def size(self) -> int: + with self._lock: + return len(self._buffer) + + @property + def latest_slot(self) -> int: + return self._latest_slot + + +# ============================================================================= +# IQ Accumulator (Collects multiple IQ packets per slot) +# ============================================================================= + + +@dataclass +class SlotAccumulator: + """Accumulates IQ packets for a single slot""" + + slot_num: int + packets: List[np.ndarray] = field(default_factory=list) + first_packet_time: float = field(default_factory=time.time) + packet_count: int = 0 + + def add_packet(self, samples: np.ndarray) -> None: + """Add an IQ packet to this slot""" + self.packets.append(samples) + self.packet_count += 1 + + def get_combined_samples(self) -> np.ndarray: + """Combine all packets into single array""" + if not self.packets: + return np.array([], dtype=np.complex64) + return np.concatenate(self.packets) + + @property + def total_samples(self) -> int: + return sum(len(p) for p in self.packets) + + @property + def age(self) -> float: + """Time since first packet""" + return time.time() - self.first_packet_time + + +class IQAccumulatorBuffer: + """ + Accumulates IQ packets per slot. + + Emits complete slot when: + - New slot arrives (previous slot is complete) + - Timeout reached + """ + + def __init__( + self, + max_slots: int = 100, + slot_timeout: float = 0.1, # seconds + ): + self._buffer: OrderedDict[int, SlotAccumulator] = OrderedDict() + self._max_slots = max_slots + self._slot_timeout = slot_timeout + self._lock = threading.Lock() + self._latest_slot = -1 + + def add_packet(self, slot_num: int, samples: np.ndarray) -> List[SlotAccumulator]: + """ + Add an IQ packet. Returns list of completed slots to emit. + + A slot is considered complete when: + - A newer slot arrives (the older slot won't get more packets) + - The slot times out + """ + completed = [] + + with self._lock: + # Check for completed slots (older than current) + if self._latest_slot >= 0 and slot_num != self._latest_slot: + completed.extend(self._flush_older_slots(slot_num)) + + # Add packet to accumulator + if slot_num not in self._buffer: + self._buffer[slot_num] = SlotAccumulator(slot_num=slot_num) + + self._buffer[slot_num].add_packet(samples) + + # Update latest slot + if self._is_newer_slot(slot_num, self._latest_slot): + self._latest_slot = slot_num + + # Check for timed-out slots + completed.extend(self._flush_timed_out()) + + # Limit buffer size + while len(self._buffer) > self._max_slots: + oldest_slot = next(iter(self._buffer)) + completed.append(self._buffer.pop(oldest_slot)) + + return completed + + def _flush_older_slots(self, current_slot: int) -> List[SlotAccumulator]: + """Flush slots older than current""" + completed = [] + slots_to_remove = [] + + for slot_num, accumulator in self._buffer.items(): + if self._is_older_slot(slot_num, current_slot): + slots_to_remove.append(slot_num) + + for slot_num in slots_to_remove: + completed.append(self._buffer.pop(slot_num)) + + return completed + + def _flush_timed_out(self) -> List[SlotAccumulator]: + """Flush slots that have timed out""" + completed = [] + slots_to_remove = [] + + for slot_num, accumulator in self._buffer.items(): + if accumulator.age > self._slot_timeout: + slots_to_remove.append(slot_num) + + for slot_num in slots_to_remove: + completed.append(self._buffer.pop(slot_num)) + + return completed + + def flush_all(self) -> List[SlotAccumulator]: + """Flush all remaining slots""" + with self._lock: + completed = list(self._buffer.values()) + self._buffer.clear() + return completed + + def _is_newer_slot(self, slot_a: int, slot_b: int) -> bool: + if slot_b < 0: + return True + diff = (slot_a - slot_b) & 0xFFFF + return diff < 0x8000 + + def _is_older_slot(self, slot_a: int, slot_b: int) -> bool: + diff = (slot_b - slot_a) & 0xFFFF + return diff < 0x8000 and diff > 0 + + @property + def size(self) -> int: + with self._lock: + return len(self._buffer) + + +# ============================================================================= +# Main Interface Class +# ============================================================================= + + +class IQMetadataInterface: + """ + IQ-Prioritized interface with packet accumulation. + + Handles multiple IQ packets per slot by accumulating them. + Correlates with buffered metadata when slot is complete. + """ + + def __init__( + self, + iq_port: int = 5588, + metadata_port: int = 5589, + bind_address: str = "127.0.0.1", + metadata_buffer_size: int = 1000, + metadata_slot_window: int = 500, + iq_accumulator_size: int = 100, + iq_slot_timeout: float = 0.05, # 50ms timeout per slot + output_queue_size: int = 1000, + iq_socket_buffer: int = 8 * 1024 * 1024, + metadata_socket_buffer: int = 1024 * 1024, + cleanup_interval: int = 100, + ): + self.iq_port = iq_port + self.metadata_port = metadata_port + self.bind_address = bind_address + self._cleanup_interval = cleanup_interval + + # Metadata buffer (stores metadata until IQ slot completes) + self._metadata_buffer = MetadataBuffer( + max_slots=metadata_buffer_size, slot_window=metadata_slot_window + ) + + # IQ accumulator (collects multiple packets per slot) + self._iq_accumulator = IQAccumulatorBuffer( + max_slots=iq_accumulator_size, slot_timeout=iq_slot_timeout + ) + + # Output queue + self._output_queue: queue.Queue[CorrelatedData] = queue.Queue( + maxsize=output_queue_size + ) + + # Callbacks + self._correlated_callbacks: List[Callable[[CorrelatedData], None]] = [] + self._iq_callbacks: List[Callable[[IQSamples], None]] = [] + self._metadata_callbacks: List[Callable[[Metadata], None]] = [] + + # Thread control + self._running = False + self._threads: List[threading.Thread] = [] + self._stop_event = threading.Event() + + # Statistics + self.stats = InterfaceStats() + + # Socket configuration + self._iq_socket_buffer = iq_socket_buffer + self._metadata_socket_buffer = metadata_socket_buffer + + # Tracking + self._latest_iq_slot = 0 + # For tracking per-capture stats + self._capture_stats_start: Optional[InterfaceStats] = None + + logger.info( + f"IQMetadataInterface initialized (IQ:{iq_port}, Meta:{metadata_port})" + ) + logger.info(f" Metadata buffer: {metadata_buffer_size} slots") + logger.info(f" IQ accumulator: timeout={iq_slot_timeout*1000:.0f}ms") + + # ------------------------------------------------------------------------- + # Callback Registration + # ------------------------------------------------------------------------- + + def on_correlated_data( + self, callback: Callable[[CorrelatedData], None] + ) -> Callable: + self._correlated_callbacks.append(callback) + return callback + + def on_iq_data(self, callback: Callable[[IQSamples], None]) -> Callable: + self._iq_callbacks.append(callback) + return callback + + def on_metadata(self, callback: Callable[[Metadata], None]) -> Callable: + self._metadata_callbacks.append(callback) + return callback + + def remove_callback(self, callback: Callable) -> bool: + for cb_list in [ + self._correlated_callbacks, + self._iq_callbacks, + self._metadata_callbacks, + ]: + if callback in cb_list: + cb_list.remove(callback) + return True + return False + + # ------------------------------------------------------------------------- + # Queue-based Access + # ------------------------------------------------------------------------- + + def get( + self, block: bool = True, timeout: Optional[float] = None + ) -> Optional[CorrelatedData]: + try: + return self._output_queue.get(block=block, timeout=timeout) + except queue.Empty: + return None + + def get_nowait(self) -> Optional[CorrelatedData]: + return self.get(block=False) + + def get_all(self, max_items: int = 100) -> List[CorrelatedData]: + items = [] + while len(items) < max_items: + item = self.get_nowait() + if item is None: + break + items.append(item) + return items + + @property + def queue_size(self) -> int: + return self._output_queue.qsize() + + # ------------------------------------------------------------------------- + # Iterator Interface + # ------------------------------------------------------------------------- + + def __iter__(self) -> Iterator[CorrelatedData]: + return self + + def __next__(self) -> CorrelatedData: + if not self._running: + raise StopIteration + data = self.get(block=True, timeout=1.0) + if data is None: + if not self._running: + raise StopIteration + return self.__next__() + return data + + # ------------------------------------------------------------------------- + # Lifecycle Management + # ------------------------------------------------------------------------- + + def start(self) -> "IQMetadataInterface": + if self._running: + logger.warning("Interface already running") + return self + + self._running = True + self._stop_event.clear() + self.stats = InterfaceStats() + + self._threads = [ + threading.Thread( + target=self._receive_metadata, name="MetadataRx", daemon=True + ), + threading.Thread(target=self._receive_iq, name="IQRx", daemon=True), + ] + + for t in self._threads: + t.start() + logger.info(f"Started thread: {t.name}") + + logger.info("IQMetadataInterface started (IQ-prioritized with accumulation)") + return self + + def stop(self) -> None: + if not self._running: + return + + self._running = False + self._stop_event.set() + + # Flush remaining accumulated IQ + remaining = self._iq_accumulator.flush_all() + for slot_acc in remaining: + self._process_completed_slot(slot_acc) + + for t in self._threads: + t.join(timeout=2.0) + + self._threads.clear() + logger.info("IQMetadataInterface stopped") + + def __enter__(self) -> "IQMetadataInterface": + return self.start() + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.stop() + + @property + def is_running(self) -> bool: + return self._running + + # ------------------------------------------------------------------------- + # Internal: Packet Parsing + # ------------------------------------------------------------------------- + + def _parse_metadata_packet(self, data: bytes) -> Optional[Metadata]: + # Packet layout: [slot_num 2B][dl_flag 1B][rb_bitmap NB] + if len(data) < 3: + return None + slot_num = struct.unpack(">H", data[0:2])[0] + dl_flag = bool(data[2]) + rb_bitmap = data[3:] + return Metadata(slot_num=slot_num, rb_bitmap=rb_bitmap, dl_flag=dl_flag) + + def _parse_iq_packet(self, data: bytes) -> Optional[tuple]: + """Returns (slot_num, samples) tuple""" + if len(data) < 10: + return None + slot_num = struct.unpack(">H", data[0:2])[0] + samples = np.frombuffer(data[2:], dtype=np.float32).view(np.complex64) + return (slot_num, samples) + + # ------------------------------------------------------------------------- + # Internal: Slot Processing + # ------------------------------------------------------------------------- + + def _process_completed_slot(self, slot_acc: SlotAccumulator) -> None: + """Process a completed slot (all IQ packets received)""" + slot_num = slot_acc.slot_num + + # Remove ALL metadata entries for this slot (1 for normal slots, + # 2 for flexible slots that carry both DL and UL bitmaps). + metadatas = self._metadata_buffer.remove_all(slot_num) + + if metadatas: + self.stats.metadata_buffer_hits += 1 + self.stats.iq_packets_correlated += slot_acc.packet_count + + combined_samples = slot_acc.get_combined_samples() + iq = IQSamples(slot_num=slot_num, samples=combined_samples) + + for metadata in metadatas: + self.stats.slots_correlated += 1 + correlated = CorrelatedData(slot_num=slot_num, metadata=metadata, iq=iq) + self._emit_correlated(correlated) + else: + # No metadata found + self.stats.metadata_buffer_misses += 1 + self.stats.slots_without_metadata += 1 + logger.debug( + f"Slot {slot_num}: no metadata found ({slot_acc.packet_count} IQ packets)" + ) + + def _emit_correlated(self, data: CorrelatedData) -> None: + """Emit correlated data to callbacks and queue""" + try: + self._output_queue.put_nowait(data) + except queue.Full: + try: + self._output_queue.get_nowait() + self._output_queue.put_nowait(data) + except queue.Empty: + pass + + for callback in self._correlated_callbacks: + try: + callback(data) + self.stats.callbacks_invoked += 1 + except Exception as e: + self.stats.callback_errors += 1 + logger.error(f"Callback error: {e}") + + def _on_metadata_expired(self, metadata: Metadata) -> None: + self.stats.metadata_expired += 1 + + # ------------------------------------------------------------------------- + # Internal: Receive Threads + # ------------------------------------------------------------------------- + + def _flush_socket(self, sock: socket.socket) -> int: + sock.setblocking(False) + count = 0 + try: + while True: + sock.recv(65535) + count += 1 + except BlockingIOError: + pass + sock.setblocking(True) + sock.settimeout(1.0) + return count + + def _receive_metadata(self) -> None: + """Thread: receive metadata and store in buffer""" + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt( + socket.SOL_SOCKET, socket.SO_RCVBUF, self._metadata_socket_buffer + ) + sock.bind((self.bind_address, self.metadata_port)) + + flushed = self._flush_socket(sock) + if flushed > 0: + logger.debug(f"Flushed {flushed} old metadata packets") + + while not self._stop_event.is_set(): + try: + data, _ = sock.recvfrom(65535) + if len(data) == 0: + continue + + metadata = self._parse_metadata_packet(data) + if metadata is None: + continue + + self.stats.metadata_received += 1 + self.stats.metadata_bytes += len(data) + + for callback in self._metadata_callbacks: + try: + callback(metadata) + except Exception as e: + logger.error(f"Metadata callback error: {e}") + + # Store in buffer + self._metadata_buffer.store(metadata) + + except socket.timeout: + continue + except Exception as e: + if self._running: + logger.error(f"Metadata receive error: {e}") + + sock.close() + + def _receive_iq(self) -> None: + """Thread: receive IQ packets, accumulate, correlate when slot complete""" + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self._iq_socket_buffer) + sock.bind((self.bind_address, self.iq_port)) + + flushed = self._flush_socket(sock) + if flushed > 0: + logger.debug(f"Flushed {flushed} old IQ packets") + + packet_count = 0 + + while not self._stop_event.is_set(): + try: + data, _ = sock.recvfrom(65535) + if len(data) == 0: + continue + + parsed = self._parse_iq_packet(data) + if parsed is None: + continue + + slot_num, samples = parsed + + # Update stats + self.stats.iq_packets_received += 1 + self.stats.iq_samples_received += len(samples) + self.stats.iq_bytes += len(data) + self._latest_iq_slot = slot_num + packet_count += 1 + + # Raw IQ callback (per packet) + for callback in self._iq_callbacks: + try: + iq = IQSamples(slot_num=slot_num, samples=samples) + callback(iq) + except Exception as e: + logger.error(f"IQ callback error: {e}") + + # Add to accumulator - returns completed slots + completed_slots = self._iq_accumulator.add_packet(slot_num, samples) + + # Process any completed slots + for slot_acc in completed_slots: + self._process_completed_slot(slot_acc) + + # Periodic cleanup + if packet_count % self._cleanup_interval == 0: + _ = self._metadata_buffer.cleanup_old( + slot_num, expired_callback=self._on_metadata_expired + ) + + except socket.timeout: + continue + except Exception as e: + if self._running: + logger.error(f"IQ receive error: {e}") + + sock.close() + + # ------------------------------------------------------------------------- + # Utility Methods + # ------------------------------------------------------------------------- + + def reset_stats(self) -> None: + """Reset all statistics to zero""" + self.stats.reset() + self._capture_stats_start = None + logger.info("Statistics reset") + + def get_stats(self) -> InterfaceStats: + """Get current aggregated statistics""" + return self.stats + + def start_stats_capture(self) -> None: + """Mark the start of a stats capture period""" + self._capture_stats_start = self.stats.snapshot() + + def get_capture_stats(self) -> Dict[str, Any]: + """ + Get statistics for current capture period only. + + Returns stats since last start_stats_capture() call. + """ + if self._capture_stats_start is None: + # No capture started, return current totals + return self.stats.to_dict() + + start = self._capture_stats_start + current = self.stats + + # Calculate deltas + runtime = current.runtime - (start.start_time - current.start_time) + # capture_start = time.time() - runtime + + metadata_received = current.metadata_received - start.metadata_received + iq_packets = current.iq_packets_received - start.iq_packets_received + iq_samples = current.iq_samples_received - start.iq_samples_received + iq_bytes = current.iq_bytes - start.iq_bytes + slots_correlated = current.slots_correlated - start.slots_correlated + iq_packets_correlated = ( + current.iq_packets_correlated - start.iq_packets_correlated + ) + slots_without_meta = ( + current.slots_without_metadata - start.slots_without_metadata + ) + metadata_expired = current.metadata_expired - start.metadata_expired + + # Calculate rates + duration = time.time() - ( + self._capture_stats_start.start_time + + (time.time() - current.start_time - current.runtime) + ) + duration = max(duration, 0.001) + + # Simpler duration calculation + duration = runtime if runtime > 0 else 0.001 + + total_slots = slots_correlated + slots_without_meta + correlation_rate = ( + (slots_correlated / total_slots * 100) if total_slots > 0 else 0.0 + ) + avg_packets = ( + (iq_packets_correlated / slots_correlated) if slots_correlated > 0 else 0.0 + ) + + return { + "capture_duration": duration, + "metadata_received": metadata_received, + "metadata_rate": metadata_received / duration, + "iq_packets_received": iq_packets, + "iq_packet_rate": iq_packets / duration, + "iq_samples_received": iq_samples, + "iq_bytes": iq_bytes, + "iq_bandwidth_mbps": (iq_bytes / duration) / 1e6, + "slots_correlated": slots_correlated, + "slot_rate": slots_correlated / duration, + "correlation_rate_pct": correlation_rate, + "avg_packets_per_slot": avg_packets, + "slots_without_metadata": slots_without_meta, + "metadata_expired": metadata_expired, + } + + def print_stats(self, capture_only: bool = False) -> None: + """ + Print statistics. + + Args: + capture_only: If True, print only current capture stats + """ + if capture_only and self._capture_stats_start is not None: + stats = self.get_capture_stats() + print("\n=== Capture Stats ===") + else: + stats = self.stats.to_dict() + print("\n=== Aggregated Stats ===") + + print( + f"Duration: {stats.get('capture_duration', stats.get('runtime_s', 0)):.1f}s" + ) + # print(f"Metadata received: {stats['metadata_received']}") + # print(f"IQ packets received: {stats['iq_packets_received']}") + # print(f"Slots correlated: {stats['slots_correlated']}") + print(f"Correlation rate: {stats['correlation_rate_pct']:.1f}%") + # print(f"Avg packets/slot: {stats['avg_packets_per_slot']:.1f}") + print(f"Slots without meta: {stats['slots_without_metadata']}") + # print(f"Metadata expired: {stats['metadata_expired']}") + + if not capture_only: + print(f"Metadata buffer: {self._metadata_buffer.size} slots") + print(f"IQ accumulator: {self._iq_accumulator.size} slots pending") + + @property + def metadata_buffer_size(self) -> int: + return self._metadata_buffer.size + + @property + def latest_iq_slot(self) -> int: + return self._latest_iq_slot + + @property + def latest_metadata_slot(self) -> int: + return self._metadata_buffer.latest_slot + + +# ============================================================================= +# Convenience Functions +# ============================================================================= + + +def create_interface( + iq_port: int = 5588, metadata_port: int = 5589, **kwargs +) -> IQMetadataInterface: + return IQMetadataInterface(iq_port=iq_port, metadata_port=metadata_port, **kwargs) + + +# ============================================================================= +# Main +# ============================================================================= + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="IQ Metadata Interface Test") + parser.add_argument("--iq-port", type=int, default=5588) + parser.add_argument("--metadata-port", type=int, default=5589) + parser.add_argument("--stats-interval", type=float, default=2.0) + args = parser.parse_args() + + print("=" * 60) + print(" IQ Metadata Interface (with Accumulation)") + print("=" * 60) + + interface = IQMetadataInterface( + iq_port=args.iq_port, metadata_port=args.metadata_port + ) + + @interface.on_correlated_data + def handle_data(data: CorrelatedData): + print( + f"[SLOT {data.slot_num:5d}] " + f"IQ: {data.iq.num_samples:6d} samples | " + f"RBs: {data.metadata.num_allocated_rbs:3d} | " + f"Power: {data.iq.power_db:6.1f} dB" + ) + + interface.start() + + try: + while True: + time.sleep(args.stats_interval) + print() + interface.print_stats() + except KeyboardInterrupt: + print("\nShutting down...") + interface.stop() diff --git a/gain_viz/plot.png b/gain_viz/plot.png new file mode 100644 index 0000000..64ed3b3 Binary files /dev/null and b/gain_viz/plot.png differ diff --git a/gain_viz/templates/ind.html b/gain_viz/templates/ind.html new file mode 100644 index 0000000..73f3cc2 --- /dev/null +++ b/gain_viz/templates/ind.html @@ -0,0 +1,954 @@ + + + + + + Gain-Viz β€” Spectrum Viewer + + + + +
+
+

Gain-Viz

+

Interactive gain control and real-time RF visualization

+
+ +
+
+ +
+

Stream Controls

+ +
+ + + +
+ +
+ ● Stopped +
+
+ + +
+

Gain Settings

+ +
+
+ +
+ +
+ +
+ +
+ + +
+ +
+ +
+ +
+ + +
+ +
+ +
+ +
+ + +
+ +
+ +
+ +
+
+ +
+ + +
+ +
+
+
+ + +
+

Spectrogram Parameters

+ +
+
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ +
+ +
+
+
+
+
+ + +
+
+
+

Spectrum Analysis

+
+ +
+
+ +
+ Spectrum Analysis plot +
+
+
+
+ +
+ Gain-Viz β€’ Real-time RF monitoring +
+
+ + + + \ No newline at end of file diff --git a/gain_viz/templates/index.html b/gain_viz/templates/index.html index 73f3cc2..83b7a13 100644 --- a/gain_viz/templates/index.html +++ b/gain_viz/templates/index.html @@ -3,633 +3,469 @@ - Gain-Viz β€” Spectrum Viewer + Gain-Viz | IQ Spectrum Viewer + -
-
-

Gain-Viz

-

Interactive gain control and real-time RF visualization

+
+
+
+

GAIN-VIZ

+

UDP IQ stream monitoring and spectrum visualization

+
+
+ + Stopped +
-
-
- -
-

Stream Controls

- +
+
- - -
-

Gain Settings

+
+ +
+
Gain Settings
-
- +
-
- -
-
- -
-
- -
-
- -
-
- -
-
- -
-
- -
-
-
- - +
+ +
-
+
-
- - -
-

Spectrogram Parameters

+
+
+
Acquisition Parameters
@@ -649,305 +485,249 @@
- - + +
-
- +
+
-
- -
-
- -
-
-
-

Spectrum Analysis

-
- -
-
- -
- Spectrum Analysis plot +
+ + + + +
+
+
+

IQ Spectrum Display

+

Time-domain and spectrogram view from UDP IQ payload

+
-
+ +
+ IQ spectrum plot +
+
-
- Gain-Viz β€’ Real-time RF monitoring +
+ Gain-Viz | Real-time IQ spectrum viewer
diff --git a/plot.png b/plot.png new file mode 100644 index 0000000..ff97176 Binary files /dev/null and b/plot.png differ diff --git a/pyproject.toml b/pyproject.toml index 17bdd12..805045a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,8 @@ dependencies = [ "matplotlib", "numpy", "pyzmq", - "pyserial" + "pyserial", + "flask_socketio" ] [project.scripts] diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..65b6e18 --- /dev/null +++ b/templates/index.html @@ -0,0 +1,494 @@ + + + + IQ Spectrum Analyzer + + + + +
+

πŸ“‘ IQ Spectrum Analyzer

+ + +
+ + + +
+
+ ● Stopped +
+ + +
+
+
--
+
Current Slot
+
+
+
--
+
Direction
+
+
+
--
+
Power (dB)
+
+
+
0
+
Allocated RBs
+
+
+
0
+
IQ Packets
+
+
+
0
+
Slots Correlated
+
+
+
0%
+
Correlation
+
+
+
0
+
Slots/s
+
+
+
0
+
IQ BW (Mbps)
+
+
+ + +
+ IQ Spectrum Plot +
+ + +
+

βš™ Spectrogram Parameters

+
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+
+ +
+
+ + +
+

🎚 Gain Settings

+
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+
+ + +
+
+ + +
+ + + + \ No newline at end of file