369 lines
13 KiB
Python
369 lines
13 KiB
Python
import serial
|
|
|
|
from helper_functions import comma_split, extract_numbers
|
|
|
|
# For more info on the commands:
|
|
# https://files.waveshare.com/upload/8/8a/Quectel_RG520N%26RG52xF%26RG530F%26RM520N%26RM530N_Series_AT_Commands_Manual_V1.0.0_Preliminary_20220812.pdf
|
|
|
|
|
|
# Configure connection/5G parameters
|
|
def set_configs(port="/dev/ttyUSB2", baudrate=115200):
|
|
try:
|
|
ser = serial.Serial(port, baudrate, timeout=1)
|
|
ser.write(b'AT+QNWPREFCFG="nr5g_band",78\r')
|
|
response = ser.readlines()
|
|
print(response)
|
|
ser.write(b'AT+QNWPREFCFG="mode_pref",NR5G\r')
|
|
response = ser.readlines()
|
|
print(response)
|
|
ser.write(b'AT+QNWPREFCFG="roam_pref",1\r')
|
|
response = ser.readlines()
|
|
print(response)
|
|
ser.close()
|
|
print("Finished setting configs.")
|
|
except Exception as e:
|
|
return print(f"Configuration error: {e}")
|
|
|
|
|
|
# Fetch operator status (5.1)
|
|
def get_modem_cops(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
|
|
try:
|
|
ser = serial.Serial(port, baudrate, timeout=1)
|
|
ser.write(b"AT+COPS?\r")
|
|
response = ser.readlines()
|
|
ser.close()
|
|
|
|
for item in response:
|
|
try:
|
|
decoded_item = item.decode("utf-8")
|
|
|
|
if "COPS:" in decoded_item:
|
|
nums = extract_numbers(decoded_item)
|
|
dictionary["mode"] = nums[0]
|
|
return dictionary
|
|
elif "CME" in decoded_item:
|
|
dictionary["cops"] = decoded_item
|
|
except Exception as e:
|
|
dictionary["cops encoded"] = str(item)
|
|
print(f"Could not decode COPS item:\t{str(item)}\n{e}")
|
|
|
|
return dictionary
|
|
|
|
except Exception as e:
|
|
return {"COPS error": f"{e}"}
|
|
|
|
|
|
# Fetch network registration status (5.2)
|
|
def get_modem_creg(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
|
|
try:
|
|
ser = serial.Serial(port, baudrate, timeout=1)
|
|
ser.write(b"AT+CREG?\r")
|
|
response = ser.readlines()
|
|
ser.close()
|
|
|
|
for item in response:
|
|
try:
|
|
decoded_item = item.decode("utf-8")
|
|
|
|
if "CREG:" in decoded_item:
|
|
nums = extract_numbers(decoded_item)
|
|
dictionary["network registration result code"] = nums[0]
|
|
dictionary["creg status"] = nums[1]
|
|
return dictionary
|
|
elif "CME" in decoded_item:
|
|
dictionary["creg error"] = decoded_item
|
|
except Exception as e:
|
|
decoded_item = str(item)
|
|
dictionary["creg encoded"] = decoded_item
|
|
print(f"Could not decode CREG item:\t{decoded_item}\n{e}")
|
|
|
|
return dictionary
|
|
|
|
except Exception as e:
|
|
return {"CREG error": f"{e}"}
|
|
|
|
|
|
# Fetch modem csq stats (5.9)
|
|
def get_modem_csq(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
|
|
try:
|
|
ser = serial.Serial(port, baudrate, timeout=1)
|
|
ser.write(b"AT+CSQ\r")
|
|
response = ser.readlines()
|
|
ser.close()
|
|
|
|
for item in response:
|
|
try:
|
|
decoded_item = item.decode("utf-8")
|
|
|
|
if "CSQ:" in decoded_item:
|
|
nums = extract_numbers(decoded_item)
|
|
dictionary["RSSI"] = nums[0]
|
|
dictionary["BER"] = nums[1]
|
|
return dictionary
|
|
|
|
elif "CME" in decoded_item:
|
|
dictionary["csq error"] = decoded_item
|
|
except Exception as e:
|
|
decoded_item = str(item)
|
|
dictionary["csq encoded"] = decoded_item
|
|
print(f"Could not decode CSQ item:\t{decoded_item}\n{e}")
|
|
|
|
return dictionary
|
|
|
|
except Exception as e:
|
|
return {"CSQ error": f"{e}"}
|
|
|
|
|
|
# Fetch RSRP (5.10)
|
|
def get_modem_rsrp(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
|
|
try:
|
|
ser = serial.Serial(port, baudrate, timeout=1)
|
|
ser.write(b"AT+QRSRP\r")
|
|
response = ser.readlines()
|
|
ser.close()
|
|
|
|
for item in response:
|
|
try:
|
|
decoded_item = item.decode("utf-8")
|
|
|
|
if "QRSRP:" in decoded_item:
|
|
_, data = decoded_item.split(":")
|
|
decoded_list = comma_split(data)
|
|
|
|
if type(decoded_list) == list:
|
|
dictionary["RSRP PRX"] = decoded_list[0]
|
|
dictionary["RSRP DRX"] = decoded_list[1]
|
|
dictionary["RSRP RX2"] = decoded_list[2]
|
|
dictionary["RSRP RX3"] = decoded_list[3]
|
|
dictionary["RSRP sysmode"] = decoded_list[4]
|
|
else:
|
|
dictionary["RSRP"] = decoded_list
|
|
|
|
return dictionary
|
|
|
|
elif "CME" in decoded_item:
|
|
dictionary["qrsrp error"] = decoded_item
|
|
except Exception as e:
|
|
decoded_item = str(item)
|
|
dictionary["qrsrp encoded"] = decoded_item
|
|
print(f"Could not decode QRSRP item:\t{decoded_item}\n{e}")
|
|
|
|
return dictionary
|
|
|
|
except Exception as e:
|
|
return {"QRSRP error": f"{e}"}
|
|
|
|
|
|
# Fetch RSRQ (5.11)
|
|
def get_modem_rsrq(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
|
|
try:
|
|
ser = serial.Serial(port, baudrate, timeout=1)
|
|
ser.write(b"AT+QRSRQ\r")
|
|
response = ser.readlines()
|
|
ser.close()
|
|
|
|
for item in response:
|
|
try:
|
|
decoded_item = item.decode("utf-8")
|
|
|
|
if "QRSRQ:" in decoded_item:
|
|
_, data = decoded_item.split(":")
|
|
decoded_list = comma_split(data)
|
|
|
|
if type(decoded_list) == list:
|
|
dictionary["RSRQ PRX"] = decoded_list[0]
|
|
dictionary["RSRQ DRX"] = decoded_list[1]
|
|
dictionary["RSRQ RX2"] = decoded_list[2]
|
|
dictionary["RSRQ RX3"] = decoded_list[3]
|
|
dictionary["RSRQ sysmode"] = decoded_list[4]
|
|
else:
|
|
dictionary["RSRQ"] = decoded_list
|
|
return dictionary
|
|
|
|
elif "CME" in decoded_item:
|
|
dictionary["qrsrq error"] = decoded_item
|
|
|
|
except Exception as e:
|
|
decoded_item = str(item)
|
|
dictionary["qrsrq encoded"] = decoded_item
|
|
print(f"Could not decode QRSRQ item:\t{decoded_item}\n{e}")
|
|
|
|
return dictionary
|
|
|
|
except Exception as e:
|
|
return {"QRSRQ error": f"{e}"}
|
|
|
|
|
|
# Fetch SINR of the current service network (5.12)
|
|
def get_modem_sinr(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
|
|
try:
|
|
ser = serial.Serial(port, baudrate, timeout=1)
|
|
ser.write(b"AT+QSINR?\r")
|
|
response = ser.readlines()
|
|
ser.close()
|
|
|
|
for item in response:
|
|
try:
|
|
decoded_item = item.decode("utf-8")
|
|
|
|
if "QSINR:" in decoded_item:
|
|
_, data = decoded_item.split(":")
|
|
decoded_list = comma_split(data)
|
|
|
|
if type(decoded_list) == list:
|
|
dictionary["SINR PRX"] = decoded_list[0]
|
|
dictionary["SINR DRX"] = decoded_list[1]
|
|
dictionary["SINR RX2"] = decoded_list[2]
|
|
dictionary["SINR RX3"] = decoded_list[3]
|
|
dictionary["SINR sysmode"] = decoded_list[4]
|
|
else:
|
|
decoded_item = decoded_list
|
|
return dictionary
|
|
|
|
elif "CME" in decoded_item:
|
|
dictionary["qsinr error"] = decoded_item
|
|
except Exception as e:
|
|
decoded_item = str(item)
|
|
dictionary["qsinr encoded"] = decoded_item
|
|
print(f"Could not decode QSINR item:\t{decoded_item}\n{e}")
|
|
|
|
return dictionary
|
|
|
|
except Exception as e:
|
|
return {"QSINR error": f"{e}"}
|
|
|
|
|
|
# Fetch the list of preferred operators (5.13)
|
|
def get_modem_cpol(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
|
|
try:
|
|
ser = serial.Serial(port, baudrate, timeout=1)
|
|
ser.write(b"AT+CPOL?\r")
|
|
response = ser.readlines()
|
|
ser.close()
|
|
|
|
for item in response:
|
|
try:
|
|
decoded_item = item.decode("utf-8")
|
|
|
|
if "CPOL:" in decoded_item:
|
|
_, data = decoded_item.split(":")
|
|
decoded_list = comma_split(data)
|
|
|
|
if type(decoded_list) == list:
|
|
dictionary["index"] = decoded_list[0]
|
|
dictionary["format"] = decoded_list[1]
|
|
dictionary["oper"] = decoded_list[2]
|
|
dictionary["GSM"] = decoded_list[3]
|
|
dictionary["GSM compact"] = decoded_list[4]
|
|
dictionary["UTRAN"] = decoded_list[5]
|
|
dictionary["E-UTRAN"] = decoded_list[6]
|
|
dictionary["NG-RAN"] = decoded_list[7]
|
|
else:
|
|
dictionary["cpol"] = decoded_list
|
|
return dictionary
|
|
|
|
elif "CME" in decoded_item:
|
|
dictionary["cpol error"] = decoded_item
|
|
except Exception as e:
|
|
decoded_item = str(item)
|
|
dictionary["cpol encoded"] = decoded_item
|
|
print(f"Could not decode CPOL item:\t{decoded_item}\n{e}")
|
|
|
|
return dictionary
|
|
|
|
except Exception as e:
|
|
return {"CPOL error": f"{e}"}
|
|
|
|
|
|
# Fetch network parameters (5.21.4)
|
|
def get_modem_qnwcfg(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
|
|
try:
|
|
ser = serial.Serial(port, baudrate, timeout=1)
|
|
ser.write(b'AT+QNWCFG="up/down"\r')
|
|
response = ser.readlines()
|
|
ser.close()
|
|
|
|
for item in response:
|
|
try:
|
|
decoded_item = item.decode("utf-8")
|
|
|
|
if "QNWCFG:" in decoded_item:
|
|
_, data = decoded_item.split(":")
|
|
decoded_list = comma_split(data)
|
|
|
|
if type(decoded_list) == list:
|
|
dictionary["up/down"] = decoded_list[0]
|
|
dictionary["uplink (bytes/s)"] = decoded_list[1]
|
|
dictionary["downlink (bytes/s)"] = decoded_list[2]
|
|
dictionary["time interval"] = decoded_list[3]
|
|
else:
|
|
dictionary["qnwcfg"] = decoded_list
|
|
|
|
return dictionary
|
|
|
|
elif "CME" in decoded_item:
|
|
dictionary["qnwcfg"] = decoded_item
|
|
except Exception as e:
|
|
decoded_item = str(item)
|
|
dictionary["qnwcfg encoded"] = decoded_item
|
|
print(f"Could not decode QNWCFG item:\t{decoded_item}\n{e}")
|
|
|
|
return dictionary
|
|
|
|
except Exception as e:
|
|
return {"QNWCFG error": f"{e}"}
|
|
|
|
|
|
# Fetch network information (5.18)
|
|
def get_modem_qnwinfo(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
|
|
try:
|
|
ser = serial.Serial(port, baudrate, timeout=1)
|
|
ser.write(b"AT+QNWINFO\r")
|
|
response = ser.readlines()
|
|
ser.close()
|
|
|
|
for item in response:
|
|
try:
|
|
decoded_item = item.decode("utf-8")
|
|
|
|
if "QNWINFO:" in decoded_item:
|
|
_, data = decoded_item.split(":")
|
|
dictionary["network information"] = data
|
|
return dictionary
|
|
elif "CME" in decoded_item:
|
|
dictionary["network information"] = decoded_item
|
|
except Exception as e:
|
|
dictionary["qnwinfo encoded"] = str(item)
|
|
print(f"Could not decode QNWINFO item:\t{str(item)}\n{e}")
|
|
return dictionary
|
|
|
|
except Exception as e:
|
|
return {"QNWINFO error": f"{e}"}
|
|
|
|
|
|
# Fetch servie provider name (5.19)
|
|
def get_modem_qspn(port="/dev/ttyUSB2", baudrate=115200, dictionary={}):
|
|
try:
|
|
ser = serial.Serial(port, baudrate, timeout=1)
|
|
ser.write(b"AT+QSPN\r")
|
|
response = ser.readlines()
|
|
ser.close()
|
|
|
|
for item in response:
|
|
try:
|
|
decoded_item = item.decode("utf-8")
|
|
|
|
if "QSPN:" in decoded_item:
|
|
_, data = decoded_item.split(":")
|
|
dictionary["service provider"] = data
|
|
return dictionary
|
|
elif "CME" in decoded_item:
|
|
dictionary["service provider"] = decoded_item
|
|
except Exception as e:
|
|
dictionary["qspn encoded"] = str(item)
|
|
print(f"Could not decode QSPN item:\t{str(item)}\n{e}")
|
|
return dictionary
|
|
|
|
except Exception as e:
|
|
return {"QSPN error": f"{e}"}
|