194 lines
6.2 KiB
Python
194 lines
6.2 KiB
Python
import json
|
|
import re
|
|
|
|
from helper_functions import calculate_distance
|
|
|
|
|
|
def get_data_lists(data, entry_type, default_diconnect):
|
|
distances = []
|
|
prx = []
|
|
drx = []
|
|
rx2 = []
|
|
rx3 = []
|
|
|
|
for entry in data:
|
|
try:
|
|
int(float(entry["distance"]))
|
|
prx.append(
|
|
default_diconnect
|
|
if int(entry[f"{entry_type} PRX"].strip()) == -32768
|
|
else int(entry.get(f"{entry_type} PRX", default_diconnect))
|
|
)
|
|
drx.append(
|
|
default_diconnect
|
|
if int(entry[f"{entry_type} DRX"].strip()) == -32768
|
|
else int(entry.get(f"{entry_type} DRX", default_diconnect))
|
|
)
|
|
rx2.append(
|
|
default_diconnect
|
|
if int(entry[f"{entry_type} RX2"].strip()) == -32768
|
|
else int(entry.get(f"{entry_type} RX2", default_diconnect))
|
|
)
|
|
rx3.append(
|
|
default_diconnect
|
|
if int(entry[f"{entry_type} RX3"].strip()) == -32768
|
|
else int(entry.get(f"{entry_type} RX3", default_diconnect))
|
|
)
|
|
distances.append(int(float(entry["distance"])))
|
|
except (ValueError, KeyError):
|
|
continue
|
|
|
|
return distances, prx, drx, rx2, rx3
|
|
|
|
|
|
def get_avg_list(data, entry_type, default_disconnect):
|
|
distances = []
|
|
avg_list = []
|
|
|
|
for entry in data:
|
|
try:
|
|
int(float(entry["distance"]))
|
|
antennas = []
|
|
antennas.append(
|
|
default_disconnect
|
|
if int(entry[f"{entry_type} PRX"].strip()) == -32768
|
|
else int(entry.get(f"{entry_type} PRX", default_disconnect))
|
|
)
|
|
antennas.append(
|
|
default_disconnect
|
|
if int(entry[f"{entry_type} DRX"].strip()) == -32768
|
|
else int(entry.get(f"{entry_type} DRX", default_disconnect))
|
|
)
|
|
antennas.append(
|
|
default_disconnect
|
|
if int(entry[f"{entry_type} RX2"].strip()) == -32768
|
|
else int(entry.get(f"{entry_type} RX2", default_disconnect))
|
|
)
|
|
antennas.append(
|
|
default_disconnect
|
|
if int(entry[f"{entry_type} RX3"].strip()) == -32768
|
|
else int(entry.get(f"{entry_type} RX3", default_disconnect))
|
|
)
|
|
antennas.remove(max(antennas))
|
|
antennas.remove(min(antennas))
|
|
if (
|
|
min(antennas) == default_disconnect
|
|
and max(antennas) != default_disconnect
|
|
):
|
|
avg_rsrp = max(antennas)
|
|
else:
|
|
avg_rsrp = sum(antennas) / len(antennas)
|
|
avg_list.append(avg_rsrp)
|
|
distances.append(int(float(entry["distance"])))
|
|
except (ValueError, KeyError):
|
|
continue
|
|
|
|
return distances, avg_list
|
|
|
|
|
|
def get_iperf_lists(data, ip_address):
|
|
distances = []
|
|
sender = []
|
|
receiver = []
|
|
reverse_distances = []
|
|
reverse_sender = []
|
|
reverse_receiver = []
|
|
|
|
for entry in data:
|
|
if (
|
|
"iperf_full" in entry
|
|
and entry["start_distance"] != "Unknown"
|
|
and ip_address in entry["iperf_full"]
|
|
):
|
|
if "Reverse mode" in entry["iperf_full"]:
|
|
try:
|
|
reverse_sender.append(float(entry["sender_bitrate"]))
|
|
reverse_receiver.append(float(entry["receiver_bitrate"]))
|
|
reverse_distances.append(int(float(entry["start_distance"])))
|
|
except:
|
|
message = entry["iperf_full"]
|
|
bitrates = re.findall(r"(\d+\.\d+) Mbits/sec", message)
|
|
|
|
reverse_sender.append(float(bitrates[-2]))
|
|
reverse_receiver.append(float(bitrates[-1]))
|
|
reverse_distances.append(int(float(entry["start_distance"])))
|
|
else:
|
|
try:
|
|
sender.append(float(entry["sender_bitrate"]))
|
|
receiver.append(float(entry["receiver_bitrate"]))
|
|
distances.append(int(float(entry["start_distance"])))
|
|
except:
|
|
message = entry["iperf_full"]
|
|
bitrates = re.findall(r"(\d+\.\d+) Mbits/sec", message)
|
|
|
|
sender.append(float(bitrates[-2]))
|
|
receiver.append(float(bitrates[-1]))
|
|
distances.append(int(float(entry["start_distance"])))
|
|
|
|
return (
|
|
distances,
|
|
reverse_distances,
|
|
sender,
|
|
reverse_sender,
|
|
receiver,
|
|
reverse_receiver,
|
|
)
|
|
|
|
|
|
def fix_long(bad_long):
|
|
"""
|
|
Fix longitude values incorrectly parsed from NMEA (leading zero dropped degrees).
|
|
|
|
Assumes all values should be around -79.x based on recording location.
|
|
Removes the spurious leading digit from minutes.
|
|
"""
|
|
broken_degrees = 7
|
|
# Recover "minutes" from the broken decimal
|
|
extended_minutes = (abs(bad_long) - broken_degrees) * 60
|
|
|
|
# Remove the extra leading digit (e.g. 920 -> 20)
|
|
minutes = extended_minutes % 100
|
|
|
|
corrected_degrees = 79
|
|
decimal = corrected_degrees + minutes / 60
|
|
|
|
if bad_long < 0:
|
|
decimal = -decimal
|
|
|
|
return decimal
|
|
|
|
|
|
def add_distance_after(filename: str, base_location: dict):
|
|
try:
|
|
with open(filename, "r") as file:
|
|
data = json.load(file)
|
|
except FileNotFoundError:
|
|
print(f"No file by that name: {filename}")
|
|
return None
|
|
|
|
distance = "Unknown"
|
|
|
|
for dictionary in data:
|
|
if "latitude" in dictionary and type(dictionary["latitude"]) == float:
|
|
distance = calculate_distance(base_location, dictionary)
|
|
dictionary["distance"] = distance
|
|
elif "iperf_full" in dictionary:
|
|
dictionary["start_distance"] = distance
|
|
|
|
# Save updated data back to the file
|
|
with open(filename, "w") as file:
|
|
json.dump(data, file, indent=4)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
filenames = [
|
|
"",
|
|
]
|
|
base_location = {
|
|
"latitude": 43.656328,
|
|
"longitude": -79.307884,
|
|
"altitude": 80,
|
|
}
|
|
for filename in filenames:
|
|
add_distance_after(filename=filename, base_location=base_location)
|