import re
import socket

from helper_functions import normalize, save_data_to_json

RELAY_IP = "10.45.0.1"  # gNodeB IP of relay
PORT = 5005
BUFFER_SIZE = 4096

# Section headers that separate the two test runs in the relay's reply.
_NORMAL_MARKER = "=== Normal Test (Relay → Ground) ==="
_REVERSE_MARKER = "=== Reverse Test (Ground → Relay) ==="

# One iperf report line: "[ id] start-end sec <transfer> <X>Bytes <rate> <unit>".
# Captures (rate, unit); only Kbits/sec and Mbits/sec are recognised.
_REPORT_LINE_RE = re.compile(
    r"\[\s*\d+\]\s+\d+\.\d+\-\d+\.\d+\s+sec\s+[\d.]+\s+\w+Bytes\s+([\d.]+)\s+(Kbits/sec|Mbits/sec)"
)
# Looser fallback used when no full report line could be matched.
_MBITS_RE = re.compile(r"(\d+\.\d+) Mbits/sec")
_RECEIVER_RE = re.compile(r"receiver", re.IGNORECASE)
_SENDER_RE = re.compile(r"sender", re.IGNORECASE)


def parse_iperf_output(output, test_type):
    """Extract sender/receiver bitrates from one iperf text report.

    The last matching report line is taken as the receiver figure. When the
    text mentions both "sender" and "receiver" and at least two report lines
    matched, the second-to-last line is taken as the sender figure; otherwise
    the sender figure equals the receiver figure.

    If no report line matches, bare "<float> Mbits/sec" figures are used
    instead (last two, or the single one for both roles). If none exist
    either, both bitrates are ``None`` instead of raising ``IndexError``.

    Args:
        output: Raw iperf text for a single test direction (may be empty).
        test_type: Label stored in the result and printed, e.g. "uplink".

    Returns:
        dict with keys ``iperf_full``, ``sender_bitrate``,
        ``receiver_bitrate``, ``type`` and ``stations``.
    """
    sender_bitrate = None
    receiver_bitrate = None

    matches = _REPORT_LINE_RE.findall(output)
    if matches:
        recv_value, recv_unit = matches[-1]
        # NOTE(review): normalize() comes from helper_functions; presumably it
        # converts (value, unit) to a common unit -- confirm against the helper.
        receiver_bitrate = normalize(float(recv_value), recv_unit)

        has_both_roles = bool(
            _RECEIVER_RE.search(output) and _SENDER_RE.search(output)
        )
        if has_both_roles and len(matches) >= 2:
            send_value, send_unit = matches[-2]
            sender_bitrate = normalize(float(send_value), send_unit)
        else:
            sender_bitrate = receiver_bitrate
    else:
        # Fallback values are raw Mbits/sec figures and are not passed through
        # normalize(), exactly as before.
        bitrates = _MBITS_RE.findall(output)
        if len(bitrates) >= 2:
            sender_bitrate = float(bitrates[-2])
            receiver_bitrate = float(bitrates[-1])
        elif bitrates:
            sender_bitrate = receiver_bitrate = float(bitrates[-1])
        else:
            # Previously this path raised IndexError (e.g. empty reverse
            # section or a timed-out transfer); report it and carry on.
            print(f"[End] No bitrate found in {test_type} IPERF output")

    print(f"\nrelay -> ground {test_type} IPERF complete")
    print(f"IPERF sender bitrate: {sender_bitrate}")
    print(f"IPERF receiver bitrate: {receiver_bitrate}")

    data = {
        "iperf_full": output,
        "sender_bitrate": sender_bitrate,
        "receiver_bitrate": receiver_bitrate,
        "type": test_type,
        "stations": "rg",
    }
    return data


def _split_sections(full_output):
    """Split the relay reply into (normal_section, reverse_section) text."""
    if "=== Reverse Test" not in full_output:
        return full_output.strip(), ""

    head, _, tail = full_output.partition(_REVERSE_MARKER)
    normal_section = head.replace(_NORMAL_MARKER, "").strip()
    return normal_section, tail.strip()


def collect_iperf_remote(filename: str = ""):
    """Trigger the iperf run on the relay over UDP and parse its reply.

    Sends ``b"start"`` to ``RELAY_IP:PORT``, then collects datagrams until an
    empty datagram arrives or no datagram is received for 20 seconds. The
    reply is split into the normal and reverse sections, each is parsed with
    ``parse_iperf_output`` (as "uplink" and "downlink" respectively), and both
    results are passed to ``save_data_to_json`` when ``filename`` is non-empty.

    Args:
        filename: Target passed to ``save_data_to_json``; nothing is saved
            when it is empty.
    """
    chunks = []
    # The context manager guarantees the socket is closed on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(20)  # timeout for receiving results

        # Send "start" command
        sock.sendto(b"start", (RELAY_IP, PORT))
        print(f"[End] Sent start command to relay {RELAY_IP}:{PORT}")

        # Collect results
        try:
            while True:
                data, _ = sock.recvfrom(BUFFER_SIZE)
                if not data:
                    break
                chunks.append(data)
        except socket.timeout:
            # No more datagrams within the timeout: treat what we have as
            # the complete reply.
            pass

    # Decode once over the joined bytes so a multi-byte character split across
    # two datagrams (e.g. the arrow in the section headers) decodes correctly.
    full_output = b"".join(chunks).decode("utf-8", errors="replace")

    # Split output into normal vs reverse
    normal_section, reverse_section = _split_sections(full_output)

    uplink_data = parse_iperf_output(output=normal_section, test_type="uplink")
    downlink_data = parse_iperf_output(output=reverse_section, test_type="downlink")

    if filename:
        save_data_to_json(data=uplink_data, filename=filename)
        save_data_to_json(data=downlink_data, filename=filename)


if __name__ == "__main__":
    collect_iperf_remote()