"""Interactive collector for GPS, modem, ping and iperf3 measurements.

Run as a script; commands typed at the prompt start or stop a background
collection loop, record the base-station location, or launch an iperf3 run.
Every sample is appended to a JSON file through ``save_data_to_json``.
"""

import os
import pprint
import re
import subprocess
import threading
import time

import serial

from at_commands import (get_modem_cops, get_modem_cpol, get_modem_creg,
                         get_modem_csq, get_modem_qnwcfg, get_modem_qnwinfo,
                         get_modem_qspn, get_modem_rsrp, get_modem_rsrq,
                         get_modem_sinr, set_configs)
from helper_functions import (calculate_distance, parse_lat_lon,
                              save_data_to_json)
from sierra_commands import get_modem_nr_info, get_modem_status

# Globals
running = False  # To control start/stop
base_location = None  # The basestation location

# Limits that keep read_gps_data() from blocking forever.
_MAX_GGA_ATTEMPTS = 3  # $GPGGA sentences inspected before giving up
_MAX_EMPTY_READS = 3  # consecutive empty (timed-out) reads before giving up
_MAX_GPS_READS = 200  # total lines read before giving up

# Matches an iperf3 interval/summary line reported in Mbits/sec and captures
# the bitrate plus the token that follows it.
_IPERF_SUMMARY_RE = re.compile(
    r"\[\s*\d+\]\s+\d+\.\d+\-\d+\.\d+\s+sec\s+[\d.]+\s+\w+Bytes\s+([\d.]+)\s+Mbits/sec\s+(\S+)?"
)


def _parse_gga_sentence(line: str) -> dict:
    """Split one ``$GPGGA`` sentence into the fields this script uses.

    Returns an empty dict when the sentence is too short to contain the
    latitude/longitude fields. A missing or non-numeric altitude field is
    reported as ``None`` rather than raising.
    """
    parts = line.split(",")
    if len(parts) < 6:
        return {}

    altitude = None
    if len(parts) > 9 and parts[9]:
        try:
            altitude = float(parts[9])
        except ValueError:
            altitude = None

    return {
        "utc_time": parts[1],
        # parse_lat_lon is a project helper; presumably it converts the NMEA
        # value/hemisphere pair to a number -- confirm in helper_functions.
        "latitude": parse_lat_lon(parts[2], parts[3]),
        "longitude": parse_lat_lon(parts[4], parts[5]),
        "altitude": altitude,
    }


def read_gps_data(
    port: str = "/dev/ttyACM0", baudrate: int = 9600, timeout: int = 5
) -> dict:
    """
    Reads GPS data from a u-blox 7 GPS/GLONASS device and returns it as a dictionary.

    Only ``$GPGGA`` sentences are used. Reading stops at the first sentence
    that yields a truthy UTC time, latitude and longitude. It gives up after
    ``_MAX_GGA_ATTEMPTS`` unusable ``$GPGGA`` sentences, after
    ``_MAX_EMPTY_READS`` consecutive empty reads, or after ``_MAX_GPS_READS``
    lines in total, so a silent device cannot block the caller forever.

    Args:
        port (str): The serial port the GPS device is connected to. Default is '/dev/ttyACM0'.
        baudrate (int): Baud rate for serial communication. Default is 9600.
        timeout (int): Timeout in seconds for the serial port. Default is 5 seconds.

    Returns:
        dict: ``utc_time``, ``latitude``, ``longitude`` and ``altitude``
        (``None`` when the altitude field is absent) on success; an empty
        dict when no usable fix was read or an error occurred. Errors are
        printed, not raised.
    """
    attempts = 0
    empty_reads = 0

    try:
        with serial.Serial(port, baudrate=baudrate, timeout=timeout) as ser:
            for _ in range(_MAX_GPS_READS):
                line = ser.readline().decode("ascii", errors="ignore").strip()

                if not line:
                    # An empty read means the serial timeout expired (or a
                    # blank line arrived); do not spin on a silent device.
                    empty_reads += 1
                    if empty_reads >= _MAX_EMPTY_READS:
                        break
                    continue
                empty_reads = 0

                if not line.startswith("$GPGGA"):
                    continue

                # Count number of times GPGGA has been inspected
                attempts += 1
                fix = _parse_gga_sentence(line)
                if (
                    fix.get("utc_time")
                    and fix.get("latitude")
                    and fix.get("longitude")
                ):
                    return fix
                if attempts >= _MAX_GGA_ATTEMPTS:
                    break
    except serial.SerialException as e:
        print(f"Serial communication error: {e}")
    except Exception as e:
        print(f"Error: {e}")

    return {}


def set_base_location():
    """Read a GPS fix and store it as the module-level ``base_location``.

    ``base_location`` is reset to an empty dict first, so a failed read
    leaves it empty rather than keeping a previous value.

    Returns:
        dict: The new ``base_location`` (``latitude``, ``longitude`` and,
        when available, ``altitude``); empty when no fix could be read.
    """
    global base_location
    base_location = {}
    try:
        gps_data = read_gps_data()
        base_location["latitude"] = gps_data["latitude"]
        base_location["longitude"] = gps_data["longitude"]
        # Compare against None: an altitude of 0.0 is a valid reading.
        if gps_data.get("altitude") is not None:
            base_location["altitude"] = gps_data["altitude"]
        print("Base location found")
    except KeyError:
        print("Base location could not be found")
    except Exception as e:
        print(f"Error finding location: {e}")
    return base_location


def get_current_location(dictionary=None):
    """Add the current GPS position and distance to base to ``dictionary``.

    Args:
        dictionary: Dict to update in place. A new dict is created when
            omitted (a shared ``{}`` default would leak keys between calls).

    Returns:
        dict: ``dictionary`` with ``latitude``/``longitude`` (and ``altitude``
        when known). When a base location has been set it also gets
        ``baseLatitude``/``baseLongitude`` (and ``baseAltitude``) and
        ``distance`` from ``calculate_distance``. ``distance`` is
        ``"Unknown"`` when the position or the base location is unavailable
        and ``"Error"`` on any other failure.
    """
    if dictionary is None:
        dictionary = {}

    try:
        gps_data = read_gps_data()
        dictionary["latitude"] = gps_data["latitude"]
        dictionary["longitude"] = gps_data["longitude"]
        if gps_data.get("altitude") is not None:
            dictionary["altitude"] = gps_data["altitude"]

        # base_location is None until set_base_location() runs and is empty
        # when that call failed; neither case is an error here.
        if base_location and "latitude" in base_location:
            dictionary["baseLatitude"] = base_location["latitude"]
            dictionary["baseLongitude"] = base_location["longitude"]
            if base_location.get("altitude") is not None:
                dictionary["baseAltitude"] = base_location["altitude"]
            dictionary["distance"] = calculate_distance(base_location, gps_data)
        else:
            dictionary["distance"] = "Unknown"
    except KeyError:
        print("Location could not be found")
        dictionary["distance"] = "Unknown"
    except Exception as e:
        print(f"Error finding location: {e}")
        dictionary["distance"] = "Error"
    return dictionary


# Ping base station
def ping_basestation(
    address="10.45.0.1", dictionary=None
):  # raspberry pi address 192.168.0.29, host1 is 0.30
    """Send one ICMP ping to ``address`` and record the result.

    Args:
        address: Host to ping with ``ping -c 1``.
        dictionary: Dict to update in place; a new dict is created when
            omitted.

    Returns:
        dict: ``dictionary`` with ``ping_stats`` (raw ping output) and
        ``ping_time`` (the ``time=`` value as a string, or ``"Not found"``).
        If running ping fails, ``error`` holds the exception text instead;
        keys already in ``dictionary`` are preserved in every case.
    """
    if dictionary is None:
        dictionary = {}

    try:
        result = subprocess.run(
            ["ping", "-c", "1", address], capture_output=True, text=True
        )
    except Exception as e:
        # Keep whatever the caller already collected for this sample.
        dictionary["error"] = str(e)
        return dictionary

    match = re.search(r"time=([\d.]+)", result.stdout)
    dictionary["ping_time"] = match.group(1) if match else "Not found"
    dictionary["ping_stats"] = result.stdout
    return dictionary


def _parse_iperf_bitrates(output):
    """Extract ``(sender_bitrate, receiver_bitrate)`` from iperf3 output.

    Uses the last summary-style lines reported in Mbits/sec. When the output
    mentions both "sender" and "receiver" and at least two lines match, the
    second-to-last is taken as the sender and the last as the receiver;
    otherwise the last value is used for both.

    Raises:
        ValueError: If no Mbits/sec figure can be found (or a captured
            figure is not a valid number).
    """
    matches = _IPERF_SUMMARY_RE.findall(output)
    if matches:
        # Sometimes iperf omits receiver or sender lines depending on direction
        has_both = re.search(r"receiver", output, re.IGNORECASE) and re.search(
            r"sender", output, re.IGNORECASE
        )
        if has_both and len(matches) >= 2:
            return float(matches[-2][0]), float(matches[-1][0])
        rate = float(matches[-1][0])
        return rate, rate  # fallback: assume same

    bitrates = re.findall(r"(\d+\.\d+) Mbits/sec", output)
    if not bitrates:
        raise ValueError("no Mbits/sec bitrate found in iperf3 output")
    if len(bitrates) == 1:
        rate = float(bitrates[0])
        return rate, rate
    return float(bitrates[-2]), float(bitrates[-1])


# Run iperf test
def collect_iperf(
    filename,
    server_ip="10.45.0.1",
    duration=10,
    is_client=True,
):
    """Run iperf3 and save the parsed bitrates to ``filename``.

    As a client (the default) two runs are made against ``server_ip``: a
    normal run and a reverse (``-R``) run, each ``duration`` seconds long.
    With ``is_client=False`` a single ``iperf3 -s`` server is started
    instead; that call blocks until the server process exits.

    Args:
        filename: JSON file passed to ``save_data_to_json``.
        server_ip: iperf3 server address (client mode only).
        duration: Test length in seconds (client mode only).
        is_client: Run as iperf3 client when true, as server otherwise.

    Returns:
        The stderr text of the first iperf3 run that exits non-zero (the
        remaining runs are skipped); otherwise ``None``. Other failures are
        printed and the next run is attempted.
    """
    if is_client:
        commands = [
            ["iperf3", "-c", server_ip, "-t", str(duration)],
            ["iperf3", "-c", server_ip, "-t", str(duration), "-R"],
        ]
    else:
        commands = [["iperf3", "-s"]]

    for command in commands:
        try:
            # Run the command
            result = subprocess.run(
                command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
            )
            output = result.stdout

            # Check for errors
            if result.returncode != 0:
                print(f"Error running iperf3: {result.stderr}")
                return result.stderr

            sender_bitrate, receiver_bitrate = _parse_iperf_bitrates(output)

            data = {
                "iperf_full": output,
                "sender_bitrate": sender_bitrate,
                "receiver_bitrate": receiver_bitrate,
                "note": (
                    "avgs are calculated with the assumption "
                    "that all value are in Mbits/sec"
                ),
            }

            print("\nIPERF complete")
            print(f"IPERF sender bitrate: {sender_bitrate}")
            print(f"IPERF receiver bitrate: {receiver_bitrate}")

            save_data_to_json(data=data, filename=filename)
        except Exception as e:
            print(f"iPerf Error: {e}")


# Collect and save data continuously
def collect_data(filename):
    """Sample the modem and ping the base station every 5 s while ``running``.

    Each sample is built by passing one dict through the ``at_commands``
    getters and ``ping_basestation``, stamped with ``time.time()``, printed
    in summary form and appended to ``filename`` via ``save_data_to_json``.
    The loop ends once the module-level ``running`` flag is false.

    NOTE: GPS location is not collected in this loop.
    """
    while running:
        data = {}
        data = get_modem_cops(dictionary=data)
        data = get_modem_creg(dictionary=data)
        data = get_modem_csq(dictionary=data)
        data = get_modem_rsrp(dictionary=data)
        data = get_modem_rsrq(dictionary=data)
        data = get_modem_sinr(dictionary=data)
        data = get_modem_cpol(dictionary=data)
        data = get_modem_qnwcfg(dictionary=data)
        data = get_modem_qnwinfo(dictionary=data)
        data = get_modem_qspn(dictionary=data)
        data = ping_basestation(dictionary=data)
        data["timestamp"] = time.time()

        # Console summary; the keys are presumably filled in by the
        # at_commands getters -- missing ones print as None.
        print(f"\nService: {data.get('network information')}")
        print(f"Ping Time: {data.get('ping_time')}")
        print(
            f"RSRP: {data.get('RSRP PRX')} {data.get('RSRP DRX')} {data.get('RSRP RX2')} {data.get('RSRP RX3')}"
        )

        save_data_to_json(data=data, filename=filename)
        time.sleep(5)


# Collect and save data continuously
def collect_sierra_data(filename):
    """Sierra-modem variant of ``collect_data``.

    Every 5 s while ``running`` is true: records the GPS location, queries
    ``get_modem_nr_info`` and ``get_modem_status``, pings the base station,
    timestamps the sample, prints a summary and appends it to ``filename``.
    """
    while running:
        data = {}
        data = get_current_location(dictionary=data)
        data = get_modem_nr_info(dictionary=data)
        data = get_modem_status(dictionary=data)
        data = ping_basestation(dictionary=data)
        data["timestamp"] = time.time()

        # Console summary of the sample just taken
        print(f"\nDistance: {data.get('distance')}")
        print(f"Ping Time: {data.get('ping_time')}")
        print(f"RSRP: {data.get('RSRP')}")

        save_data_to_json(data=data, filename=filename)
        time.sleep(5)


# Main function
def main():
    """Run the interactive command prompt.

    Commands: ``l`` record the base-station location, ``b`` begin background
    collection, ``s`` stop it, ``i`` run an iperf test in a background
    thread, ``m`` take and print a single location sample, ``x`` exit.
    """
    global running

    foldername = "data/boat_relay_sept_11"
    os.makedirs(foldername, exist_ok=True)
    filename = f"{foldername}/collection_{int(time.time())}.json"

    print("Setting configs...")
    set_configs()

    print(
        "Type 'l' to set basestation location, 'b' to begin, "
        "'s' to stop, 'i' to run an iperf test, or 'x' to exit:"
    )

    collector = None  # most recently started collect_data thread
    try:
        while True:
            command = input("> ").strip().lower()
            if command == "b" and not running:
                # A stopped collector may still be inside its current cycle
                # (queries plus the 5 s sleep). Wait for it so that setting
                # `running` again cannot leave two loops active.
                if collector is not None and collector.is_alive():
                    collector.join()
                print("Starting data collection...")
                running = True
                collector = threading.Thread(target=collect_data, args=(filename,))
                collector.start()
            elif command == "l":
                base_location_data = set_base_location()
                save_data_to_json(data=base_location_data, filename=filename)
            elif command == "i":
                threading.Thread(
                    target=collect_iperf, args=(filename, "10.45.0.1")
                ).start()
            elif command == "s" and running:
                print("Stopping data collection...")
                running = False
            elif command == "x":
                running = False
                break
            elif command == "m":
                set_base_location()
                start_time = time.time()
                data = get_current_location()
                data["timestamp"] = time.time()
                end_time = time.time()
                print("Collected Data:")
                pprint.pprint(data)
                print(" ")
                save_data_to_json(data=data, filename=filename)
                print(f"Run time: {end_time - start_time}")
            elif command not in ["l", "b", "s", "i", "x"]:
                print("Invalid command. Type 'l', 'b', 's', 'i', or 'x'.")
    finally:
        # Make sure the collector loop stops even if the prompt loop is left
        # through an exception (Ctrl-C, closed stdin); the worker thread is
        # non-daemon and would otherwise keep the process alive.
        running = False


if __name__ == "__main__":
    main()