import os
import re
import subprocess
import threading
import time

from at_commands import (
    get_modem_cops,
    get_modem_cpol,
    get_modem_creg,
    get_modem_csq,
    get_modem_qnwcfg,
    get_modem_qnwinfo,
    get_modem_qspn,
    get_modem_rsrp,
    get_modem_rsrq,
    get_modem_sinr,
    set_configs,
)
from communication import (
    collect_iperf,
    get_current_location,
    ping_basestation,  # NOTE: shadowed by the local ping_basestation defined below
    set_base_location,
)
from end_to_relay_client import collect_iperf_remote
from helper_functions import save_data_to_json

# Globals
running = False  # To control start/stop


# Ping base station
def ping_basestation(address="10.45.0.1", name="10.45.0.1", dictionary=None) -> dict:
    """Ping ``address`` once and record the outcome in ``dictionary``.

    Args:
        address: Host passed to ``ping -c 1``.
        name: Prefix used for the keys written into ``dictionary``.
        dictionary: Dict to update in place. A fresh dict is created when
            omitted, so results never leak between unrelated calls.

    Returns:
        The same dict, with ``"<name>_ping_time"`` (the text captured after
        ``time=`` in the ping output, or ``"Not found"``) and
        ``"<name>_ping_stats"`` (the raw stdout). If running ping fails, the
        error text is stored under ``"<name>_ping error"`` instead.
    """
    if dictionary is None:
        dictionary = {}
    try:
        result = subprocess.run(
            ["ping", "-c", "1", address], capture_output=True, text=True
        )
        match = re.search(r"time=([\d.]+)", result.stdout)
        dictionary[f"{name}_ping_time"] = match.group(1) if match else "Not found"
        dictionary[f"{name}_ping_stats"] = result.stdout
    except Exception as e:
        # Best effort: a failed ping must not abort the whole sample.
        dictionary[f"{name}_ping error"] = f"{e}"
    return dictionary


def calculate_ping(
    dictionary: dict, base_address: str, relay_address: str, name: str
) -> dict:
    """Store the difference between two recorded ping times.

    ``base_address`` and ``relay_address`` are used as key prefixes: the
    values are read from ``"<base_address>_ping_time"`` and
    ``"<relay_address>_ping_time"``, so they must match the ``name`` that was
    given to ``ping_basestation`` when those pings were recorded.

    Args:
        dictionary: Dict holding the recorded ping times; updated in place.
        base_address: Key prefix of the base-station ping entry.
        relay_address: Key prefix of the relay ping entry.
        name: Prefix for the result key.

    Returns:
        The same dict, with ``"<name>_c_ping_time"`` set to base minus relay,
        or ``"<name>_c_ping error"`` set to the error text when either entry
        is missing or not numeric (e.g. ``"Not found"``).
    """
    try:
        base_ping = float(dictionary[f"{base_address}_ping_time"])
        relay_ping = float(dictionary[f"{relay_address}_ping_time"])
        dictionary[f"{name}_c_ping_time"] = base_ping - relay_ping
    except (KeyError, ValueError, TypeError) as e:
        dictionary[f"{name}_c_ping error"] = f"{e}"
    return dictionary


def relay_iperf(
    filename: str, base_address: str, relay_address: str, duration: str
) -> None:
    """Run the iperf collections against the base station and the relay.

    Calls ``collect_iperf`` for ``base_address`` (stations ``"eg"``) and for
    ``relay_address`` (stations ``"er"``), then ``collect_iperf_remote``. An
    error raised by the remote collection is printed rather than propagated.

    NOTE(review): ``duration`` is annotated ``str`` but the ``__main__``
    caller passes an ``int``; confirm which one ``collect_iperf`` expects.
    """
    collect_iperf(
        filename=filename, server_ip=base_address, duration=duration, stations="eg"
    )
    collect_iperf(
        filename=filename, server_ip=relay_address, duration=duration, stations="er"
    )
    try:
        collect_iperf_remote(filename=filename)
    except Exception as e:
        print(f"Error collecting relay -> ground iperf: {e}")


# Collect and send data continuously
def collect_data(
    filename: str, base_address: str, relay_address: str, interval: int
) -> None:
    """Sample location, modem and ping data until ``running`` becomes false.

    Each iteration builds a fresh dict, passes it through the location and
    modem helpers and both pings, adds a ``"timestamp"``, prints a short
    summary, saves the dict with ``save_data_to_json`` and then sleeps for
    ``interval`` seconds.

    Args:
        filename: Passed to ``save_data_to_json`` for every sample.
        base_address: Address pinged for the ``"eg"`` entries.
        relay_address: Address pinged for the ``"er"`` entries.
        interval: Seconds to sleep between samples.
    """
    global running
    while running:
        data = {}
        data = get_current_location(dictionary=data)
        data = get_modem_cops(dictionary=data)
        data = get_modem_creg(dictionary=data)
        data = get_modem_csq(dictionary=data)
        data = get_modem_rsrp(dictionary=data)
        data = get_modem_rsrq(dictionary=data)
        data = get_modem_sinr(dictionary=data)
        data = get_modem_cpol(dictionary=data)
        data = get_modem_qnwcfg(dictionary=data)
        data = get_modem_qnwinfo(dictionary=data)
        data = get_modem_qspn(dictionary=data)
        data = ping_basestation(
            dictionary=data, name="eg", address=base_address
        )  # edge to ground
        data = ping_basestation(
            dictionary=data, name="er", address=relay_address
        )  # edge to relay
        # The pings above are stored under the "eg"/"er" prefixes, so those
        # prefixes (not the IP addresses) are what calculate_ping must look up.
        data = calculate_ping(
            dictionary=data,
            base_address="eg",
            relay_address="er",
            name="rg",
        )  # relay to ground
        data["timestamp"] = time.time()

        # Send to server
        print(
            f"\nPing Time: eg={data.get('eg_ping_time')} "
            f"er={data.get('er_ping_time')}"
        )
        print(
            f"RSRP: {data.get('RSRP PRX')} {data.get('RSRP DRX')} {data.get('RSRP RX2')} {data.get('RSRP RX3')}"
        )
        print(f"Service: {data.get('network information')}")
        save_data_to_json(data=data, filename=filename)
        time.sleep(interval)


def _prompt_int(prompt: str, default: int) -> int:
    """Ask for a non-negative integer, re-prompting until the input is valid.

    An empty answer returns ``default``.
    """
    while True:
        raw = input(prompt).strip()
        if not raw:
            return default
        try:
            value = int(raw)
        except ValueError:
            print("Your response must be a whole number.")
            continue
        if value < 0:
            # A negative interval would make time.sleep raise later on.
            print("Your response must not be negative.")
            continue
        return value


def set_parameters():
    """Interactively collect the run settings.

    Optionally applies ``set_configs()`` first, then asks for the addresses,
    the log folder name and the two durations, falling back to the default
    shown in each prompt when the answer is empty.

    Returns:
        Tuple ``(base_address, relay_address, folder_name, interval,
        iperf_duration)``; the first three are lower-cased strings and the
        last two are ints.
    """
    set_modem = (
        input(
            "Do you want limit the modem to NR5G without roaming? "
            "(Not recommended if PLMN is not 001) (y/n )"
        )
        .strip()
        .lower()
    )
    while set_modem not in ("y", "yes", "n", "no"):
        set_modem = input("Your response must be y or n ").strip().lower()
    if set_modem in ("y", "yes"):
        print("Setting configs...")
        set_configs()

    base_address = (
        input("Enter the base station (ground) ip address (Default is 10.46.0.1): ")
        .strip()
        .lower()
        or "10.46.0.1"
    )
    relay_address = (
        input("Enter the relay station ip address (Default is 10.45.0.1): ")
        .strip()
        .lower()
        or "10.45.0.1"
    )
    folder_name = (
        input("Enter the log folder name (Default is boat_relay_sept): ")
        .strip()
        .lower()
        or "boat_relay_sept"
    )
    interval = _prompt_int(
        "Enter the time interval (s) in between pings/modem data collection (Default is 5): ",
        5,
    )
    iperf_duration = _prompt_int(
        "Enter the iperf test duration (s) (Default is 5): ", 5
    )
    return base_address, relay_address, folder_name, interval, iperf_duration


if __name__ == "__main__":
    base_address, relay_address, folder_name, interval, iperf_duration = (
        set_parameters()
    )
    foldername = f"data/{folder_name}"
    filename = foldername + "/test_" + str(int(time.time())) + ".json"
    os.makedirs(foldername, exist_ok=True)

    print(
        "Type 'l' to set basestation location, 'b' to begin, "
        "'s' to stop, 'i' to run an iperf test, or 'x' to exit:"
    )
    while True:
        command = input("> ").strip().lower()
        if command == "b" and not running:
            print("Starting data collection...")
            running = True
            threading.Thread(
                target=collect_data,
                args=(filename, base_address, relay_address, interval),
            ).start()
        elif command == "l":
            base_location_data = set_base_location()
            save_data_to_json(data=base_location_data, filename=filename)
        elif command == "i":
            # Runs on its own thread so the prompt stays responsive.
            threading.Thread(
                target=relay_iperf,
                args=(filename, base_address, relay_address, iperf_duration),
            ).start()
        elif command == "s" and running:
            print("Stopping data collection...")
            running = False
        elif command == "x":
            # Clearing the flag lets the collector exit after its current sleep.
            running = False
            break
        elif command not in ["l", "b", "s", "i", "x"]:
            print("Invalid command. Type 'l', 'b', 's', 'i', or 'x'.")