range-testing/relay_to_ground_iperf.py

56 lines
1.7 KiB
Python
Raw Permalink Normal View History

2025-09-22 10:54:39 -04:00
import socket
import subprocess
import time
GROUND_IP = "10.46.0.1" # gNodeB IP of ground station
PORT = 5005 # port to listen on for commands
BUFFER_SIZE = 4096
def run_iperf(normal=True):
"""Run iperf3 to ground. normal=True -> normal test, False -> reverse test."""
cmd = ["iperf3", "-c", GROUND_IP, "-t", "5"]
if not normal:
cmd.append("-R") # reverse mode
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
return result.stdout + result.stderr
except subprocess.TimeoutExpired:
return "[Relay] iperf test timed out\n"
def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("0.0.0.0", PORT))
print(f"[Relay] Listening for commands on port {PORT}")
while True:
data, addr = sock.recvfrom(BUFFER_SIZE)
command = data.decode().strip()
print(f"[Relay] Received command '{command}' from {addr}")
if command.lower() == "start":
# Run normal test
output_normal = run_iperf(normal=True)
time.sleep(0.5)
# Run reverse test
output_reverse = run_iperf(normal=False)
# Combine results
combined = (
"\n=== Normal Test (Relay → Ground) ===\n"
+ output_normal
+ "\n=== Reverse Test (Ground → Relay) ===\n"
+ output_reverse
)
# Send results back in chunks
for i in range(0, len(combined), BUFFER_SIZE):
sock.sendto(combined[i : i + BUFFER_SIZE].encode(), addr)
print(f"[Relay] Sent results back to {addr}")
if __name__ == "__main__":
main()