range-testing/iperf/iperf_functions.py

129 lines
3.9 KiB
Python
Raw Permalink Normal View History

import re
import subprocess
import time
from functions.helper_functions import calculate_distance, normalize, save_data_to_json
from functions.location import read_gps_data
def get_location(base_location):
    """Read the current GPS position and its distance from a base location.

    Args:
        base_location: Reference position handed to ``calculate_distance`` as
            its first argument. When falsy, no distance is computed.

    Returns:
        A ``(location, start_distance)`` tuple. ``location`` is the value
        returned by ``read_gps_data()``, or ``None`` if that call failed.
        ``start_distance`` is the result of ``calculate_distance`` or ``None``
        when no base location was given or any step raised.
    """
    # Bind both results up front so that a failing GPS read cannot leave
    # ``location`` undefined when the return statement is reached.
    location = None
    start_distance = None
    try:
        location = read_gps_data()
        if base_location:
            start_distance = calculate_distance(base_location, location)
    except Exception as e:
        # Best effort: a missing position must not abort the measurement.
        print(f"Could not collect location: {e}")
        start_distance = None
    return location, start_distance
def parse_iperf_output(output):
    """Extract the sender and receiver bitrates from iperf3 text output.

    The primary pattern matches per-stream report lines of the form
    ``[ ID] start-end sec <transfer> <X>Bytes <rate> Kbits/sec|Mbits/sec`` and
    passes the rate and its unit through ``normalize``. When both the words
    "sender" and "receiver" occur in the output and at least two lines
    matched, the last match is taken as the receiver and the one before it as
    the sender; otherwise the last match is used for both.

    If the primary pattern matches nothing, a looser fallback collects every
    ``<float> Mbits/sec`` figure. Those values are returned as the raw number
    printed in front of ``Mbits/sec`` (they are not passed to ``normalize``).

    Args:
        output: Captured iperf3 stdout as a string.

    Returns:
        A ``(sender_bitrate, receiver_bitrate)`` tuple, or ``(None, None)``
        when ``output`` is empty or contains no recognizable bitrate.
    """
    if output == "":
        return None, None

    matches = re.findall(
        r"\[\s*\d+\]\s+\d+\.\d+\-\d+\.\d+\s+sec\s+[\d.]+\s+\w+Bytes\s+([\d.]+)\s+(Kbits/sec|Mbits/sec)",
        output,
    )

    if matches:
        last_value, last_unit = matches[-1]
        last_bitrate = normalize(float(last_value), last_unit)

        # Try to differentiate sender vs receiver.
        has_receiver = re.search(r"receiver", output, re.IGNORECASE)
        has_sender = re.search(r"sender", output, re.IGNORECASE)
        if has_receiver and has_sender and len(matches) >= 2:
            send_value, send_unit = matches[-2]
            return normalize(float(send_value), send_unit), last_bitrate

        # Only one usable figure: report it for both directions.
        return last_bitrate, last_bitrate

    # Fallback for output the primary pattern does not recognize.
    bitrates = [float(value) for value in re.findall(r"(\d+\.\d+) Mbits/sec", output)]
    if not bitrates:
        # Nothing parseable; signal "no data" rather than raising IndexError.
        return None, None
    if len(bitrates) == 1:
        # Mirror the primary branch: a single figure stands in for both.
        return bitrates[0], bitrates[0]
    return bitrates[-2], bitrates[-1]
# Run iperf test
def collect_iperf(
    filename,
    is_client=True,
    server_ip="10.45.0.1",
    stations="",
    duration=10,
    timeout=20,
    base_location=None,
):
    """Run iperf3 and store one result record per executed command.

    In client mode two commands are run in sequence against ``server_ip``:
    a plain test (recorded as "uplink") and a reverse test using ``-R``
    (recorded as "downlink"). In server mode a single ``iperf3 -s`` command
    is run. Every command is subject to ``timeout``.

    For each command the current location is sampled first, iperf3 is run,
    its stdout is parsed with ``parse_iperf_output`` and the resulting record
    is passed to ``save_data_to_json``. A failed or timed-out run is still
    saved, with empty output and an ``"error"`` entry describing the failure.

    Args:
        filename: Forwarded to ``save_data_to_json`` as ``filename``.
        is_client: Run the client commands when true, the server otherwise.
        server_ip: Address given to ``iperf3 -c``.
        stations: Stored unchanged under the ``"stations"`` key.
        duration: Value for iperf3's ``-t`` option (client mode only).
        timeout: Seconds after which the iperf3 process is abandoned.
        base_location: Forwarded to ``get_location``.

    Returns:
        Nothing; results are handed to ``save_data_to_json``. Any exception
        raised while handling a command is printed and the loop moves on to
        the next command.
    """
    if is_client:
        commands = [
            ["iperf3", "-c", server_ip, "-t", str(duration)],
            ["iperf3", "-c", server_ip, "-t", str(duration), "-R"],
        ]
    else:
        commands = [["iperf3", "-s"]]

    for command in commands:
        try:
            location, start_distance = get_location(base_location=base_location)

            # Reset per command so a failure message from a previous run can
            # never leak into this record, and so the name is always bound.
            error = None

            # Run iperf with timeout protection
            try:
                result = subprocess.run(
                    command,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,
                    timeout=timeout,
                )
                output = result.stdout

                # Check for errors
                if result.returncode != 0:
                    print(f"Error running iperf3: {result.stderr}")
                    output = ""
                    error = result.stderr
            except subprocess.TimeoutExpired:
                print(f"iPerf test timed out after {timeout} seconds.")
                output = ""
                error = f"iPerf test timed out after {timeout} seconds."

            sender_bitrate, receiver_bitrate = parse_iperf_output(output)

            test_type = "downlink" if "-R" in command else "uplink"

            data = {
                "iperf_full": output,
                "sender_bitrate": sender_bitrate,
                "receiver_bitrate": receiver_bitrate,
                "start_distance": start_distance,
                "location": location,
                "type": test_type,
                "stations": stations,
            }

            if output == "":
                # iperf3 can exit with status 0 and still print nothing; in
                # that case no failure message was captured above.
                if error is None:
                    error = "iperf3 produced no output"
                data["error"] = error
            else:
                print(f"\n{server_ip} {test_type} IPERF complete")
                print(f"IPERF sender bitrate: {sender_bitrate}")
                print(f"IPERF receiver bitrate: {receiver_bitrate}")

            save_data_to_json(data=data, filename=filename)
            time.sleep(0.25)
        except Exception as e:
            # Boundary handler: one failing command must not stop the others.
            print(f"iPerf Error: {e}")