import json import re from helper_functions import calculate_distance def get_data_lists(data, entry_type, default_diconnect): distances = [] prx = [] drx = [] rx2 = [] rx3 = [] for entry in data: try: int(float(entry["distance"])) prx.append( default_diconnect if int(entry[f"{entry_type} PRX"].strip()) == -32768 else int(entry.get(f"{entry_type} PRX", default_diconnect)) ) drx.append( default_diconnect if int(entry[f"{entry_type} DRX"].strip()) == -32768 else int(entry.get(f"{entry_type} DRX", default_diconnect)) ) rx2.append( default_diconnect if int(entry[f"{entry_type} RX2"].strip()) == -32768 else int(entry.get(f"{entry_type} RX2", default_diconnect)) ) rx3.append( default_diconnect if int(entry[f"{entry_type} RX3"].strip()) == -32768 else int(entry.get(f"{entry_type} RX3", default_diconnect)) ) distances.append(int(float(entry["distance"]))) except (ValueError, KeyError): continue return distances, prx, drx, rx2, rx3 def get_avg_list(data, entry_type, default_disconnect): distances = [] avg_list = [] for entry in data: try: int(float(entry["distance"])) antennas = [] antennas.append( default_disconnect if int(entry[f"{entry_type} PRX"].strip()) == -32768 else int(entry.get(f"{entry_type} PRX", default_disconnect)) ) antennas.append( default_disconnect if int(entry[f"{entry_type} DRX"].strip()) == -32768 else int(entry.get(f"{entry_type} DRX", default_disconnect)) ) antennas.append( default_disconnect if int(entry[f"{entry_type} RX2"].strip()) == -32768 else int(entry.get(f"{entry_type} RX2", default_disconnect)) ) antennas.append( default_disconnect if int(entry[f"{entry_type} RX3"].strip()) == -32768 else int(entry.get(f"{entry_type} RX3", default_disconnect)) ) antennas.remove(max(antennas)) antennas.remove(min(antennas)) if ( min(antennas) == default_disconnect and max(antennas) != default_disconnect ): avg_rsrp = max(antennas) else: avg_rsrp = sum(antennas) / len(antennas) avg_list.append(avg_rsrp) distances.append(int(float(entry["distance"]))) except (ValueError, KeyError): continue return distances, avg_list def get_iperf_lists(data, ip_address): distances = [] sender = [] receiver = [] reverse_distances = [] reverse_sender = [] reverse_receiver = [] for entry in data: if ( "iperf_full" in entry and entry["start_distance"] != "Unknown" and ip_address in entry["iperf_full"] ): if "Reverse mode" in entry["iperf_full"]: try: reverse_sender.append(float(entry["sender_bitrate"])) reverse_receiver.append(float(entry["receiver_bitrate"])) reverse_distances.append(int(float(entry["start_distance"]))) except Exception: message = entry["iperf_full"] bitrates = re.findall(r"(\d+\.\d+) Mbits/sec", message) reverse_sender.append(float(bitrates[-2])) reverse_receiver.append(float(bitrates[-1])) reverse_distances.append(int(float(entry["start_distance"]))) else: try: sender.append(float(entry["sender_bitrate"])) receiver.append(float(entry["receiver_bitrate"])) distances.append(int(float(entry["start_distance"]))) except Exception: message = entry["iperf_full"] bitrates = re.findall(r"(\d+\.\d+) Mbits/sec", message) sender.append(float(bitrates[-2])) receiver.append(float(bitrates[-1])) distances.append(int(float(entry["start_distance"]))) return ( distances, reverse_distances, sender, reverse_sender, receiver, reverse_receiver, ) def fix_long(bad_long): """ Fix longitude values incorrectly parsed from NMEA (leading zero dropped degrees). Assumes all values should be around -79.x based on recording location. Removes the spurious leading digit from minutes. """ broken_degrees = 7 # Recover "minutes" from the broken decimal extended_minutes = (abs(bad_long) - broken_degrees) * 60 # Remove the extra leading digit (e.g. 920 -> 20) minutes = extended_minutes % 100 corrected_degrees = 79 decimal = corrected_degrees + minutes / 60 if bad_long < 0: decimal = -decimal return decimal def add_distance_after(filename: str, base_location: dict): try: with open(filename, "r") as file: data = json.load(file) except FileNotFoundError: print(f"No file by that name: {filename}") return None distance = "Unknown" for dictionary in data: if "latitude" in dictionary and type(dictionary["latitude"]) == float: distance = calculate_distance(base_location, dictionary) dictionary["distance"] = distance elif "iperf_full" in dictionary: dictionary["start_distance"] = distance # Save updated data back to the file with open(filename, "w") as file: json.dump(data, file, indent=4) if __name__ == "__main__": filenames = [ "", ] base_location = { "latitude": 43.656328, "longitude": -79.307884, "altitude": 80, } for filename in filenames: add_distance_after(filename=filename, base_location=base_location)