# range-testing/communication.py
import os
import re
import subprocess
import threading
import time
from functions.at_commands import (
M
2025-09-10 10:55:05 -04:00
get_modem_cops,
get_modem_cpol,
get_modem_creg,
get_modem_csq,
get_modem_qnwcfg,
get_modem_qnwinfo,
get_modem_qspn,
get_modem_rsrp,
get_modem_rsrq,
get_modem_sinr,
set_configs,
M
2025-09-10 10:55:05 -04:00
)
from functions.helper_functions import save_data_to_json
from functions.location import get_current_location, set_base_location
from functions.sierra_commands import get_modem_nr_info, get_modem_status
from iperf.iperf_functions import collect_iperf
class Communication:
    """Interactive range-test session that logs modem and ping data to JSON.

    The constructor prompts the operator for the session settings; the
    ``collect_*`` methods then loop while ``self.running`` is true, appending
    one sample per iteration to ``self.filename``.
    """

    def __init__(self):
        """Prompt for the session settings and create the log folder."""
        # Loop flag read by collect_data / collect_sierra_data.
        self.running = False
        # Filled in by set_location(); forwarded to collect_iperf().
        self.base_location = None

        set_modem = (
            input(
                "Do you want limit the modem to NR5G without roaming? (Not recommended if PLMN is not 001) (y/n )"
            )
            .strip()
            .lower()
        )
        while set_modem not in ("y", "yes", "n", "no"):
            set_modem = input("Your response must be y or n ").strip().lower()
        if set_modem in ("y", "yes"):
            print("Setting configs...")
            set_configs()

        self.ip_address = (
            input("Enter the ip address (Default is 10.45.0.1): ").strip().lower()
            or "10.45.0.1"
        )
        self.folder_name = (
            input("Enter the log folder name (Default is oct_range_test): ")
            .strip()
            .lower()
            or "oct_range_test"
        )
        # Numeric answers are re-prompted on bad input (same approach as the
        # y/n question above) instead of crashing the session with ValueError.
        self.interval = self._prompt_int(
            "Enter the time interval (s) in between pings/modem data collection (Default is 5): ",
            5,
        )
        self.iperf_duration = self._prompt_int(
            "Enter the iperf test duration (s) (Default is 5): ", 5
        )

        foldername = f"data/{self.folder_name}"
        self.filename = foldername + "/test_" + str(int(time.time())) + ".json"
        os.makedirs(foldername, exist_ok=True)

    @staticmethod
    def _parse_non_negative_int(raw, default):
        """Parse one numeric answer.

        Returns ``default`` when ``raw`` is blank, the parsed value when it is
        a non-negative integer, and ``None`` when it is anything else.
        """
        text = raw.strip()
        if not text:
            return default
        try:
            value = int(text)
        except ValueError:
            return None
        return value if value >= 0 else None

    @classmethod
    def _prompt_int(cls, prompt, default):
        """Ask ``prompt`` until the answer is blank or a non-negative integer."""
        value = cls._parse_non_negative_int(input(prompt), default)
        while value is None:
            value = cls._parse_non_negative_int(
                input("Your response must be a non-negative whole number "), default
            )
        return value

    def set_location(self):
        """Record the base-station location and write it to the log file."""
        self.base_location = set_base_location()
        save_data_to_json(data=self.base_location, filename=self.filename)

    def run_iperf(self):
        """Run one iperf test as a client against the configured address."""
        collect_iperf(
            filename=self.filename,
            is_client=True,
            server_ip=self.ip_address,
            duration=self.iperf_duration,
            # Allow the test a few seconds beyond its nominal duration.
            timeout=(self.iperf_duration + 5),
            base_location=self.base_location,
        )

    # Collect and log data continuously from Quectel
    def collect_data(self):
        """Sample location, modem AT readings and ping until ``running`` is cleared."""
        while self.running:
            data = {}
            data = get_current_location(dictionary=data)
            data = get_modem_cops(dictionary=data)
            data = get_modem_creg(dictionary=data)
            data = get_modem_csq(dictionary=data)
            data = get_modem_rsrp(dictionary=data)
            data = get_modem_rsrq(dictionary=data)
            data = get_modem_sinr(dictionary=data)
            data = get_modem_cpol(dictionary=data)
            data = get_modem_qnwcfg(dictionary=data)
            data = get_modem_qnwinfo(dictionary=data)
            data = get_modem_qspn(dictionary=data)
            # Pass name explicitly so the result keys match the lookup below
            # even when the operator entered a non-default address.
            data = ping_basestation(
                dictionary=data, address=self.ip_address, name=self.ip_address
            )
            data["timestamp"] = time.time()

            # Show a short summary on the console, then append to the log
            print(f"\nDistance: {data.get('distance')}")
            print(f"Service: {data.get('network information')}")
            print(f"Ping Time: {data.get(f'{self.ip_address}_ping_time')}")
            print(
                f"RSRP: {data.get('RSRP PRX')} {data.get('RSRP DRX')} "
                f"{data.get('RSRP RX2')} {data.get('RSRP RX3')}"
            )
            save_data_to_json(data=data, filename=self.filename)
            time.sleep(self.interval)

    # Collect and log data continuously from Sierra
    def collect_sierra_data(self):
        """Sample location, Sierra modem status and ping until ``running`` is cleared."""
        while self.running:
            data = {}
            data = get_current_location(dictionary=data)
            data = get_modem_nr_info(dictionary=data)
            data = get_modem_status(dictionary=data)
            data = ping_basestation(
                dictionary=data, address=self.ip_address, name=self.ip_address
            )
            data["timestamp"] = time.time()

            # Show a short summary on the console, then append to the log
            print(f"\nDistance: {data.get('distance')}")
            # ping_basestation stores its result under "<name>_ping_time".
            print(f"Ping Time: {data.get(f'{self.ip_address}_ping_time')}")
            print(f"RSRP: {data.get('RSRP')}")
            save_data_to_json(data=data, filename=self.filename)
            time.sleep(self.interval)
# Ping base station
def ping_basestation(address="10.45.0.1", name="10.45.0.1", dictionary=None) -> dict:
    """Send a single ping to ``address`` and record the outcome in ``dictionary``.

    Args:
        address: Host passed to the ``ping`` command.
        name: Prefix for the keys written to ``dictionary``.
        dictionary: Mapping to update in place. A fresh dict is created when
            omitted, so results never leak between calls.

    Returns:
        The updated mapping. On a completed ping it gains
        ``"<name>_ping_time"`` (the ``time=`` value as a string, or
        ``"Not found"`` when the output has none) and ``"<name>_ping_stats"``
        (the command's stdout). If the command cannot be run it gains
        ``"<name>_ping error"`` with the exception text instead.
    """
    if dictionary is None:
        dictionary = {}

    try:
        # "-c 1" asks for exactly one echo request (Unix-style ping flag).
        result = subprocess.run(
            ["ping", "-c", "1", address], capture_output=True, text=True
        )
    except Exception as e:
        # Best effort: a failed ping must not stop the data-collection loop.
        dictionary[f"{name}_ping error"] = f"{e}"
        return dictionary

    match = re.search(r"time=([\d.]+)", result.stdout)
    dictionary[f"{name}_ping_time"] = match.group(1) if match else "Not found"
    dictionary[f"{name}_ping_stats"] = result.stdout
    return dictionary
if __name__ == "__main__":
    # Interactive console: single-letter commands drive the session.
    modem = Communication()
    print(
        "Type 'l' to set basestation location, 'b' to begin, "
        "'s' to stop, 'i' to run an iperf test, or 'x' to exit:"
    )
    try:
        while True:
            command = input("> ").strip().lower()
            if command == "b" and not modem.running:
                print("Starting data collection...")
                modem.running = True
                threading.Thread(target=modem.collect_data).start()
            elif command == "l":
                modem.set_location()
            elif command == "i":
                # Run iperf on its own thread so the prompt stays responsive.
                threading.Thread(target=modem.run_iperf).start()
            elif command == "s" and modem.running:
                print("Stopping data collection...")
                modem.running = False
            elif command == "x":
                modem.running = False
                break
            elif command not in ["l", "b", "s", "i", "x"]:
                print("Invalid command. Type 'l', 'b', 's', 'i', or 'x'.")
    except (EOFError, KeyboardInterrupt):
        # Ctrl-C / Ctrl-D at the prompt is treated like the 'x' command.
        print("\nExiting...")
    finally:
        # The collector thread is non-daemon: unless the flag is cleared it
        # would keep the process alive and logging after the prompt is gone.
        modem.running = False