"""Interactive range-test logger.

Prompts the operator for test settings, then polls modem and location
helpers, pings the base station and optionally runs iperf, writing every
sample through ``save_data_to_json``.
"""

import os
import re
import subprocess
import threading
import time

from functions.at_commands import (
    get_modem_cops,
    get_modem_cpol,
    get_modem_creg,
    get_modem_csq,
    get_modem_qnwcfg,
    get_modem_qnwinfo,
    get_modem_qspn,
    get_modem_rsrp,
    get_modem_rsrq,
    get_modem_sinr,
    set_configs,
)
from functions.helper_functions import save_data_to_json
from functions.location import get_current_location, set_base_location
from functions.sierra_commands import get_modem_nr_info, get_modem_status
from iperf.iperf_functions import collect_iperf


def _prompt_non_negative_int(prompt: str, default: int) -> int:
    """Ask for a whole number >= 0, re-prompting until the answer is usable.

    An empty answer selects ``default``. Non-numeric or negative answers are
    rejected instead of raising, so a typo cannot abort the start-up dialogue.
    """
    while True:
        answer = input(prompt).strip()
        if not answer:
            return default
        try:
            value = int(answer)
        except ValueError:
            print("Please enter a whole number.")
            continue
        if value >= 0:
            return value
        print("Please enter a number that is not negative.")


class Communication:
    """Holds the test settings and the data-collection loops."""

    def __init__(self):
        """Prompt for the test settings and create the log folder."""
        # Flag polled by the collection loops; cleared to make them exit.
        self.running = False
        # Filled in by set_location(); passed to the iperf helper.
        self.base_location = None

        set_modem = (
            input(
                "Do you want limit the modem to NR5G without roaming? (Not recommended if PLMN is not 001) (y/n )"
            )
            .strip()
            .lower()
        )
        while set_modem not in ("y", "yes", "n", "no"):
            set_modem = input("Your response must be y or n ").strip().lower()
        if set_modem in ("y", "yes"):
            print("Setting configs...")
            set_configs()

        self.ip_address = (
            input("Enter the ip address (Default is 10.45.0.1): ").strip().lower()
            or "10.45.0.1"
        )
        self.folder_name = (
            input("Enter the log folder name (Default is oct_range_test): ")
            .strip()
            .lower()
            or "oct_range_test"
        )
        # Negative values are refused here because time.sleep() raises on them.
        self.interval = _prompt_non_negative_int(
            "Enter the time interval (s) in between pings/modem data collection (Default is 5): ",
            5,
        )
        self.iperf_duration = _prompt_non_negative_int(
            "Enter the iperf test duration (s) (Default is 5): ", 5
        )

        # One log file per run, named after the start time in epoch seconds.
        foldername = f"data/{self.folder_name}"
        self.filename = foldername + "/test_" + str(int(time.time())) + ".json"
        os.makedirs(foldername, exist_ok=True)

    def set_location(self):
        """Store the result of set_base_location() and write it to the log file."""
        self.base_location = set_base_location()
        save_data_to_json(data=self.base_location, filename=self.filename)

    def run_iperf(self):
        """Run one client-side iperf test against the configured address."""
        collect_iperf(
            filename=self.filename,
            is_client=True,
            server_ip=self.ip_address,
            duration=self.iperf_duration,
            # Give the helper five seconds beyond the test duration.
            timeout=(self.iperf_duration + 5),
            base_location=self.base_location,
        )

    # Collect and save data continuously from Quectel
    def collect_data(self):
        """Poll the AT-command helpers until ``self.running`` is cleared.

        Each cycle threads one dict through the location, modem and ping
        helpers, timestamps it, prints a short summary and saves it.
        """
        while self.running:
            data = {}
            data = get_current_location(dictionary=data)
            data = get_modem_cops(dictionary=data)
            data = get_modem_creg(dictionary=data)
            data = get_modem_csq(dictionary=data)
            data = get_modem_rsrp(dictionary=data)
            data = get_modem_rsrq(dictionary=data)
            data = get_modem_sinr(dictionary=data)
            data = get_modem_cpol(dictionary=data)
            data = get_modem_qnwcfg(dictionary=data)
            data = get_modem_qnwinfo(dictionary=data)
            data = get_modem_qspn(dictionary=data)
            # Label the ping keys with the address actually pinged so the
            # summary lookup below finds them for non-default addresses too.
            data = ping_basestation(
                dictionary=data, address=self.ip_address, name=self.ip_address
            )
            data["timestamp"] = time.time()

            # Echo the key readings, then persist the sample
            print(f"\nDistance: {data.get('distance')}")
            print(f"Service: {data.get('network information')}")
            print(f"Ping Time: {data.get(f'{self.ip_address}_ping_time')}")
            print(
                f"RSRP: {data.get('RSRP PRX')} {data.get('RSRP DRX')} "
                f"{data.get('RSRP RX2')} {data.get('RSRP RX3')}"
            )
            save_data_to_json(data=data, filename=self.filename)
            time.sleep(self.interval)

    # Collect and save data continuously from Sierra
    def collect_sierra_data(self):
        """Poll the Sierra helpers until ``self.running`` is cleared.

        Same cycle as collect_data(), using get_modem_nr_info() and
        get_modem_status() in place of the AT-command helpers.
        """
        while self.running:
            data = {}
            data = get_current_location(dictionary=data)
            data = get_modem_nr_info(dictionary=data)
            data = get_modem_status(dictionary=data)
            data = ping_basestation(
                dictionary=data, address=self.ip_address, name=self.ip_address
            )
            data["timestamp"] = time.time()

            # Echo the key readings, then persist the sample
            print(f"\nDistance: {data.get('distance')}")
            # ping_basestation() writes "<name>_ping_time"; a bare "ping_time"
            # key is never set, so look up the prefixed key.
            print(f"Ping Time: {data.get(f'{self.ip_address}_ping_time')}")
            print(f"RSRP: {data.get('RSRP')}")
            save_data_to_json(data=data, filename=self.filename)
            time.sleep(self.interval)


# Ping base station
def ping_basestation(address="10.45.0.1", name="10.45.0.1", dictionary=None) -> dict:
    """Send one ping to ``address`` and record the outcome in ``dictionary``.

    Args:
        address: Host passed to ``ping -c 1``.
        name: Prefix for the keys written to the dict.
        dictionary: Dict to update in place; a new one is created when omitted.

    Returns:
        The dict, with ``"<name>_ping_time"`` (the number after ``time=`` as a
        string, or ``"Not found"``) and ``"<name>_ping_stats"`` (raw stdout).
        If running ping fails, ``"<name>_ping error"`` holds the error text.
    """
    # A literal {} default would be shared by every call that omits the argument.
    if dictionary is None:
        dictionary = {}
    try:
        result = subprocess.run(
            ["ping", "-c", "1", address], capture_output=True, text=True
        )
        match = re.search(r"time=([\d.]+)", result.stdout)
        dictionary[f"{name}_ping_time"] = match.group(1) if match else "Not found"
        dictionary[f"{name}_ping_stats"] = result.stdout
    except Exception as e:
        # Deliberately broad: a failed ping must not stop the collection loop.
        dictionary[f"{name}_ping error"] = f"{e}"
    return dictionary


if __name__ == "__main__":
    modem = Communication()
    # Most recent collection thread, kept so a restart can wait for it to end.
    collector = None
    print(
        "Type 'l' to set basestation location, 'b' to begin, "
        "'s' to stop, 'i' to run an iperf test, or 'x' to exit:"
    )

    while True:
        command = input("> ").strip().lower()
        if command == "b" and not modem.running:
            # After 's' the old loop may still be inside its sleep; starting
            # now would leave two loops writing to the same file.
            if collector is not None and collector.is_alive():
                print("Waiting for the previous collection cycle to finish...")
                collector.join()
            print("Starting data collection...")
            modem.running = True
            collector = threading.Thread(target=modem.collect_data)
            collector.start()
        elif command == "l":
            modem.set_location()
        elif command == "i":
            threading.Thread(target=modem.run_iperf).start()
        elif command == "s" and modem.running:
            print("Stopping data collection...")
            modem.running = False
        elif command == "x":
            modem.running = False
            break
        elif command not in ["l", "b", "s", "i", "x"]:
            print("Invalid command. Type 'l', 'b', 's', 'i', or 'x'.")