# gain-viz/gain_viz/app.py
#
# Flask web app that applies USRP/SCM gain settings and serves a periodically
# refreshed time-domain/spectrogram plot of I/Q samples received over ZMQ.
import json
import os
import subprocess
import threading
import time

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np
import serial
import zmq
from flask import Flask, render_template, send_file, request, jsonify
# Flask application object; the @app.route decorators below register handlers on it.
app = Flask(__name__)
# Location of the rendered PNG: written by the plotting code, served by /plot.
PLOT_PATH = "/opt/gain_viz/plot.png"
# Gain values last applied through gain_update(); also reported by /get_gains.
usrp_tx_gain = 60
usrp_rx_gain = 30
scm_tx_gain = 30
scm_rx_gain = 30
# Plot/capture settings read by the plotting thread and changed via /update_params.
sample_rate = 23.04e6  # Hz
window_ms = 20  # length of the plotted window, in milliseconds
center_freq = 3.415e9  # passed to specgram as Fc (same unit as sample_rate)
NFFT = 1024  # FFT size passed to specgram
tcp_port = 5556  # port of the localhost ZMQ publisher to subscribe to
# ----------------- Serial / SCM -----------------
def connect_serial(port, baudrate=115200, timeout=1):
    """Open ``port`` with 8 data bits, even parity and one stop bit.

    Args:
        port: Device name handed to ``serial.Serial``.
        baudrate: Line speed; defaults to 115200.
        timeout: Timeout value forwarded to ``serial.Serial``; defaults to 1.

    Returns:
        The opened ``serial.Serial`` object, or ``None`` when opening raised
        ``serial.SerialException`` (the error is printed).
    """
    settings = {
        "port": port,
        "baudrate": baudrate,
        "timeout": timeout,
        "bytesize": serial.EIGHTBITS,
        "parity": serial.PARITY_EVEN,
        "stopbits": serial.STOPBITS_ONE,
    }
    try:
        return serial.Serial(**settings)
    except serial.SerialException as exc:
        print(f"Error connecting to {port}: {exc}")
        return None
def send_command(ser, command):
    """Write ``command`` to ``ser`` as UTF-8 bytes; do nothing if the port is closed."""
    if not ser.is_open:
        return
    payload = command.encode('utf-8')
    ser.write(payload)
def receive_feedback(ser):
    """Read the pending reply from ``ser`` and return its last non-empty line.

    The previous implementation concatenated ``str()`` of each bytes object
    and returned the second-to-last ``\\r``-delimited piece. That raised
    ``IndexError`` when the reply contained no carriage return, and for
    multi-line replies returned text polluted with ``b'...'`` repr fragments,
    so it never compared equal to ``"OK"``. The reply is now decoded and
    split into real lines.

    Args:
        ser: Serial-like object providing ``is_open``, ``flush()`` and
            ``readlines()``.

    Returns:
        The last non-blank line of the reply with surrounding whitespace
        removed, or ``""`` when the port is closed, nothing was read, or a
        ``serial.SerialTimeoutException`` was raised.
    """
    if not ser.is_open:
        return ""
    try:
        ser.flush()
        raw_response = ser.readlines()
    except serial.SerialTimeoutException:
        return ""
    # errors="replace": line noise must not turn into a UnicodeDecodeError.
    text = b"".join(raw_response).decode("utf-8", errors="replace")
    lines = [line.strip() for line in text.splitlines()]
    return next((line for line in reversed(lines) if line), "")
def scm_conf(port, baudrate, rx_cmd, tx_cmd, max_attempts=5):
    """Send the RX and then the TX command to the device on ``port``.

    Each command is sent with a trailing ``\\r`` and repeated until
    ``receive_feedback`` returns ``"OK"`` or ``max_attempts`` sends were made.
    The port is always closed before returning; previously it was left open,
    leaking one handle per call.

    Args:
        port: Serial device name passed to ``connect_serial``.
        baudrate: Baud rate passed to ``connect_serial``.
        rx_cmd: Command sent first.
        tx_cmd: Command sent second.
        max_attempts: Maximum number of sends per command (default 5).

    Returns:
        ``True`` if the port could be opened (whether or not the commands
        were acknowledged), ``False`` otherwise.
    """
    ser = connect_serial(port, baudrate)
    if not ser:
        return False
    try:
        for cmd in (rx_cmd, tx_cmd):
            for _ in range(max_attempts):
                send_command(ser, cmd + "\r")
                if receive_feedback(ser) == "OK":
                    break
            else:
                # Retries exhausted: report instead of failing silently.
                print(f"No OK from {port} for {cmd!r} after {max_attempts} attempts")
    finally:
        ser.close()
    return True
# ----------------- Gain Updates -----------------
def _tmux_send(command):
    """Send ``command`` followed by the ``C-m`` key to the tmux target ``ran``."""
    try:
        # Argument list instead of a shell string: nothing in ``command`` is
        # ever interpreted by a shell.
        subprocess.run(
            ["tmux", "send-keys", "-t", "ran", command, "C-m"],
            check=False,
        )
    except OSError as exc:
        # os.system() never raised when tmux was missing; keep that tolerance.
        print(f"Could not run tmux: {exc}")


def gain_update(usrp_tx, usrp_rx, scm_tx, scm_rx):
    """Apply whichever gain values differ from the stored ones.

    A changed USRP gain is stored and sent as a ``tx_gain``/``rx_gain``
    command to the tmux target ``ran``. If either SCM gain changed, both SCM
    gain commands are sent to ``/dev/ttyUSB0`` and ``/dev/ttyUSB1`` through
    ``scm_conf``.

    Args:
        usrp_tx: Requested USRP TX gain.
        usrp_rx: Requested USRP RX gain.
        scm_tx: Requested SCM TX gain.
        scm_rx: Requested SCM RX gain.

    Returns:
        Always ``True``.
    """
    global usrp_tx_gain, usrp_rx_gain, scm_tx_gain, scm_rx_gain

    if usrp_tx != usrp_tx_gain:
        usrp_tx_gain = usrp_tx
        _tmux_send(f"tx_gain 0 {usrp_tx_gain} ")
    if usrp_rx != usrp_rx_gain:
        usrp_rx_gain = usrp_rx
        _tmux_send(f"rx_gain 0 {usrp_rx_gain} ")

    scm_change = False
    if scm_tx != scm_tx_gain:
        scm_tx_gain = scm_tx
        scm_change = True
    if scm_rx != scm_rx_gain:
        scm_rx_gain = scm_rx
        scm_change = True

    if scm_change:
        # NOTE(review): values arriving from the web form are floats, so the
        # commands contain e.g. "30.0" -- confirm the device accepts that.
        t_cmd = f"HW:GAIN 0 TX 0 {scm_tx_gain}"
        r_cmd = f"HW:GAIN 1 RX 0 {scm_rx_gain}"
        scm_conf("/dev/ttyUSB0", 115200, r_cmd, t_cmd)
        scm_conf("/dev/ttyUSB1", 115200, r_cmd, t_cmd)
    return True
# ----------------- ZMQ Subscriber -----------------
def zmq_subscriber(host, port):
    """Create a SUB socket connected to ``tcp://{host}:{port}``.

    Before connecting, the socket gets ``CONFLATE`` set to 1, an empty
    subscription prefix (all messages) and ``RCVTIMEO`` set to 1000.

    Returns:
        The connected ``zmq`` socket.
    """
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.CONFLATE, 1)
    sub.setsockopt_string(zmq.SUBSCRIBE, "")
    sub.setsockopt(zmq.RCVTIMEO, 1000)
    endpoint = f"tcp://{host}:{port}"
    sub.connect(endpoint)
    return sub
# ----------------- Plot Generation -----------------
def _decode_iq(msg):
    """Turn a payload of interleaved float32 (I, Q) values into complex samples.

    Trailing bytes that do not form a whole 8-byte pair are ignored, so a
    truncated or odd-length message can no longer raise inside
    ``frombuffer``/``reshape``.
    """
    n_floats = (len(msg) // 8) * 2
    if n_floats == 0:
        return np.zeros(0, dtype=np.complex64)
    pairs = np.frombuffer(msg, dtype=np.float32, count=n_floats).reshape(-1, 2)
    return pairs[:, 0] + 1j * pairs[:, 1]


def _fit_window(samples, window_samples):
    """Return exactly ``window_samples`` samples: the newest ones, zero-padded in front."""
    if len(samples) >= window_samples:
        return samples[-window_samples:]
    return np.pad(samples, (window_samples - len(samples), 0))


def _render_plot(iq_sample, fs, fc, nfft, win_ms, cmap):
    """Draw the time-domain and spectrogram panels and save them to PLOT_PATH."""
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 6))
    try:
        fig.subplots_adjust(hspace=0.4)

        # Time-domain plot (ms)
        times_ms = np.arange(len(iq_sample)) * 1000 / fs
        ax1.plot(times_ms, np.real(iq_sample), label="Real", color='b')
        ax1.plot(times_ms, np.imag(iq_sample), label="Imag", color='r')
        ax1.set_xlim(0, win_ms)
        ax1.set_xlabel("Time (ms)")
        ax1.set_ylabel("IQ Amplitude")
        ax1.grid(True, which='both', linestyle='--', linewidth=0.5)
        ax1.legend()

        # Spectrogram without grid. noverlap must stay below NFFT; 512 is
        # kept for the default NFFT of 1024.
        ax2.specgram(
            iq_sample,
            Fs=fs,
            Fc=fc,
            NFFT=nfft,
            noverlap=min(512, nfft // 2),
            cmap=cmap,
        )
        ax2.grid(False)
        ax2.set_ylim(fc - fs / 2, fc + fs / 2)
        # specgram's x axis is in seconds; label the ticks in milliseconds.
        ax2.xaxis.set_major_formatter(
            ticker.FuncFormatter(lambda t, pos: '{0:g}'.format(t * 1e3))
        )
        ax2.set_xlabel("Time (ms)")
        ax2.set_ylabel("Frequency (Hz)")
        ax2.xaxis.set_minor_locator(ticker.AutoMinorLocator())

        # Write to a per-process temp file and rename it into place so /plot
        # never serves a half-written PNG.
        os.makedirs(os.path.dirname(PLOT_PATH), exist_ok=True)
        tmp_path = f"{PLOT_PATH}.{os.getpid()}.tmp"
        fig.savefig(tmp_path, format="png", bbox_inches='tight')
        os.replace(tmp_path, PLOT_PATH)
    finally:
        # Close even on failure; otherwise figures pile up in pyplot.
        plt.close(fig)


def generate_spectrum_plot():
    """Run forever: poll the ZMQ feed and redraw the plot image.

    Every 0.5 s the loop does a non-blocking receive on a subscriber
    connected to ``localhost:tcp_port``. When a message arrives, its I/Q
    samples replace the current window (a payload with no complete pair keeps
    the previous window) and the figure is re-rendered to ``PLOT_PATH``.

    Changes relative to the earlier version:
      * ``sample_rate``, ``window_ms``, ``center_freq``, ``NFFT`` and
        ``tcp_port`` are re-read on every pass, so updates made through
        /update_params take effect (the socket is reconnected when the port
        changes).
      * Malformed payloads and rendering errors are reported and skipped
        instead of terminating the thread.
    """
    cmap = plt.get_cmap('twilight')
    sub = None
    connected_port = None
    # Empty until the first message; _fit_window pads it to a window of zeros.
    iq_sample = np.zeros(0, dtype=np.complex64)

    while True:
        try:
            port = tcp_port
            if sub is None or port != connected_port:
                if sub is not None:
                    sub.close(linger=0)
                    sub = None
                sub = zmq_subscriber("localhost", port)
                connected_port = port

            # Snapshot the settings so one frame is drawn consistently.
            fs, fc, nfft, win_ms = sample_rate, center_freq, NFFT, window_ms
            window_samples = int(fs * win_ms / 1000)
            if window_samples < 1 or nfft < 2:
                raise ValueError(
                    f"invalid plot settings: sample_rate={fs}, "
                    f"window_ms={win_ms}, NFFT={nfft}"
                )

            msg = sub.recv(zmq.NOBLOCK)
            fresh = _decode_iq(msg)
            iq_sample = _fit_window(
                fresh if fresh.size else iq_sample, window_samples
            )
            _render_plot(iq_sample, fs, fc, nfft, win_ms, cmap)
        except zmq.Again:
            pass  # No new data, keep last iq_sample
        except Exception as exc:
            # Boundary of the worker thread: report and keep running.
            print(f"Plot update failed: {exc}")
        # Poll/redraw at most twice per second.
        time.sleep(0.5)
# ----------------- Flask Routes -----------------
@app.route('/')
def index():
    """Render and return the ``index.html`` template for the root URL."""
    page = render_template('index.html')
    return page
@app.route('/update_gains', methods=['POST'])
def update_gains():
    """Read the four gain fields from the POSTed form and apply them.

    Each field is parsed as a float; the currently stored value is used as
    the default for a field. The parsed values are handed to ``gain_update``
    and a success JSON body is returned.
    """
    current = {
        'usrp_tx_gain': usrp_tx_gain,
        'usrp_rx_gain': usrp_rx_gain,
        'scm_tx_gain': scm_tx_gain,
        'scm_rx_gain': scm_rx_gain,
    }
    requested = {
        field: request.form.get(field, fallback, type=float)
        for field, fallback in current.items()
    }
    gain_update(
        requested['usrp_tx_gain'],
        requested['usrp_rx_gain'],
        requested['scm_tx_gain'],
        requested['scm_rx_gain'],
    )
    return jsonify({"status": "success", "message": "Gains updated successfully"})
@app.route('/plot')
def plot():
    """Send the image stored at ``PLOT_PATH`` with the ``image/png`` MIME type."""
    response = send_file(PLOT_PATH, mimetype='image/png')
    return response
@app.route('/get_gains')
def get_gains():
    """Return the four gain values currently held in module state as JSON."""
    gains = dict(
        usrp_tx_gain=usrp_tx_gain,
        usrp_rx_gain=usrp_rx_gain,
        scm_tx_gain=scm_tx_gain,
        scm_rx_gain=scm_rx_gain,
    )
    return jsonify(gains)
@app.route('/update_params', methods=['POST'])
def update_params():
    """Update the plot settings from the POSTed form and persist them.

    Form fields: ``center_freq``, ``sample_rate``, ``window_ms`` (floats),
    ``fft_size`` and ``tcp_port`` (ints). A field that is missing or cannot be
    parsed keeps its current value; previously it overwrote the setting with
    ``None``. Values are validated before any global is touched.

    Returns:
        * 200 with ``status: success`` when the settings were applied and
          ``save_config`` succeeded.
        * 400 with ``status: error`` when a value is out of range (nothing is
          changed).
        * 500 with ``status: error`` on any other exception. If it came from
          ``save_config``, the in-memory settings have already been updated.
    """
    global sample_rate, window_ms, center_freq, NFFT, tcp_port
    try:
        form = request.form
        new_center_freq = form.get('center_freq', center_freq, type=float)
        new_sample_rate = form.get('sample_rate', sample_rate, type=float)
        new_nfft = form.get('fft_size', NFFT, type=int)
        new_window_ms = form.get('window_ms', window_ms, type=float)
        new_tcp_port = form.get('tcp_port', tcp_port, type=int)

        # The chained comparisons also reject NaN and infinity.
        infinity = float('inf')
        problems = []
        if not 0 <= new_center_freq < infinity:
            problems.append('center_freq must be a non-negative number')
        if not 0 < new_sample_rate < infinity:
            problems.append('sample_rate must be a positive number')
        if not 0 < new_window_ms < infinity:
            problems.append('window_ms must be a positive number')
        if new_nfft < 2:
            problems.append('fft_size must be at least 2')
        if not 0 < new_tcp_port < 65536:
            problems.append('tcp_port must be between 1 and 65535')
        if problems:
            return jsonify({
                'status': 'error',
                'message': '; '.join(problems)
            }), 400

        center_freq = new_center_freq
        sample_rate = new_sample_rate
        NFFT = new_nfft
        window_ms = new_window_ms
        tcp_port = new_tcp_port

        # Persist the new settings to the config file.
        save_config()
        return jsonify({
            'status': 'success',
            'message': 'Parameters updated successfully'
        })
    except Exception as e:
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500
# ----------------- Config -----------------
def save_config(path='/opt/gain-viz/config.json'):
    """Write the current plot settings to ``path`` as JSON.

    The file holds the keys ``center_freq``, ``sample_rate``, ``fft_size``,
    ``window_ms`` and ``tcp_port``. The parent directory is created when it
    does not exist.

    Args:
        path: Destination file; defaults to ``/opt/gain-viz/config.json``.

    Raises:
        OSError: If the directory or file cannot be created or written.
    """
    # NOTE(review): this default lives under /opt/gain-viz while PLOT_PATH
    # uses /opt/gain_viz -- confirm which directory name is intended.
    config = {
        'center_freq': center_freq,
        'sample_rate': sample_rate,
        'fft_size': NFFT,
        'window_ms': window_ms,
        'tcp_port': tcp_port,
    }
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(config, f)
# ----------------- Main -----------------
def main():
    """Create a placeholder plot if needed, start the plotter, run the server.

    The Flask development server listens on all interfaces. Debug mode is now
    opt-in through the environment variable ``GAIN_VIZ_DEBUG=1``. With
    ``debug=True`` hard-coded, the interactive debugger was reachable from the
    network, and the reloader re-executed the program in a child process,
    starting a second plotting thread that wrote the same image file.
    """
    # Ensure placeholder image exists
    if not os.path.exists(PLOT_PATH):
        # The target directory may not exist on a fresh install.
        os.makedirs(os.path.dirname(PLOT_PATH), exist_ok=True)
        plt.figure()
        plt.text(0.5, 0.5, "Waiting for data...", ha='center', va='center')
        plt.savefig(PLOT_PATH)
        plt.close()

    # Start plotting thread
    threading.Thread(target=generate_spectrum_plot, daemon=True).start()

    debug = os.environ.get("GAIN_VIZ_DEBUG", "") == "1"
    app.run(host="0.0.0.0", debug=debug)