Initial commit: all py files and gitignore

This commit is contained in:
madrigal 2025-09-10 10:45:48 -04:00
parent bdcd60c40e
commit f779de6524
6 changed files with 1230 additions and 0 deletions

21
.gitignore vendored Normal file
View File

@ -0,0 +1,21 @@
# Environments
.env
.venv
env/
venv/
ENV/
# PyCharm
.idea/
# Visual Studio Code
.vscode/
# Generated files
*.dot
*.hdf5
*.npy
*.png
*.json
images/
data/

368
at_commands.py Normal file
View File

@ -0,0 +1,368 @@
import serial
from helper_functions import comma_split, extract_numbers
# For more info on the commands:
# https://files.waveshare.com/upload/8/8a/Quectel_RG520N%26RG52xF%26RG530F%26RM520N%26RM530N_Series_AT_Commands_Manual_V1.0.0_Preliminary_20220812.pdf
# Configure connection/5G parameters
def set_configs(port="/dev/ttyUSB2", baudrate=115200):
try:
ser = serial.Serial(port, baudrate, timeout=1)
ser.write(b'AT+QNWPREFCFG="nr5g_band",78\r')
response = ser.readlines()
print(response)
ser.write(b'AT+QNWPREFCFG="mode_pref",NR5G\r')
response = ser.readlines()
print(response)
ser.write(b'AT+QNWPREFCFG="roam_pref",1\r')
response = ser.readlines()
print(response)
ser.close()
print("Finished setting configs.")
except Exception as e:
return print(f"Configuration error: {e}")
# Fetch operator status (5.1)
def get_modem_cops(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
try:
ser = serial.Serial(port, baudrate, timeout=1)
ser.write(b"AT+COPS?\r")
response = ser.readlines()
ser.close()
for item in response:
try:
decoded_item = item.decode("utf-8")
if "COPS:" in decoded_item:
nums = extract_numbers(decoded_item)
dictionary["mode"] = nums[0]
return dictionary
elif "CME" in decoded_item:
dictionary["cops"] = decoded_item
except Exception as e:
dictionary["cops encoded"] = str(item)
print(f"Could not decode COPS item:\t{str(item)}\n{e}")
return dictionary
except Exception as e:
return {"COPS error": f"{e}"}
# Fetch network registration status (5.2)
def get_modem_creg(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
try:
ser = serial.Serial(port, baudrate, timeout=1)
ser.write(b"AT+CREG?\r")
response = ser.readlines()
ser.close()
for item in response:
try:
decoded_item = item.decode("utf-8")
if "CREG:" in decoded_item:
nums = extract_numbers(decoded_item)
dictionary["network registration result code"] = nums[0]
dictionary["creg status"] = nums[1]
return dictionary
elif "CME" in decoded_item:
dictionary["creg error"] = decoded_item
except Exception as e:
decoded_item = str(item)
dictionary["creg encoded"] = decoded_item
print(f"Could not decode CREG item:\t{decoded_item}\n{e}")
return dictionary
except Exception as e:
return {"CREG error": f"{e}"}
# Fetch modem csq stats (5.9)
def get_modem_csq(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
try:
ser = serial.Serial(port, baudrate, timeout=1)
ser.write(b"AT+CSQ\r")
response = ser.readlines()
ser.close()
for item in response:
try:
decoded_item = item.decode("utf-8")
if "CSQ:" in decoded_item:
nums = extract_numbers(decoded_item)
dictionary["RSSI"] = nums[0]
dictionary["BER"] = nums[1]
return dictionary
elif "CME" in decoded_item:
dictionary["csq error"] = decoded_item
except Exception as e:
decoded_item = str(item)
dictionary["csq encoded"] = decoded_item
print(f"Could not decode CSQ item:\t{decoded_item}\n{e}")
return dictionary
except Exception as e:
return {"CSQ error": f"{e}"}
# Fetch RSRP (5.10)
def get_modem_rsrp(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
try:
ser = serial.Serial(port, baudrate, timeout=1)
ser.write(b"AT+QRSRP\r")
response = ser.readlines()
ser.close()
for item in response:
try:
decoded_item = item.decode("utf-8")
if "QRSRP:" in decoded_item:
_, data = decoded_item.split(":")
decoded_list = comma_split(data)
if type(decoded_list) == list:
dictionary["RSRP PRX"] = decoded_list[0]
dictionary["RSRP DRX"] = decoded_list[1]
dictionary["RSRP RX2"] = decoded_list[2]
dictionary["RSRP RX3"] = decoded_list[3]
dictionary["RSRP sysmode"] = decoded_list[4]
else:
dictionary["RSRP"] = decoded_list
return dictionary
elif "CME" in decoded_item:
dictionary["qrsrp error"] = decoded_item
except Exception as e:
decoded_item = str(item)
dictionary["qrsrp encoded"] = decoded_item
print(f"Could not decode QRSRP item:\t{decoded_item}\n{e}")
return dictionary
except Exception as e:
return {"QRSRP error": f"{e}"}
# Fetch RSRQ (5.11)
def get_modem_rsrq(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
try:
ser = serial.Serial(port, baudrate, timeout=1)
ser.write(b"AT+QRSRQ\r")
response = ser.readlines()
ser.close()
for item in response:
try:
decoded_item = item.decode("utf-8")
if "QRSRQ:" in decoded_item:
_, data = decoded_item.split(":")
decoded_list = comma_split(data)
if type(decoded_list) == list:
dictionary["RSRQ PRX"] = decoded_list[0]
dictionary["RSRQ DRX"] = decoded_list[1]
dictionary["RSRQ RX2"] = decoded_list[2]
dictionary["RSRQ RX3"] = decoded_list[3]
dictionary["RSRQ sysmode"] = decoded_list[4]
else:
dictionary["RSRQ"] = decoded_list
return dictionary
elif "CME" in decoded_item:
dictionary["qrsrq error"] = decoded_item
except Exception as e:
decoded_item = str(item)
dictionary["qrsrq encoded"] = decoded_item
print(f"Could not decode QRSRQ item:\t{decoded_item}\n{e}")
return dictionary
except Exception as e:
return {"QRSRQ error": f"{e}"}
# Fetch SINR of the current service network (5.12)
def get_modem_sinr(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
try:
ser = serial.Serial(port, baudrate, timeout=1)
ser.write(b"AT+QSINR?\r")
response = ser.readlines()
ser.close()
for item in response:
try:
decoded_item = item.decode("utf-8")
if "QSINR:" in decoded_item:
_, data = decoded_item.split(":")
decoded_list = comma_split(data)
if type(decoded_list) == list:
dictionary["SINR PRX"] = decoded_list[0]
dictionary["SINR DRX"] = decoded_list[1]
dictionary["SINR RX2"] = decoded_list[2]
dictionary["SINR RX3"] = decoded_list[3]
dictionary["SINR sysmode"] = decoded_list[4]
else:
decoded_item = decoded_list
return dictionary
elif "CME" in decoded_item:
dictionary["qsinr error"] = decoded_item
except Exception as e:
decoded_item = str(item)
dictionary["qsinr encoded"] = decoded_item
print(f"Could not decode QSINR item:\t{decoded_item}\n{e}")
return dictionary
except Exception as e:
return {"QSINR error": f"{e}"}
# Fetch the list of preferred operators (5.13)
def get_modem_cpol(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
try:
ser = serial.Serial(port, baudrate, timeout=1)
ser.write(b"AT+CPOL?\r")
response = ser.readlines()
ser.close()
for item in response:
try:
decoded_item = item.decode("utf-8")
if "CPOL:" in decoded_item:
_, data = decoded_item.split(":")
decoded_list = comma_split(data)
if type(decoded_list) == list:
dictionary["index"] = decoded_list[0]
dictionary["format"] = decoded_list[1]
dictionary["oper"] = decoded_list[2]
dictionary["GSM"] = decoded_list[3]
dictionary["GSM compact"] = decoded_list[4]
dictionary["UTRAN"] = decoded_list[5]
dictionary["E-UTRAN"] = decoded_list[6]
dictionary["NG-RAN"] = decoded_list[7]
else:
dictionary["cpol"] = decoded_list
return dictionary
elif "CME" in decoded_item:
dictionary["cpol error"] = decoded_item
except Exception as e:
decoded_item = str(item)
dictionary["cpol encoded"] = decoded_item
print(f"Could not decode CPOL item:\t{decoded_item}\n{e}")
return dictionary
except Exception as e:
return {"CPOL error": f"{e}"}
# Fetch network parameters (5.21.4)
def get_modem_qnwcfg(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
try:
ser = serial.Serial(port, baudrate, timeout=1)
ser.write(b'AT+QNWCFG="up/down"\r')
response = ser.readlines()
ser.close()
for item in response:
try:
decoded_item = item.decode("utf-8")
if "QNWCFG:" in decoded_item:
_, data = decoded_item.split(":")
decoded_list = comma_split(data)
if type(decoded_list) == list:
dictionary["up/down"] = decoded_list[0]
dictionary["uplink (bytes/s)"] = decoded_list[1]
dictionary["downlink (bytes/s)"] = decoded_list[2]
dictionary["time interval"] = decoded_list[3]
else:
dictionary["qnwcfg"] = decoded_list
return dictionary
elif "CME" in decoded_item:
dictionary["qnwcfg"] = decoded_item
except Exception as e:
decoded_item = str(item)
dictionary["qnwcfg encoded"] = decoded_item
print(f"Could not decode QNWCFG item:\t{decoded_item}\n{e}")
return dictionary
except Exception as e:
return {"QNWCFG error": f"{e}"}
# Fetch network information (5.18)
def get_modem_qnwinfo(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
try:
ser = serial.Serial(port, baudrate, timeout=1)
ser.write(b"AT+QNWINFO\r")
response = ser.readlines()
ser.close()
for item in response:
try:
decoded_item = item.decode("utf-8")
if "QNWINFO:" in decoded_item:
_, data = decoded_item.split(":")
dictionary["network information"] = data
return dictionary
elif "CME" in decoded_item:
dictionary["network information"] = decoded_item
except Exception as e:
dictionary["qnwinfo encoded"] = str(item)
print(f"Could not decode QNWINFO item:\t{str(item)}\n{e}")
return dictionary
except Exception as e:
return {"QNWINFO error": f"{e}"}
# Fetch servie provider name (5.19)
def get_modem_qspn(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
try:
ser = serial.Serial(port, baudrate, timeout=1)
ser.write(b"AT+QSPN\r")
response = ser.readlines()
ser.close()
for item in response:
try:
decoded_item = item.decode("utf-8")
if "QSPN:" in decoded_item:
_, data = decoded_item.split(":")
dictionary["service provider"] = data
return dictionary
elif "CME" in decoded_item:
dictionary["service provider"] = decoded_item
except Exception as e:
dictionary["qspn encoded"] = str(item)
print(f"Could not decode QSPN item:\t{str(item)}\n{e}")
return dictionary
except Exception as e:
return {"QSPN error": f"{e}"}

338
communication.py Normal file
View File

@ -0,0 +1,338 @@
import pprint
import re
import subprocess
import threading
import time
import serial
from at_commands import (get_modem_cops, get_modem_cpol, get_modem_creg,
get_modem_csq, get_modem_qnwcfg, get_modem_qnwinfo,
get_modem_qspn, get_modem_rsrp, get_modem_rsrq,
get_modem_sinr, set_configs)
from helper_functions import (calculate_distance, parse_lat_lon,
save_data_to_json)
from sierra_commands import get_modem_nr_info, get_modem_status
# Globals
running = False # To control start/stop
base_location = None # The basestation location
def read_gps_data(
port: str = "/dev/ttyACM0", baudrate: int = 9600, timeout: int = 5
) -> dict:
"""
Reads GPS data from a u-blox 7 GPS/GLONASS device and returns it as a dictionary.
Args:
port (str): The serial port the GPS device is connected to. Default is '/dev/ttyACM0'.
baudrate (int): Baud rate for serial communication. Default is 9600.
timeout (int): Timeout in seconds for the serial port. Default is 5 seconds.
Returns:
dict: A dictionary containing parsed GPS data.
"""
gps_data = {}
attempts = 0
try:
with serial.Serial(port, baudrate=baudrate, timeout=timeout) as ser:
while True:
# Read a line from the GPS device
line = ser.readline().decode("ascii", errors="ignore").strip()
# Filter for GGA or RMC sentences for relevant data
if line.startswith("$GPGGA"): # Global Positioning System Fix Data
parts = line.split(",")
gps_data["utc_time"] = parts[1]
gps_data["latitude"] = parse_lat_lon(parts[2], parts[3])
gps_data["longitude"] = parse_lat_lon(parts[4], parts[5])
gps_data["altitude"] = float(parts[9]) if parts[9] else None
# Count number of times GPGGA has been queried
attempts = attempts + 1
# Stop after collecting sufficient data
if (
gps_data.get("utc_time")
and gps_data.get("latitude")
and gps_data.get("longitude")
):
break
elif attempts >= 3:
gps_data = {}
break
except serial.SerialException as e:
print(f"Serial communication error: {e}")
except Exception as e:
print(f"Error: {e}")
return gps_data
def set_base_location():
global base_location
base_location = {}
try:
gps_data = read_gps_data()
base_location["latitude"] = gps_data["latitude"]
base_location["longitude"] = gps_data["longitude"]
if gps_data.get("altitude"):
base_location["altitude"] = gps_data["altitude"]
print("Base location found")
except KeyError:
print("Base location could not be found")
except Exception as e:
print(f"Error finding location: {e}")
return base_location
def get_current_location(dictionary={}):
global base_location
try:
gps_data = read_gps_data()
dictionary["latitude"] = gps_data["latitude"]
dictionary["longitude"] = gps_data["longitude"]
if gps_data.get("altitude"):
dictionary["altitude"] = gps_data["altitude"]
if "latitude" in base_location:
dictionary["baseLatitude"] = base_location["latitude"]
dictionary["baseLongitude"] = base_location["longitude"]
if base_location.get("altitude"):
dictionary["baseAltitude"] = base_location["altitude"]
dictionary["distance"] = calculate_distance(base_location, gps_data)
except KeyError:
print("Location could not be found")
dictionary["distance"] = "Unknown"
except Exception as e:
print(f"Error finding location: {e}")
dictionary["distance"] = "Error"
return dictionary
# Ping base station
def ping_basestation(
address="10.45.0.1", dictionary={}
): # raspberry pi address 192.168.0.29, host1 is 0.30
try:
result = subprocess.run(
["ping", "-c", "1", address], capture_output=True, text=True
)
match = re.search(r"time=([\d.]+)", result.stdout)
if match:
time_value = match.group(1)
dictionary["ping_time"] = time_value
dictionary["ping_stats"] = result.stdout
return dictionary
else:
dictionary["ping_time"] = "Not found"
dictionary["ping_stats"] = result.stdout
return dictionary
except Exception as e:
return {"error": str(e)}
# Run iperf test
def collect_iperf(
filename,
server_ip="10.45.0.1",
duration=10,
is_client=True,
):
if is_client:
commands = [["iperf3", "-s"]]
else:
commands = [
["iperf3", "-c", server_ip, "-t", str(duration)],
["iperf3", "-c", server_ip, "-t", str(duration), "-R"],
]
for command in commands:
try:
# try:
# gps_data = read_gps_data()
# start_distance = calculate_distance(base_location, gps_data)
# except:
# print("Could not collect location")
# Run the command
result = subprocess.run(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
output = result.stdout
# Check for errors
if result.returncode != 0:
print(f"Error running iperf3: {result.stderr}")
return result.stderr
# try:
# gps_data = read_gps_data()
# end_distance = calculate_distance(base_location, gps_data)
# except:
# pass
# Look for final sender and receiver bitrates (usually in summary lines)
matches = re.findall(
r"\[\s*\d+\]\s+\d+\.\d+\-\d+\.\d+\s+sec\s+[\d.]+\s+\w+Bytes\s+([\d.]+)\s+Mbits/sec\s+(\S+)?",
output,
)
if len(matches) >= 1:
# Take the last throughput entry
last_entry = matches[-1]
sender_bitrate = float(last_entry[0])
# Sometimes iperf omits receiver or sender lines depending on direction
# Try to find both separately:
receiver_match = re.search(r"receiver", output, re.IGNORECASE)
sender_match = re.search(r"sender", output, re.IGNORECASE)
if receiver_match and sender_match and len(matches) >= 2:
receiver_bitrate = float(matches[-1][0])
sender_bitrate = float(matches[-2][0])
else:
receiver_bitrate = sender_bitrate # fallback: assume same
else:
bitrates = re.findall(r"(\d+\.\d+) Mbits/sec", output)
sender_bitrate = float(bitrates[-2])
receiver_bitrate = float(bitrates[-1])
data = {
"iperf_full": output,
"sender_bitrate": sender_bitrate,
"receiver_bitrate": receiver_bitrate,
"note": (
"avgs are calculated with the assumption "
"that all value are in Mbits/sec"
),
# "start_distance": start_distance,
# "end_distance": end_distance,
}
print("\nIPERF complete")
print(f"IPERF sender bitrate: {sender_bitrate}")
print(f"IPERF receiver bitrate: {receiver_bitrate}")
save_data_to_json(data=data, filename=filename)
except Exception as e:
print(f"iPerf Error: {e}")
# Collect and send data continuously
def collect_data(filename):
global base_location
global running
while running:
data = {}
# data = get_current_location(dictionary=data)
data = get_modem_cops(dictionary=data)
data = get_modem_creg(dictionary=data)
data = get_modem_csq(dictionary=data)
data = get_modem_rsrp(dictionary=data)
data = get_modem_rsrq(dictionary=data)
data = get_modem_sinr(dictionary=data)
data = get_modem_cpol(dictionary=data)
data = get_modem_qnwcfg(dictionary=data)
data = get_modem_qnwinfo(dictionary=data)
data = get_modem_qspn(dictionary=data)
data = ping_basestation(dictionary=data)
data["timestamp"] = time.time()
# Send to server
# print(f"\nDistance: {data.get('distance')}")
print(f"\nService: {data.get('network information')}")
print(f"Ping Time: {data.get('ping_time')}")
print(
f"RSRP: {data.get('RSRP PRX')} {data.get('RSRP DRX')} {data.get('RSRP RX2')} {data.get('RSRP RX3')}"
)
save_data_to_json(data=data, filename=filename)
time.sleep(5)
# Collect and send data continuously
def collect_sierra_data(filename):
global base_location
global running
while running:
data = {}
data = get_current_location(dictionary=data)
data = get_modem_nr_info(dictionary=data)
data = get_modem_status(dictionary=data)
data = ping_basestation(dictionary=data)
data["timestamp"] = time.time()
# Send to server
print(f"\nDistance: {data.get('distance')}")
print(f"Ping Time: {data.get('ping_time')}")
print(f"RSRP: {data.get('RSRP')}")
save_data_to_json(data=data, filename=filename)
time.sleep(5)
# Main function
def main():
global running
filename = "data/boat_relay_sept_11/collection_" + str(int(time.time())) + ".json"
print("Setting configs...")
set_configs()
print(
"Type 'l' to set basestation location, 'b' to begin, "
"'s' to stop, 'i' to run an iperf test, or 'x' to exit:"
)
while True:
command = input("> ").strip().lower()
if command == "b" and not running:
print("Starting data collection...")
running = True
threading.Thread(target=collect_data, args=(filename,)).start()
elif command == "l":
base_location_data = set_base_location()
save_data_to_json(data=base_location_data, filename=filename)
elif command == "i":
threading.Thread(target=collect_iperf, args=(filename, "10.45.0.1")).start()
elif command == "s" and running:
print("Stopping data collection...")
running = False
elif command == "x":
running = False
break
elif command == "m":
base_location_data = set_base_location()
start_time = time.time()
data = get_current_location()
data["timestamp"] = time.time()
end_time = time.time()
print("Collected Data:")
pprint.pprint(data)
print(" ")
save_data_to_json(data=data, filename=filename)
print(f"Run time: {end_time - start_time}")
elif command not in ["l", "b", "s", "i", "x"]:
print("Invalid command. Type 'l', 'b', 's', 'i', or 'x'.")
if __name__ == "__main__":
main()

121
helper_functions.py Normal file
View File

@ -0,0 +1,121 @@
import json
import math
import re
def calculate_distance(point1: dict, point2: dict) -> float:
"""
Calculate the distance in meters between two geographic points with altitude.
:param point1: A dictionary with 'latitude', 'longitude', and optional 'altitude'.
:param point2: A dictionary with 'latitude', 'longitude', and optional 'altitude'.
:return: Distance in meters as a float.
"""
try:
# Extract latitude, longitude, and altitude
lat1, lon1, alt1 = (
point1["latitude"],
point1["longitude"],
point1.get("altitude", 0),
)
lat2, lon2, alt2 = (
point2["latitude"],
point2["longitude"],
point2.get("altitude", 0),
)
except KeyError:
return "Points were not properly set"
# Convert latitude and longitude from degrees to radians
lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
# Radius of the Earth in meters
R = 6369500
# Compute deltas
delta_lat = lat2 - lat1
delta_lon = lon2 - lon1
# Haversine formula for horizontal distance
a = (
math.sin(delta_lat / 2) ** 2
+ math.cos(lat1) * math.cos(lat2) * math.sin(delta_lon / 2) ** 2
)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
horizontal_distance = R * c
# Altitude difference
delta_alt = alt2 - alt1
# Total 3D distance
distance = math.sqrt(horizontal_distance**2 + delta_alt**2)
return distance
# Split string by commas, if there are commas
def comma_split(string: str):
if "," in string:
return string.split(",")
else:
return string
# Extract stats from strings
def extract_numbers(string):
pattern = r"-?\d+\.\d+|-?\d+"
numbers = re.findall(pattern, string)
numbers = [float(num) if "." in num else int(num) for num in numbers]
return numbers
# Parse the latitude or longitude from the NMEA format
def parse_lat_lon(value: str, direction: str) -> float:
"""
Parses latitude or longitude from NMEA format.
Args:
value (str): The raw NMEA latitude or longitude value.
direction (str): Direction indicator ('N', 'S', 'E', 'W').
Returns:
float: The parsed latitude or longitude as a decimal degree.
"""
if not value or not direction:
return None
degrees = int(value[:2])
minutes = float(value[2:])
decimal = degrees + minutes / 60
if direction in ["S", "W"]:
decimal = -decimal
return decimal
# Convert RSSI to DBM
def rssi_to_dbm(rssi):
if rssi == 99:
return "Unknown"
else:
# Convert RSSI to dBm
return -113 + 2 * rssi
# Save data to JSON file
def save_data_to_json(data, filename):
try:
# Load existing data
try:
with open(filename, "r") as file:
existing_data = json.load(file)
except FileNotFoundError:
# If file doesn't exist, start with an empty list
existing_data = []
# Append new data
existing_data.append(data)
# Save updated data back to the file
with open(filename, "w") as file:
json.dump(existing_data, file, indent=4)
except Exception as e:
print(f"Error saving data to JSON: {e}")

319
plots.py Normal file
View File

@ -0,0 +1,319 @@
import json
import re
import matplotlib.pyplot as plt
def plot_rsrp(filename):
# Load the JSON file
with open(filename, "r") as file:
data = json.load(file)
# Extract distance and RSRP values (convert RSRP values to integers)
distances = []
rsrp_prx = []
rsrp_drx = []
rsrp_rx2 = []
rsrp_rx3 = []
for entry in data:
try:
rsrp_prx.append(
-169
if int(entry["RSRP PRX"].strip()) == -32768
else int(entry.get("RSRP PRX", -169))
)
rsrp_drx.append(
-169
if int(entry["RSRP DRX"].strip()) == -32768
else int(entry.get("RSRP DRX", -169))
)
rsrp_rx2.append(
-169
if int(entry["RSRP RX2"].strip()) == -32768
else int(entry.get("RSRP RX2", -169))
)
rsrp_rx3.append(
-169
if int(entry["RSRP RX3"].strip()) == -32768
else int(entry.get("RSRP RX3", -169))
)
distances.append(entry["distance"])
except (ValueError, KeyError):
continue
# Plot the data
plt.figure(figsize=(10, 6))
plt.plot(distances, rsrp_prx, label="RSRP PRX", marker="o")
plt.plot(distances, rsrp_drx, label="RSRP DRX", marker="s")
plt.plot(distances, rsrp_rx2, label="RSRP RX2", marker="^")
plt.plot(distances, rsrp_rx3, label="RSRP RX3", marker="d")
plt.title("RSRP vs Distance")
plt.xlabel("Distance (m)")
plt.ylabel("RSRP (dBm)")
plt.legend()
plt.grid(True)
plt.tight_layout()
# Show the plot
plt.show()
def plot_rsrq(filename):
# Load the JSON file
with open(filename, "r") as file:
data = json.load(file)
# Extract distance and RSRQ values (convert RSRQ values to integers)
distances = []
rsrq_prx = []
rsrq_drx = []
rsrq_rx2 = []
rsrq_rx3 = []
for entry in data:
try:
rsrq_prx.append(
-20
if int(entry["RSRQ PRX"].strip()) == -32768
else int(entry.get("RSRQ PRX", -20))
)
rsrq_drx.append(
-20
if int(entry["RSRQ DRX"].strip()) == -32768
else int(entry.get("RSRQ DRX", -20))
)
rsrq_rx2.append(
-20
if int(entry["RSRQ RX2"].strip()) == -32768
else int(entry.get("RSRQ RX2", -20))
)
rsrq_rx3.append(
-20
if int(entry["RSRQ RX3"].strip()) == -32768
else int(entry.get("RSRQ RX3", -20))
)
distances.append(entry["distance"])
except (ValueError, KeyError):
continue
# Plot the data
plt.figure(figsize=(10, 6))
plt.plot(distances, rsrq_prx, label="RSRQ PRX", marker="o")
plt.plot(distances, rsrq_drx, label="RSRQ DRX", marker="s")
plt.plot(distances, rsrq_rx2, label="RSRQ RX2", marker="^")
plt.plot(distances, rsrq_rx3, label="RSRQ RX3", marker="d")
plt.title("RSRQ vs Distance")
plt.xlabel("Distance (m)")
plt.ylabel("RSRQ (dBm)")
plt.legend()
plt.grid(True)
plt.tight_layout()
# Show the plot
plt.show()
def plot_iperf(filename):
# Load the JSON file
with open(filename, "r") as file:
data = json.load(file)
distances = []
sender = []
receiver = []
for entry in data:
try:
message = entry["iperf_full"]
bitrates = re.findall(r"(\d+\.\d+) Mbits/sec", message)
sender.append(float(bitrates[-2]))
receiver.append(float(bitrates[-1]))
distances.append(entry["start_distance"])
except (ValueError, KeyError):
continue
# Plot the data
plt.figure(figsize=(10, 6))
plt.plot(distances, sender, label="Avg Sender Bitrate", marker="o")
plt.plot(distances, receiver, label="Avg Receiver Bitrate", marker="s")
plt.title("IPERF vs Distance")
plt.xlabel("Distance (m)")
plt.ylabel("Bitrate (Mbits/s)")
plt.legend()
plt.grid(True)
plt.tight_layout()
# Show the plot
plt.show()
def plot_bytes(filename):
# Load the JSON file
with open(filename, "r") as file:
data = json.load(file)
distances = []
uplink = []
downlink = []
for entry in data:
try:
if (
int(entry["uplink (bytes/s)"].strip()) < 1000000
and int(entry["downlink (bytes/s)"].strip()) < 1000000
):
distances.append(entry["distance"])
uplink.append(int(entry["uplink (bytes/s)"].strip()))
downlink.append(int(entry["downlink (bytes/s)"].strip()))
except (ValueError, KeyError):
continue
# Plot the data
plt.figure(figsize=(10, 6))
plt.plot(distances, downlink, label="Downlink Bitrate", marker="o")
plt.plot(distances, uplink, label="Uplink Bitrate", marker="s")
plt.title("Bitrate vs Distance")
plt.xlabel("Distance (m)")
plt.ylabel("Bitrate (bytes/s)")
plt.legend()
plt.grid(True)
plt.tight_layout()
# Show the plot
plt.show()
def plot_manual():
distances = [0, 100, 200, 300, 400, 500, 600, 700, 800]
rsrps = [-56, -82, -90, -93, -100, -105, -105, -116, -150]
sender = [0, 6.06, 6.95, 6.37, 6.96, 8.04, 7.30, 0, 0]
receiver = [0, 5.20, 6.10, 5.24, 6.08, 6.84, 6.37, 0, 0]
# Plot the RSRP data
plt.figure(figsize=(10, 6))
plt.plot(distances, rsrps, label="RSRP", marker="o")
plt.title("RSRP vs Distance")
plt.xlabel("Distance (m)")
plt.ylabel("RSRP (dBm)")
plt.legend()
plt.grid(True)
plt.tight_layout()
# Show the plot
plt.show()
# Plot the iperf data
plt.figure(figsize=(10, 6))
plt.plot(distances, sender, label="Avg Sender Bitrate", marker="o")
plt.plot(distances, receiver, label="Avg Receiver Bitrate", marker="s")
plt.title("IPERF vs Distance")
plt.xlabel("Distance (m)")
plt.ylabel("Bitrate (Mbits/s)")
plt.legend()
plt.grid(True)
plt.tight_layout()
# Show the plot
plt.show()
def plot_double_iperf(filename):
# Load the JSON file
with open(filename, "r") as file:
data = json.load(file)
distances = []
sender = []
receiver = []
reverse_distances = []
reverse_sender = []
reverse_receiver = []
for entry in data:
if "iperf_full" in entry:
if "Reverse mode" in entry["iperf_full"]:
try:
reverse_sender.append(float(entry["sender_bitrate"]))
reverse_receiver.append(float(entry["receiver_bitrate"]))
reverse_distances.append(entry["start_distance"])
except:
message = entry["iperf_full"]
bitrates = re.findall(r"(\d+\.\d+) Mbits/sec", message)
reverse_sender.append(float(bitrates[-2]))
reverse_receiver.append(float(bitrates[-1]))
reverse_distances.append(entry["start_distance"])
else:
try:
sender.append(float(entry["sender_bitrate"]))
receiver.append(float(entry["receiver_bitrate"]))
distances.append(entry["start_distance"])
except:
message = entry["iperf_full"]
bitrates = re.findall(r"(\d+\.\d+) Mbits/sec", message)
sender.append(float(bitrates[-2]))
receiver.append(float(bitrates[-1]))
distances.append(entry["start_distance"])
# Plot the data
plt.figure(figsize=(10, 6))
plt.plot(distances, sender, label="Avg Sender Bitrate", marker="o", color="red")
plt.plot(
distances,
receiver,
label="Avg Receiver Bitrate",
marker="s",
color="darkorange",
)
plt.plot(
reverse_distances,
reverse_sender,
label="Avg Reverse Sender Bitrate",
marker="^",
color="blue",
)
plt.plot(
reverse_distances,
reverse_receiver,
label="Avg Reverse Receiver Bitrate",
marker="d",
color="blueviolet",
)
plt.title("IPERF vs Distance")
plt.xlabel("Distance (m)")
plt.ylabel("Bitrate (Mbits/s)")
plt.legend()
plt.grid(True)
plt.tight_layout()
# Show the plot
plt.show()
if __name__ == "__main__":
# print("Connecting to host 10.46.0.1, port 5201\n[ 5] local 192.168.225.83 port 60164 connected to 10.46.0.1 port 5201\n[ ID] Interval Transfer Bitrate Retr Cwnd\n[ 5] 0.00-1.00 sec 361 KBytes 2.95 Mbits/sec 0 43.4 KBytes \n[ 5] 1.00-2.00 sec 329 KBytes 2.70 Mbits/sec 0 56.6 KBytes \n[ 5] 2.00-3.00 sec 782 KBytes 6.41 Mbits/sec 0 89.5 KBytes \n[ 5] 3.00-4.00 sec 379 KBytes 3.11 Mbits/sec 0 107 KBytes \n[ 5] 4.00-5.00 sec 569 KBytes 4.66 Mbits/sec 0 133 KBytes \n[ 5] 5.00-6.00 sec 379 KBytes 3.11 Mbits/sec 0 151 KBytes \n[ 5] 6.00-7.00 sec 632 KBytes 5.18 Mbits/sec 0 182 KBytes \n[ 5] 7.00-8.00 sec 569 KBytes 4.66 Mbits/sec 0 247 KBytes \n[ 5] 8.00-9.00 sec 379 KBytes 3.11 Mbits/sec 0 309 KBytes \n[ 5] 9.00-10.00 sec 442 KBytes 3.62 Mbits/sec 0 432 KBytes \n- - - - - - - - - - - - - - - - - - - - - - - - -\n[ ID] Interval Transfer Bitrate Retr\n[ 5] 0.00-10.00 sec 4.71 MBytes 3.95 Mbits/sec 0 sender\n[ 5] 0.00-10.82 sec 4.31 MBytes 3.34 Mbits/sec receiver\n\niperf Done.\n")
# print("Connecting to host 10.46.0.1, port 5201\n[ 5] local 192.168.225.83 port 44064 connected to 10.46.0.1 port 5201\n[ ID] Interval Transfer Bitrate Retr Cwnd\n[ 5] 0.00-1.00 sec 405 KBytes 3.32 Mbits/sec 0 44.8 KBytes \n[ 5] 1.00-2.00 sec 320 KBytes 2.62 Mbits/sec 0 57.9 KBytes \n[ 5] 2.00-3.00 sec 207 KBytes 1.69 Mbits/sec 0 65.8 KBytes \n[ 5] 3.00-4.00 sec 253 KBytes 2.07 Mbits/sec 0 79.0 KBytes \n[ 5] 4.00-5.00 sec 379 KBytes 3.11 Mbits/sec 0 93.5 KBytes \n[ 5] 5.00-6.00 sec 442 KBytes 3.62 Mbits/sec 0 124 KBytes \n[ 5] 6.00-7.00 sec 442 KBytes 3.62 Mbits/sec 0 176 KBytes \n[ 5] 7.00-8.00 sec 569 KBytes 4.66 Mbits/sec 0 249 KBytes \n[ 5] 8.00-9.00 sec 695 KBytes 5.69 Mbits/sec 0 333 KBytes \n[ 5] 9.00-10.00 sec 442 KBytes 3.62 Mbits/sec 0 433 KBytes \n- - - - - - - - - - - - - - - - - - - - - - - - -\n[ ID] Interval Transfer Bitrate Retr\n[ 5] 0.00-10.00 sec 4.06 MBytes 3.40 Mbits/sec 0 sender\n[ 5] 0.00-11.14 sec 3.48 MBytes 2.62 Mbits/sec receiver\n\niperf Done.\n")
plot_double_iperf(
filename="/home/madrigal/Documents/code/beach_apr4/collection_1743777162.json"
)
plot_rsrp(
filename="/home/madrigal/Documents/code/beach_apr4/collection_1743777162.json"
)
plot_rsrq(
filename="/home/madrigal/Documents/code/beach_apr4/collection_1743777162.json"
)
# plot_double_iperf(filename="/home/madrigal/Documents/code/beach_mar_7/collection_whip_antennas.json")
# plot_iperf(filename='/home/madrigal/Documents/code/collections_beach_jan_29/collection_1738178064.json')
# plot_bytes(filename="/home/madrigal/Documents/code/collections_beach_jan_29/collection_1738178064.json")
# plot_manual()

63
sierra_commands.py Normal file
View File

@ -0,0 +1,63 @@
import serial
from helper_functions import extract_numbers
# Fetch operational status
def get_modem_status(port="/dev/ttyUSB0", baudrate=115200, dictionary={}):
try:
ser = serial.Serial(port, baudrate, timeout=1)
ser.write(b"AT!GSTATUS?\r")
response = ser.readlines()
ser.close()
full_decoded = ""
for item in response:
try:
decoded_item = item.decode("utf-8")
full_decoded = full_decoded + "\n" + decoded_item
except Exception as e:
dictionary["status encoded"] = str(item)
print(f"Could not decode GSTATUS item:\t{str(item)}\n{e}")
dictionary["Status"] = full_decoded
return dictionary
except Exception as e:
return {"GSTATUS error": f"{e}"}
# Fetch NR (5G) information of the device
def get_modem_nr_info(port="/dev/ttyUSB0", baudrate=115200, dictionary={}):
try:
ser = serial.Serial(port, baudrate, timeout=1)
ser.write(b"AT!NRINFO?\r")
response = ser.readlines()
ser.close()
full_decoded = ""
for item in response:
try:
decoded_item = item.decode("utf-8")
full_decoded = full_decoded + "\n" + decoded_item
if "NR5G RSRP (dBm):" in decoded_item:
numbers = extract_numbers(decoded_item)
if len(numbers) >= 4:
dictionary["RSRP"] = numbers[1]
dictionary["RSRQ"] = numbers[3]
elif "NR5G SINR (dB):" in decoded_item:
numbers = extract_numbers(decoded_item)
if len(numbers) >= 2:
dictionary["SINR"] = numbers[1]
except Exception as e:
dictionary["NR info encoded"] = str(item)
print(f"Could not decode NRINFO item:\t{str(item)}\n{e}")
dictionary["NR Info"] = full_decoded
return dictionary
except Exception as e:
return {"NRINFO error": f"{e}"}