range-testing/communication.py

350 lines
11 KiB
Python
Raw Normal View History

2025-09-10 10:50:07 -04:00
import os
import pprint
import re
import subprocess
import threading
import time
import serial
2025-09-10 10:55:05 -04:00
from at_commands import (
get_modem_cops,
get_modem_cpol,
get_modem_creg,
get_modem_csq,
get_modem_qnwcfg,
get_modem_qnwinfo,
get_modem_qspn,
get_modem_rsrp,
get_modem_rsrq,
get_modem_sinr,
set_configs,
)
from helper_functions import calculate_distance, parse_lat_lon, save_data_to_json
from sierra_commands import get_modem_nr_info, get_modem_status
# Globals
running = False # To control start/stop
base_location = None # The basestation location
def read_gps_data(
port: str = "/dev/ttyACM0", baudrate: int = 9600, timeout: int = 5
) -> dict:
"""
Reads GPS data from a u-blox 7 GPS/GLONASS device and returns it as a dictionary.
Args:
port (str): The serial port the GPS device is connected to. Default is '/dev/ttyACM0'.
baudrate (int): Baud rate for serial communication. Default is 9600.
timeout (int): Timeout in seconds for the serial port. Default is 5 seconds.
Returns:
dict: A dictionary containing parsed GPS data.
"""
gps_data = {}
attempts = 0
try:
with serial.Serial(port, baudrate=baudrate, timeout=timeout) as ser:
while True:
# Read a line from the GPS device
line = ser.readline().decode("ascii", errors="ignore").strip()
# Filter for GGA or RMC sentences for relevant data
if line.startswith("$GPGGA"): # Global Positioning System Fix Data
parts = line.split(",")
gps_data["utc_time"] = parts[1]
gps_data["latitude"] = parse_lat_lon(parts[2], parts[3])
gps_data["longitude"] = parse_lat_lon(parts[4], parts[5])
gps_data["altitude"] = float(parts[9]) if parts[9] else None
# Count number of times GPGGA has been queried
attempts = attempts + 1
# Stop after collecting sufficient data
if (
gps_data.get("utc_time")
and gps_data.get("latitude")
and gps_data.get("longitude")
):
break
elif attempts >= 3:
gps_data = {}
break
except serial.SerialException as e:
print(f"Serial communication error: {e}")
except Exception as e:
print(f"Error: {e}")
return gps_data
def set_base_location():
global base_location
base_location = {}
try:
gps_data = read_gps_data()
base_location["latitude"] = gps_data["latitude"]
base_location["longitude"] = gps_data["longitude"]
if gps_data.get("altitude"):
base_location["altitude"] = gps_data["altitude"]
print("Base location found")
except KeyError:
print("Base location could not be found")
except Exception as e:
print(f"Error finding location: {e}")
return base_location
def get_current_location(dictionary={}):
global base_location
try:
gps_data = read_gps_data()
dictionary["latitude"] = gps_data["latitude"]
dictionary["longitude"] = gps_data["longitude"]
if gps_data.get("altitude"):
dictionary["altitude"] = gps_data["altitude"]
if "latitude" in base_location:
dictionary["baseLatitude"] = base_location["latitude"]
dictionary["baseLongitude"] = base_location["longitude"]
if base_location.get("altitude"):
dictionary["baseAltitude"] = base_location["altitude"]
dictionary["distance"] = calculate_distance(base_location, gps_data)
except KeyError:
print("Location could not be found")
dictionary["distance"] = "Unknown"
except Exception as e:
print(f"Error finding location: {e}")
dictionary["distance"] = "Error"
return dictionary
# Ping base station
def ping_basestation(
address="10.45.0.1", dictionary={}
): # raspberry pi address 192.168.0.29, host1 is 0.30
try:
result = subprocess.run(
["ping", "-c", "1", address], capture_output=True, text=True
)
match = re.search(r"time=([\d.]+)", result.stdout)
if match:
time_value = match.group(1)
dictionary["ping_time"] = time_value
dictionary["ping_stats"] = result.stdout
return dictionary
else:
dictionary["ping_time"] = "Not found"
dictionary["ping_stats"] = result.stdout
return dictionary
except Exception as e:
return {"error": str(e)}
# Run iperf test
def collect_iperf(
filename,
server_ip="10.45.0.1",
duration=10,
is_client=True,
):
if is_client:
commands = [["iperf3", "-s"]]
else:
commands = [
2025-09-10 10:55:05 -04:00
["iperf3", "-c", server_ip, "-t", str(duration)],
["iperf3", "-c", server_ip, "-t", str(duration), "-R"],
]
for command in commands:
try:
2025-09-10 10:55:05 -04:00
# try:
# gps_data = read_gps_data()
# start_distance = calculate_distance(base_location, gps_data)
# except:
# print("Could not collect location")
# Run the command
result = subprocess.run(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
output = result.stdout
# Check for errors
if result.returncode != 0:
print(f"Error running iperf3: {result.stderr}")
return result.stderr
# try:
# gps_data = read_gps_data()
# end_distance = calculate_distance(base_location, gps_data)
# except:
# pass
# Look for final sender and receiver bitrates (usually in summary lines)
matches = re.findall(
r"\[\s*\d+\]\s+\d+\.\d+\-\d+\.\d+\s+sec\s+[\d.]+\s+\w+Bytes\s+([\d.]+)\s+Mbits/sec\s+(\S+)?",
output,
)
if len(matches) >= 1:
# Take the last throughput entry
last_entry = matches[-1]
sender_bitrate = float(last_entry[0])
# Sometimes iperf omits receiver or sender lines depending on direction
# Try to find both separately:
receiver_match = re.search(r"receiver", output, re.IGNORECASE)
sender_match = re.search(r"sender", output, re.IGNORECASE)
if receiver_match and sender_match and len(matches) >= 2:
receiver_bitrate = float(matches[-1][0])
sender_bitrate = float(matches[-2][0])
else:
receiver_bitrate = sender_bitrate # fallback: assume same
else:
bitrates = re.findall(r"(\d+\.\d+) Mbits/sec", output)
sender_bitrate = float(bitrates[-2])
receiver_bitrate = float(bitrates[-1])
data = {
"iperf_full": output,
"sender_bitrate": sender_bitrate,
"receiver_bitrate": receiver_bitrate,
"note": (
"avgs are calculated with the assumption "
"that all value are in Mbits/sec"
),
# "start_distance": start_distance,
# "end_distance": end_distance,
}
print("\nIPERF complete")
print(f"IPERF sender bitrate: {sender_bitrate}")
print(f"IPERF receiver bitrate: {receiver_bitrate}")
save_data_to_json(data=data, filename=filename)
except Exception as e:
print(f"iPerf Error: {e}")
# Collect and send data continuously
def collect_data(filename):
global base_location
global running
while running:
data = {}
# data = get_current_location(dictionary=data)
data = get_modem_cops(dictionary=data)
data = get_modem_creg(dictionary=data)
data = get_modem_csq(dictionary=data)
data = get_modem_rsrp(dictionary=data)
data = get_modem_rsrq(dictionary=data)
data = get_modem_sinr(dictionary=data)
data = get_modem_cpol(dictionary=data)
data = get_modem_qnwcfg(dictionary=data)
data = get_modem_qnwinfo(dictionary=data)
data = get_modem_qspn(dictionary=data)
data = ping_basestation(dictionary=data)
data["timestamp"] = time.time()
# Send to server
# print(f"\nDistance: {data.get('distance')}")
print(f"\nService: {data.get('network information')}")
print(f"Ping Time: {data.get('ping_time')}")
print(
f"RSRP: {data.get('RSRP PRX')} {data.get('RSRP DRX')} {data.get('RSRP RX2')} {data.get('RSRP RX3')}"
)
save_data_to_json(data=data, filename=filename)
time.sleep(5)
# Collect and send data continuously
def collect_sierra_data(filename):
global base_location
global running
while running:
data = {}
data = get_current_location(dictionary=data)
data = get_modem_nr_info(dictionary=data)
data = get_modem_status(dictionary=data)
data = ping_basestation(dictionary=data)
data["timestamp"] = time.time()
# Send to server
print(f"\nDistance: {data.get('distance')}")
print(f"Ping Time: {data.get('ping_time')}")
print(f"RSRP: {data.get('RSRP')}")
save_data_to_json(data=data, filename=filename)
time.sleep(5)
# Main function
def main():
global running
2025-09-10 10:55:05 -04:00
foldername = "data/boat_relay_sept_11"
2025-09-10 10:50:07 -04:00
os.makedirs(foldername, exist_ok=True)
2025-09-10 10:52:03 -04:00
filename = foldername + "/test_" + str(int(time.time())) + ".json"
print("Setting configs...")
set_configs()
print(
"Type 'l' to set basestation location, 'b' to begin, "
"'s' to stop, 'i' to run an iperf test, or 'x' to exit:"
)
while True:
command = input("> ").strip().lower()
if command == "b" and not running:
print("Starting data collection...")
running = True
threading.Thread(target=collect_data, args=(filename,)).start()
elif command == "l":
base_location_data = set_base_location()
save_data_to_json(data=base_location_data, filename=filename)
elif command == "i":
threading.Thread(target=collect_iperf, args=(filename, "10.45.0.1")).start()
elif command == "s" and running:
print("Stopping data collection...")
running = False
elif command == "x":
running = False
break
elif command == "m":
base_location_data = set_base_location()
start_time = time.time()
data = get_current_location()
data["timestamp"] = time.time()
end_time = time.time()
print("Collected Data:")
pprint.pprint(data)
print(" ")
save_data_to_json(data=data, filename=filename)
print(f"Run time: {end_time - start_time}")
elif command not in ["l", "b", "s", "i", "x"]:
print("Invalid command. Type 'l', 'b', 's', 'i', or 'x'.")
if __name__ == "__main__":
main()