range-testing/processing/post_process.py

194 lines
6.2 KiB
Python
Raw Permalink Normal View History

import json
import re
from functions.helper_functions import calculate_distance
def get_data_lists(data, entry_type, default_diconnect):
distances = []
prx = []
drx = []
rx2 = []
rx3 = []
for entry in data:
try:
int(float(entry["distance"]))
prx.append(
default_diconnect
if int(entry[f"{entry_type} PRX"].strip()) == -32768
else int(entry.get(f"{entry_type} PRX", default_diconnect))
)
drx.append(
default_diconnect
if int(entry[f"{entry_type} DRX"].strip()) == -32768
else int(entry.get(f"{entry_type} DRX", default_diconnect))
)
rx2.append(
default_diconnect
if int(entry[f"{entry_type} RX2"].strip()) == -32768
else int(entry.get(f"{entry_type} RX2", default_diconnect))
)
rx3.append(
default_diconnect
if int(entry[f"{entry_type} RX3"].strip()) == -32768
else int(entry.get(f"{entry_type} RX3", default_diconnect))
)
distances.append(int(float(entry["distance"])))
except (ValueError, KeyError):
continue
return distances, prx, drx, rx2, rx3
def get_avg_list(data, entry_type, default_disconnect):
distances = []
avg_list = []
for entry in data:
try:
int(float(entry["distance"]))
antennas = []
antennas.append(
default_disconnect
if int(entry[f"{entry_type} PRX"].strip()) == -32768
else int(entry.get(f"{entry_type} PRX", default_disconnect))
)
antennas.append(
default_disconnect
if int(entry[f"{entry_type} DRX"].strip()) == -32768
else int(entry.get(f"{entry_type} DRX", default_disconnect))
)
antennas.append(
default_disconnect
if int(entry[f"{entry_type} RX2"].strip()) == -32768
else int(entry.get(f"{entry_type} RX2", default_disconnect))
)
antennas.append(
default_disconnect
if int(entry[f"{entry_type} RX3"].strip()) == -32768
else int(entry.get(f"{entry_type} RX3", default_disconnect))
)
antennas.remove(max(antennas))
antennas.remove(min(antennas))
if (
min(antennas) == default_disconnect
and max(antennas) != default_disconnect
):
avg_rsrp = max(antennas)
else:
avg_rsrp = sum(antennas) / len(antennas)
avg_list.append(avg_rsrp)
distances.append(int(float(entry["distance"])))
except (ValueError, KeyError):
continue
return distances, avg_list
def get_iperf_lists(data, ip_address):
distances = []
sender = []
receiver = []
reverse_distances = []
reverse_sender = []
reverse_receiver = []
for entry in data:
if (
"iperf_full" in entry
and entry["start_distance"] != "Unknown"
and ip_address in entry["iperf_full"]
):
if "Reverse mode" in entry["iperf_full"]:
try:
reverse_sender.append(float(entry["sender_bitrate"]))
reverse_receiver.append(float(entry["receiver_bitrate"]))
reverse_distances.append(int(float(entry["start_distance"])))
M
2025-10-16 12:14:10 -04:00
except Exception:
message = entry["iperf_full"]
bitrates = re.findall(r"(\d+\.\d+) Mbits/sec", message)
reverse_sender.append(float(bitrates[-2]))
reverse_receiver.append(float(bitrates[-1]))
reverse_distances.append(int(float(entry["start_distance"])))
else:
try:
sender.append(float(entry["sender_bitrate"]))
receiver.append(float(entry["receiver_bitrate"]))
distances.append(int(float(entry["start_distance"])))
M
2025-10-16 12:14:10 -04:00
except Exception:
message = entry["iperf_full"]
bitrates = re.findall(r"(\d+\.\d+) Mbits/sec", message)
sender.append(float(bitrates[-2]))
receiver.append(float(bitrates[-1]))
distances.append(int(float(entry["start_distance"])))
return (
distances,
reverse_distances,
sender,
reverse_sender,
receiver,
reverse_receiver,
)
def fix_long(bad_long):
"""
Fix longitude values incorrectly parsed from NMEA (leading zero dropped degrees).
Assumes all values should be around -79.x based on recording location.
Removes the spurious leading digit from minutes.
"""
broken_degrees = 7
# Recover "minutes" from the broken decimal
extended_minutes = (abs(bad_long) - broken_degrees) * 60
# Remove the extra leading digit (e.g. 920 -> 20)
minutes = extended_minutes % 100
corrected_degrees = 79
decimal = corrected_degrees + minutes / 60
if bad_long < 0:
decimal = -decimal
return decimal
def add_distance_after(filename: str, base_location: dict):
try:
with open(filename, "r") as file:
data = json.load(file)
except FileNotFoundError:
print(f"No file by that name: {filename}")
return None
distance = "Unknown"
for dictionary in data:
if "latitude" in dictionary and type(dictionary["latitude"]) == float:
distance = calculate_distance(base_location, dictionary)
dictionary["distance"] = distance
elif "iperf_full" in dictionary:
dictionary["start_distance"] = distance
# Save updated data back to the file
with open(filename, "w") as file:
json.dump(data, file, indent=4)
if __name__ == "__main__":
filenames = [
"",
]
base_location = {
"latitude": 43.656328,
"longitude": -79.307884,
"altitude": 80,
}
for filename in filenames:
add_distance_after(filename=filename, base_location=base_location)