1067 lines
35 KiB
Python
1067 lines
35 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
IQ & Metadata Streaming Interface (IQ-Prioritized with Accumulation)
|
|
|
|
Handles multiple IQ packets per slot by accumulating them before correlation.
|
|
"""
|
|
|
|
import logging
|
|
import queue
|
|
import socket
|
|
import struct
|
|
import threading
|
|
import time
|
|
from collections import OrderedDict
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple
|
|
|
|
import numpy as np
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger("IQInterface")
|
|
|
|
|
|
# =============================================================================
|
|
# Data Classes
|
|
# =============================================================================
|
|
|
|
|
|
@dataclass
class Metadata:
    """RB allocation metadata from scheduler.

    Attributes:
        slot_num: 16-bit slot counter this allocation applies to.
        rb_bitmap: packed RB allocation bitmap, MSB-first within each byte.
        dl_flag: True for downlink metadata, False for uplink.
        timestamp: wall-clock reception time.
    """

    slot_num: int
    rb_bitmap: bytes
    dl_flag: bool = True
    timestamp: float = field(default_factory=time.time)

    @property
    def rb_bitmap_binary(self) -> str:
        """Bitmap rendered as a '0'/'1' string, MSB-first."""
        return "".join(format(b, "08b") for b in self.rb_bitmap)

    @property
    def rb_bitmap_array(self) -> np.ndarray:
        """Bitmap unpacked to a uint8 array of individual bits (MSB-first)."""
        # np.unpackbits performs the MSB-first byte expansion in C instead
        # of a Python double loop; result dtype (uint8) matches the old code.
        return np.unpackbits(np.frombuffer(self.rb_bitmap, dtype=np.uint8))

    @property
    def allocated_rbs(self) -> List[int]:
        """Indices of RBs whose bitmap bit is set."""
        return np.flatnonzero(self.rb_bitmap_array).tolist()

    @property
    def num_allocated_rbs(self) -> int:
        """Total number of set bits across the whole bitmap."""
        return sum(bin(b).count("1") for b in self.rb_bitmap)

    def to_dict(self) -> Dict[str, Any]:
        """Serializable summary of this metadata entry."""
        return {
            "slot_num": self.slot_num,
            "rb_bitmap_hex": self.rb_bitmap.hex(),
            "rb_bitmap_binary": self.rb_bitmap_binary,
            "num_allocated_rbs": self.num_allocated_rbs,
            "timestamp": self.timestamp,
        }
|
|
|
|
|
|
@dataclass
class IQSamples:
    """IQ samples from radio"""

    slot_num: int
    samples: np.ndarray
    timestamp: float = field(default_factory=time.time)

    @property
    def num_samples(self) -> int:
        """Number of complex samples held."""
        return len(self.samples)

    @property
    def i_samples(self) -> np.ndarray:
        """In-phase (real) component of every sample."""
        return np.real(self.samples)

    @property
    def q_samples(self) -> np.ndarray:
        """Quadrature (imaginary) component of every sample."""
        return np.imag(self.samples)

    @property
    def magnitude(self) -> np.ndarray:
        """Per-sample magnitude."""
        return np.abs(self.samples)

    @property
    def phase(self) -> np.ndarray:
        """Per-sample phase in radians."""
        return np.angle(self.samples)

    @property
    def power_db(self) -> float:
        """Mean sample power in dB (epsilon guards log of zero)."""
        mean_power = np.mean(np.abs(self.samples) ** 2)
        return 10 * np.log10(mean_power + 1e-10)

    def to_dict(self) -> Dict[str, Any]:
        """Serializable summary of this IQ block."""
        summary = {"slot_num": self.slot_num}
        summary["num_samples"] = self.num_samples
        summary["power_db"] = self.power_db
        summary["timestamp"] = self.timestamp
        return summary
|
|
|
|
|
|
@dataclass
class CorrelatedData:
    """Correlated IQ samples and metadata for the same slot"""

    slot_num: int
    metadata: Metadata
    iq: IQSamples
    correlation_timestamp: float = field(default_factory=time.time)

    @property
    def samples(self) -> np.ndarray:
        """Convenience accessor for the underlying IQ sample array."""
        return self.iq.samples

    @property
    def rb_bitmap(self) -> bytes:
        """Convenience accessor for the packed RB allocation bitmap."""
        return self.metadata.rb_bitmap

    def to_dict(self) -> Dict[str, Any]:
        """Nested serializable view of the correlated pair."""
        summary = {"slot_num": self.slot_num}
        summary["metadata"] = self.metadata.to_dict()
        summary["iq"] = self.iq.to_dict()
        summary["correlation_timestamp"] = self.correlation_timestamp
        return summary
|
|
|
|
|
|
# =============================================================================
|
|
# Statistics
|
|
# =============================================================================
|
|
|
|
|
|
@dataclass
class InterfaceStats:
    """Interface statistics.

    Plain counters updated by the receive/correlation paths, plus derived
    rate/ratio properties.  ``reset`` and ``snapshot`` are driven by the
    dataclass field list, so adding a new counter no longer requires
    editing three separate hand-maintained lists.
    """

    start_time: float = field(default_factory=time.time)

    # Reception counts
    metadata_received: int = 0
    metadata_bytes: int = 0
    iq_packets_received: int = 0
    iq_samples_received: int = 0
    iq_bytes: int = 0

    # Correlation stats
    slots_correlated: int = 0
    iq_packets_correlated: int = 0
    slots_without_metadata: int = 0
    metadata_expired: int = 0

    # Buffer stats
    metadata_buffer_hits: int = 0
    metadata_buffer_misses: int = 0

    # Callback stats
    callbacks_invoked: int = 0
    callback_errors: int = 0

    def reset(self) -> None:
        """Reset all statistics to zero and restart the clock."""
        # Copy every field from a freshly-constructed instance instead of
        # listing each counter by hand (previously 15 assignments).
        fresh = InterfaceStats()
        for name in self.__dataclass_fields__:
            setattr(self, name, getattr(fresh, name))

    def snapshot(self) -> "InterfaceStats":
        """Create an independent copy of the current stats."""
        return InterfaceStats(
            **{name: getattr(self, name) for name in self.__dataclass_fields__}
        )

    @property
    def runtime(self) -> float:
        """Seconds elapsed since construction/reset."""
        return time.time() - self.start_time

    @property
    def metadata_rate(self) -> float:
        """Metadata packets per second since start."""
        return self.metadata_received / max(self.runtime, 0.001)

    @property
    def iq_packet_rate(self) -> float:
        """IQ packets per second since start."""
        return self.iq_packets_received / max(self.runtime, 0.001)

    @property
    def slot_rate(self) -> float:
        """Correlated slots per second since start."""
        return self.slots_correlated / max(self.runtime, 0.001)

    @property
    def iq_bandwidth_mbps(self) -> float:
        """IQ throughput in MB/s since start."""
        return (self.iq_bytes / max(self.runtime, 0.001)) / 1e6

    @property
    def correlation_rate(self) -> float:
        """Percentage of completed slots that found metadata"""
        total_slots = self.slots_correlated + self.slots_without_metadata
        if total_slots == 0:
            return 0.0
        return self.slots_correlated / total_slots * 100

    @property
    def avg_packets_per_slot(self) -> float:
        """Average IQ packets per correlated slot"""
        if self.slots_correlated == 0:
            return 0.0
        return self.iq_packets_correlated / self.slots_correlated

    def to_dict(self) -> Dict[str, Any]:
        """Serializable summary of counters and derived rates."""
        return {
            "runtime_s": self.runtime,
            "metadata_received": self.metadata_received,
            "metadata_rate": self.metadata_rate,
            "iq_packets_received": self.iq_packets_received,
            "iq_packet_rate": self.iq_packet_rate,
            "iq_bandwidth_mbps": self.iq_bandwidth_mbps,
            "slots_correlated": self.slots_correlated,
            "slot_rate": self.slot_rate,
            "correlation_rate_pct": self.correlation_rate,
            "avg_packets_per_slot": self.avg_packets_per_slot,
            "slots_without_metadata": self.slots_without_metadata,
            "metadata_expired": self.metadata_expired,
        }

    def __str__(self) -> str:
        return (
            f"Runtime: {self.runtime:.1f}s | "
            f"Meta: {self.metadata_received} ({self.metadata_rate:.1f}/s) | "
            f"IQ pkts: {self.iq_packets_received} ({self.iq_packet_rate:.1f}/s) | "
            f"Slots: {self.slots_correlated} ({self.correlation_rate:.1f}%) | "
            f"Avg pkts/slot: {self.avg_packets_per_slot:.1f}"
        )
|
|
|
|
|
|
# =============================================================================
|
|
# Metadata Buffer
|
|
# =============================================================================
|
|
|
|
|
|
class MetadataBuffer:
    """Buffer for metadata, indexed by slot number.

    Slot numbers are 16-bit counters with wrap-around; comparisons use
    serial-number arithmetic (RFC 1982 style): a slot is "newer" when the
    wrapped distance is below half the number space (0x8000).
    """

    def __init__(self, max_slots: int = 1000, slot_window: int = 500):
        # Key is (slot_num, dl_flag) so that flexible slots can store both
        # a DL and a UL metadata entry under the same slot number.
        self._buffer: OrderedDict[Tuple[int, bool], Metadata] = OrderedDict()
        self._max_slots = max_slots
        self._slot_window = slot_window
        self._lock = threading.Lock()
        self._latest_slot = 0

    def store(self, metadata: Metadata) -> None:
        """Store metadata in buffer, evicting oldest entries beyond cap."""
        with self._lock:
            key = (metadata.slot_num, metadata.dl_flag)
            self._buffer[key] = metadata

            if self._is_newer_slot(metadata.slot_num, self._latest_slot):
                self._latest_slot = metadata.slot_num

            # Evict in insertion order once the buffer exceeds its cap.
            while len(self._buffer) > self._max_slots:
                self._buffer.popitem(last=False)

    def lookup(self, slot_num: int) -> Optional[Metadata]:
        """Look up first metadata entry for a slot (does NOT remove it)"""
        with self._lock:
            for (sn, _), meta in self._buffer.items():
                if sn == slot_num:
                    return meta
            return None

    def remove(self, slot_num: int) -> Optional[Metadata]:
        """Remove first metadata entry for a slot (backward compat)"""
        with self._lock:
            for key in list(self._buffer.keys()):
                if key[0] == slot_num:
                    return self._buffer.pop(key)
            return None

    def remove_all(self, slot_num: int) -> List[Metadata]:
        """Remove ALL metadata entries for a slot (handles flexible slots)"""
        with self._lock:
            keys = [k for k in self._buffer if k[0] == slot_num]
            return [self._buffer.pop(k) for k in keys]

    def cleanup_old(
        self, current_slot: int, expired_callback: Optional[Callable] = None
    ) -> int:
        """Remove metadata older than slot_window.

        Invokes ``expired_callback`` once per expired entry and returns
        the number of entries removed.
        """
        with self._lock:
            threshold = (current_slot - self._slot_window) & 0xFFFF

            expired_keys = [
                k for k in self._buffer
                if self._is_older_slot(k[0], threshold, current_slot)
            ]

            for key in expired_keys:
                meta = self._buffer.pop(key)
                if expired_callback:
                    expired_callback(meta)

            return len(expired_keys)

    def _is_newer_slot(self, slot_a: int, slot_b: int) -> bool:
        """True when slot_a is at or ahead of slot_b modulo 2**16."""
        diff = (slot_a - slot_b) & 0xFFFF
        return diff < 0x8000

    def _is_older_slot(self, slot: int, threshold: int, current: int) -> bool:
        """True when ``slot`` is further behind ``current`` than the window.

        BUGFIX: slots *ahead* of ``current`` wrap to a large distance
        (e.g. current+1 -> 0xFFFF) and were previously reported expired,
        purging metadata that arrives before its IQ data — the normal
        case for this buffer.  Restrict "old" to the past half of the
        wrap space (distance < 0x8000).
        """
        dist_to_current = (current - slot) & 0xFFFF
        dist_to_threshold = (current - threshold) & 0xFFFF
        return dist_to_threshold < dist_to_current < 0x8000

    @property
    def size(self) -> int:
        """Number of buffered entries (thread-safe)."""
        with self._lock:
            return len(self._buffer)

    @property
    def latest_slot(self) -> int:
        """Newest slot number seen by ``store`` (unsynchronized read)."""
        return self._latest_slot
|
|
|
|
|
|
# =============================================================================
|
|
# IQ Accumulator (Collects multiple IQ packets per slot)
|
|
# =============================================================================
|
|
|
|
|
|
@dataclass
class SlotAccumulator:
    """Accumulates IQ packets for a single slot"""

    slot_num: int
    packets: List[np.ndarray] = field(default_factory=list)
    first_packet_time: float = field(default_factory=time.time)
    packet_count: int = 0

    def add_packet(self, samples: np.ndarray) -> None:
        """Add an IQ packet to this slot"""
        self.packet_count += 1
        self.packets.append(samples)

    def get_combined_samples(self) -> np.ndarray:
        """Combine all packets into single array"""
        if self.packets:
            return np.concatenate(self.packets)
        # Empty slot: return a zero-length complex64 array.
        return np.array([], dtype=np.complex64)

    @property
    def total_samples(self) -> int:
        """Total sample count across all accumulated packets."""
        total = 0
        for packet in self.packets:
            total += len(packet)
        return total

    @property
    def age(self) -> float:
        """Seconds elapsed since the first packet arrived."""
        return time.time() - self.first_packet_time
|
|
|
|
|
|
class IQAccumulatorBuffer:
    """
    Accumulates IQ packets per slot.

    Emits complete slot when:
    - New slot arrives (previous slot is complete)
    - Timeout reached

    Slot numbers are compared as 16-bit wrap-around counters; see
    _is_newer_slot / _is_older_slot.  All buffer mutation happens under
    self._lock.
    """

    def __init__(
        self,
        max_slots: int = 100,
        slot_timeout: float = 0.1,  # seconds
    ):
        # Accumulators keyed by slot number; insertion order is preserved so
        # the size cap in add_packet can evict the oldest entry first.
        self._buffer: OrderedDict[int, SlotAccumulator] = OrderedDict()
        self._max_slots = max_slots
        self._slot_timeout = slot_timeout
        self._lock = threading.Lock()
        # -1 means "no packet seen yet" (real slot numbers are >= 0).
        self._latest_slot = -1

    def add_packet(self, slot_num: int, samples: np.ndarray) -> List[SlotAccumulator]:
        """
        Add an IQ packet. Returns list of completed slots to emit.

        A slot is considered complete when:
        - A newer slot arrives (the older slot won't get more packets)
        - The slot times out
        """
        completed = []

        with self._lock:
            # Check for completed slots (older than current).  Any slot
            # number change triggers the scan, including a late packet for
            # an older slot.
            if self._latest_slot >= 0 and slot_num != self._latest_slot:
                completed.extend(self._flush_older_slots(slot_num))

            # Add packet to accumulator (creating one on first packet)
            if slot_num not in self._buffer:
                self._buffer[slot_num] = SlotAccumulator(slot_num=slot_num)

            self._buffer[slot_num].add_packet(samples)

            # Update latest slot
            if self._is_newer_slot(slot_num, self._latest_slot):
                self._latest_slot = slot_num

            # Check for timed-out slots
            completed.extend(self._flush_timed_out())

            # Limit buffer size: evict oldest-inserted slots, emitting them
            # as "completed" rather than silently dropping their packets.
            while len(self._buffer) > self._max_slots:
                oldest_slot = next(iter(self._buffer))
                completed.append(self._buffer.pop(oldest_slot))

        return completed

    def _flush_older_slots(self, current_slot: int) -> List[SlotAccumulator]:
        """Flush slots older than current (caller must hold self._lock)."""
        completed = []
        slots_to_remove = []

        # Collect first, then pop: avoids mutating the dict mid-iteration.
        for slot_num, accumulator in self._buffer.items():
            if self._is_older_slot(slot_num, current_slot):
                slots_to_remove.append(slot_num)

        for slot_num in slots_to_remove:
            completed.append(self._buffer.pop(slot_num))

        return completed

    def _flush_timed_out(self) -> List[SlotAccumulator]:
        """Flush slots that have timed out (caller must hold self._lock)."""
        completed = []
        slots_to_remove = []

        for slot_num, accumulator in self._buffer.items():
            if accumulator.age > self._slot_timeout:
                slots_to_remove.append(slot_num)

        for slot_num in slots_to_remove:
            completed.append(self._buffer.pop(slot_num))

        return completed

    def flush_all(self) -> List[SlotAccumulator]:
        """Flush all remaining slots"""
        with self._lock:
            completed = list(self._buffer.values())
            self._buffer.clear()
            return completed

    def _is_newer_slot(self, slot_a: int, slot_b: int) -> bool:
        # Serial-number comparison on a 16-bit counter: slot_a is "newer"
        # (or equal) when the wrapped forward distance is < half the space.
        if slot_b < 0:
            # No slot seen yet — anything counts as newer.
            return True
        diff = (slot_a - slot_b) & 0xFFFF
        return diff < 0x8000

    def _is_older_slot(self, slot_a: int, slot_b: int) -> bool:
        # Strictly older: wrapped distance from slot_a forward to slot_b is
        # positive and within half the 16-bit space.
        diff = (slot_b - slot_a) & 0xFFFF
        return diff < 0x8000 and diff > 0

    @property
    def size(self) -> int:
        # Number of slots currently pending (thread-safe read).
        with self._lock:
            return len(self._buffer)
|
|
|
|
|
|
# =============================================================================
|
|
# Main Interface Class
|
|
# =============================================================================
|
|
|
|
|
|
class IQMetadataInterface:
    """
    IQ-Prioritized interface with packet accumulation.

    Handles multiple IQ packets per slot by accumulating them.
    Correlates with buffered metadata when slot is complete.

    Two daemon threads receive UDP datagrams (one for metadata, one for IQ);
    correlated results are delivered via registered callbacks and an
    internal bounded queue (``get`` / iteration).
    """

    def __init__(
        self,
        iq_port: int = 5588,
        metadata_port: int = 5589,
        bind_address: str = "127.0.0.1",
        metadata_buffer_size: int = 1000,
        metadata_slot_window: int = 500,
        iq_accumulator_size: int = 100,
        iq_slot_timeout: float = 0.05,  # 50ms timeout per slot
        output_queue_size: int = 1000,
        iq_socket_buffer: int = 8 * 1024 * 1024,
        metadata_socket_buffer: int = 1024 * 1024,
        cleanup_interval: int = 100,
    ):
        self.iq_port = iq_port
        self.metadata_port = metadata_port
        self.bind_address = bind_address
        # Metadata cleanup runs every N IQ packets (see _receive_iq).
        self._cleanup_interval = cleanup_interval

        # Metadata buffer (stores metadata until IQ slot completes)
        self._metadata_buffer = MetadataBuffer(
            max_slots=metadata_buffer_size, slot_window=metadata_slot_window
        )

        # IQ accumulator (collects multiple packets per slot)
        self._iq_accumulator = IQAccumulatorBuffer(
            max_slots=iq_accumulator_size, slot_timeout=iq_slot_timeout
        )

        # Output queue (bounded; oldest entry is dropped when full)
        self._output_queue: queue.Queue[CorrelatedData] = queue.Queue(
            maxsize=output_queue_size
        )

        # Callbacks
        self._correlated_callbacks: List[Callable[[CorrelatedData], None]] = []
        self._iq_callbacks: List[Callable[[IQSamples], None]] = []
        self._metadata_callbacks: List[Callable[[Metadata], None]] = []

        # Thread control
        self._running = False
        self._threads: List[threading.Thread] = []
        self._stop_event = threading.Event()

        # Statistics
        self.stats = InterfaceStats()

        # Socket configuration
        self._iq_socket_buffer = iq_socket_buffer
        self._metadata_socket_buffer = metadata_socket_buffer

        # Tracking
        self._latest_iq_slot = 0
        # For tracking per-capture stats
        self._capture_stats_start: Optional[InterfaceStats] = None
        # Wall-clock time at which the current capture period began
        # (needed because snapshot() preserves start_time).
        self._capture_start_time: Optional[float] = None

        logger.info(
            f"IQMetadataInterface initialized (IQ:{iq_port}, Meta:{metadata_port})"
        )
        logger.info(f" Metadata buffer: {metadata_buffer_size} slots")
        logger.info(f" IQ accumulator: timeout={iq_slot_timeout*1000:.0f}ms")

    # -------------------------------------------------------------------------
    # Callback Registration
    # -------------------------------------------------------------------------

    def on_correlated_data(
        self, callback: Callable[[CorrelatedData], None]
    ) -> Callable:
        """Register a callback for correlated slots; usable as a decorator."""
        self._correlated_callbacks.append(callback)
        return callback

    def on_iq_data(self, callback: Callable[[IQSamples], None]) -> Callable:
        """Register a per-packet raw IQ callback; usable as a decorator."""
        self._iq_callbacks.append(callback)
        return callback

    def on_metadata(self, callback: Callable[[Metadata], None]) -> Callable:
        """Register a per-packet metadata callback; usable as a decorator."""
        self._metadata_callbacks.append(callback)
        return callback

    def remove_callback(self, callback: Callable) -> bool:
        """Remove a previously registered callback; True if it was found."""
        for cb_list in [
            self._correlated_callbacks,
            self._iq_callbacks,
            self._metadata_callbacks,
        ]:
            if callback in cb_list:
                cb_list.remove(callback)
                return True
        return False

    # -------------------------------------------------------------------------
    # Queue-based Access
    # -------------------------------------------------------------------------

    def get(
        self, block: bool = True, timeout: Optional[float] = None
    ) -> Optional[CorrelatedData]:
        """Pop one correlated slot from the queue; None if empty/timed out."""
        try:
            return self._output_queue.get(block=block, timeout=timeout)
        except queue.Empty:
            return None

    def get_nowait(self) -> Optional[CorrelatedData]:
        """Non-blocking ``get``."""
        return self.get(block=False)

    def get_all(self, max_items: int = 100) -> List[CorrelatedData]:
        """Drain up to ``max_items`` queued slots without blocking."""
        items = []
        while len(items) < max_items:
            item = self.get_nowait()
            if item is None:
                break
            items.append(item)
        return items

    @property
    def queue_size(self) -> int:
        """Approximate number of correlated slots waiting in the queue."""
        return self._output_queue.qsize()

    # -------------------------------------------------------------------------
    # Iterator Interface
    # -------------------------------------------------------------------------

    def __iter__(self) -> Iterator[CorrelatedData]:
        return self

    def __next__(self) -> CorrelatedData:
        # Poll with a timeout so iteration ends promptly after stop().
        # (Previously this recursed on each timeout tick; an idle consumer
        # could grow the recursion depth without bound.)
        while self._running:
            data = self.get(block=True, timeout=1.0)
            if data is not None:
                return data
        raise StopIteration

    # -------------------------------------------------------------------------
    # Lifecycle Management
    # -------------------------------------------------------------------------

    def start(self) -> "IQMetadataInterface":
        """Start the receive threads; returns self for chaining."""
        if self._running:
            logger.warning("Interface already running")
            return self

        self._running = True
        self._stop_event.clear()
        self.stats = InterfaceStats()

        self._threads = [
            threading.Thread(
                target=self._receive_metadata, name="MetadataRx", daemon=True
            ),
            threading.Thread(target=self._receive_iq, name="IQRx", daemon=True),
        ]

        for t in self._threads:
            t.start()
            logger.info(f"Started thread: {t.name}")

        logger.info("IQMetadataInterface started (IQ-prioritized with accumulation)")
        return self

    def stop(self) -> None:
        """Stop receive threads, then flush partially-accumulated slots."""
        if not self._running:
            return

        self._running = False
        self._stop_event.set()

        # BUGFIX: join the receive threads *before* flushing, so the IQ
        # thread cannot keep adding packets to (or re-creating) slots we
        # have just flushed.
        for t in self._threads:
            t.join(timeout=2.0)

        # Flush remaining accumulated IQ
        remaining = self._iq_accumulator.flush_all()
        for slot_acc in remaining:
            self._process_completed_slot(slot_acc)

        self._threads.clear()
        logger.info("IQMetadataInterface stopped")

    def __enter__(self) -> "IQMetadataInterface":
        return self.start()

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.stop()

    @property
    def is_running(self) -> bool:
        """True between start() and stop()."""
        return self._running

    # -------------------------------------------------------------------------
    # Internal: Packet Parsing
    # -------------------------------------------------------------------------

    def _parse_metadata_packet(self, data: bytes) -> Optional[Metadata]:
        """Parse a metadata datagram; None if too short.

        Packet layout: [slot_num 2B big-endian][dl_flag 1B][rb_bitmap NB]
        """
        if len(data) < 3:
            return None
        slot_num = struct.unpack(">H", data[0:2])[0]
        dl_flag = bool(data[2])
        rb_bitmap = data[3:]
        return Metadata(slot_num=slot_num, rb_bitmap=rb_bitmap, dl_flag=dl_flag)

    def _parse_iq_packet(self, data: bytes) -> Optional[tuple]:
        """Returns (slot_num, samples) tuple, or None if too short.

        Packet layout: [slot_num 2B big-endian][interleaved float32 I/Q].
        """
        if len(data) < 10:
            return None
        slot_num = struct.unpack(">H", data[0:2])[0]
        payload = data[2:]
        # BUGFIX: a truncated datagram whose payload is not a multiple of
        # 8 bytes (one complex64) made np.frombuffer raise; truncate to
        # whole samples instead of dropping the packet with an error.
        usable = len(payload) - (len(payload) % 8)
        samples = np.frombuffer(payload[:usable], dtype=np.float32).view(np.complex64)
        return (slot_num, samples)

    # -------------------------------------------------------------------------
    # Internal: Slot Processing
    # -------------------------------------------------------------------------

    def _process_completed_slot(self, slot_acc: SlotAccumulator) -> None:
        """Process a completed slot (all IQ packets received)"""
        slot_num = slot_acc.slot_num

        # Remove ALL metadata entries for this slot (1 for normal slots,
        # 2 for flexible slots that carry both DL and UL bitmaps).
        metadatas = self._metadata_buffer.remove_all(slot_num)

        if metadatas:
            self.stats.metadata_buffer_hits += 1
            self.stats.iq_packets_correlated += slot_acc.packet_count

            combined_samples = slot_acc.get_combined_samples()
            iq = IQSamples(slot_num=slot_num, samples=combined_samples)

            # One CorrelatedData per metadata entry, all sharing the same IQ.
            for metadata in metadatas:
                self.stats.slots_correlated += 1
                correlated = CorrelatedData(slot_num=slot_num, metadata=metadata, iq=iq)
                self._emit_correlated(correlated)
        else:
            # No metadata found
            self.stats.metadata_buffer_misses += 1
            self.stats.slots_without_metadata += 1
            logger.debug(
                f"Slot {slot_num}: no metadata found ({slot_acc.packet_count} IQ packets)"
            )

    def _emit_correlated(self, data: CorrelatedData) -> None:
        """Emit correlated data to callbacks and queue"""
        try:
            self._output_queue.put_nowait(data)
        except queue.Full:
            # Queue full: drop the oldest entry to make room for the newest.
            try:
                self._output_queue.get_nowait()
                self._output_queue.put_nowait(data)
            except queue.Empty:
                pass

        for callback in self._correlated_callbacks:
            try:
                callback(data)
                self.stats.callbacks_invoked += 1
            except Exception as e:
                self.stats.callback_errors += 1
                logger.error(f"Callback error: {e}")

    def _on_metadata_expired(self, metadata: Metadata) -> None:
        """Counter hook invoked by MetadataBuffer.cleanup_old."""
        self.stats.metadata_expired += 1

    # -------------------------------------------------------------------------
    # Internal: Receive Threads
    # -------------------------------------------------------------------------

    def _flush_socket(self, sock: socket.socket) -> int:
        """Drain stale datagrams queued before start; returns count drained.

        Leaves the socket blocking with a 1s timeout for the receive loop.
        """
        sock.setblocking(False)
        count = 0
        try:
            while True:
                sock.recv(65535)
                count += 1
        except BlockingIOError:
            pass
        sock.setblocking(True)
        sock.settimeout(1.0)
        return count

    def _receive_metadata(self) -> None:
        """Thread: receive metadata and store in buffer"""
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(
            socket.SOL_SOCKET, socket.SO_RCVBUF, self._metadata_socket_buffer
        )
        sock.bind((self.bind_address, self.metadata_port))

        flushed = self._flush_socket(sock)
        if flushed > 0:
            logger.debug(f"Flushed {flushed} old metadata packets")

        while not self._stop_event.is_set():
            try:
                data, _ = sock.recvfrom(65535)
                if len(data) == 0:
                    continue

                metadata = self._parse_metadata_packet(data)
                if metadata is None:
                    continue

                self.stats.metadata_received += 1
                self.stats.metadata_bytes += len(data)

                # Per-packet callbacks run before buffering.
                for callback in self._metadata_callbacks:
                    try:
                        callback(metadata)
                    except Exception as e:
                        logger.error(f"Metadata callback error: {e}")

                # Store in buffer
                self._metadata_buffer.store(metadata)

            except socket.timeout:
                # 1s timeout lets the loop re-check the stop event.
                continue
            except Exception as e:
                if self._running:
                    logger.error(f"Metadata receive error: {e}")

        sock.close()

    def _receive_iq(self) -> None:
        """Thread: receive IQ packets, accumulate, correlate when slot complete"""
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self._iq_socket_buffer)
        sock.bind((self.bind_address, self.iq_port))

        flushed = self._flush_socket(sock)
        if flushed > 0:
            logger.debug(f"Flushed {flushed} old IQ packets")

        packet_count = 0

        while not self._stop_event.is_set():
            try:
                data, _ = sock.recvfrom(65535)
                if len(data) == 0:
                    continue

                parsed = self._parse_iq_packet(data)
                if parsed is None:
                    continue

                slot_num, samples = parsed

                # Update stats
                self.stats.iq_packets_received += 1
                self.stats.iq_samples_received += len(samples)
                self.stats.iq_bytes += len(data)
                self._latest_iq_slot = slot_num
                packet_count += 1

                # Raw IQ callback (per packet)
                for callback in self._iq_callbacks:
                    try:
                        iq = IQSamples(slot_num=slot_num, samples=samples)
                        callback(iq)
                    except Exception as e:
                        logger.error(f"IQ callback error: {e}")

                # Add to accumulator - returns completed slots
                completed_slots = self._iq_accumulator.add_packet(slot_num, samples)

                # Process any completed slots
                for slot_acc in completed_slots:
                    self._process_completed_slot(slot_acc)

                # Periodic cleanup of stale metadata
                if packet_count % self._cleanup_interval == 0:
                    _ = self._metadata_buffer.cleanup_old(
                        slot_num, expired_callback=self._on_metadata_expired
                    )

            except socket.timeout:
                continue
            except Exception as e:
                if self._running:
                    logger.error(f"IQ receive error: {e}")

        sock.close()

    # -------------------------------------------------------------------------
    # Utility Methods
    # -------------------------------------------------------------------------

    def reset_stats(self) -> None:
        """Reset all statistics to zero"""
        self.stats.reset()
        self._capture_stats_start = None
        self._capture_start_time = None
        logger.info("Statistics reset")

    def get_stats(self) -> InterfaceStats:
        """Get current aggregated statistics"""
        return self.stats

    def start_stats_capture(self) -> None:
        """Mark the start of a stats capture period"""
        self._capture_stats_start = self.stats.snapshot()
        # Record the capture's wall-clock start for duration calculation.
        self._capture_start_time = time.time()

    def get_capture_stats(self) -> Dict[str, Any]:
        """
        Get statistics for current capture period only.

        Returns stats since last start_stats_capture() call.
        """
        if self._capture_stats_start is None:
            # No capture started, return current totals
            return self.stats.to_dict()

        start = self._capture_stats_start
        current = self.stats

        # Counter deltas since the capture snapshot
        metadata_received = current.metadata_received - start.metadata_received
        iq_packets = current.iq_packets_received - start.iq_packets_received
        iq_samples = current.iq_samples_received - start.iq_samples_received
        iq_bytes = current.iq_bytes - start.iq_bytes
        slots_correlated = current.slots_correlated - start.slots_correlated
        iq_packets_correlated = (
            current.iq_packets_correlated - start.iq_packets_correlated
        )
        slots_without_meta = (
            current.slots_without_metadata - start.slots_without_metadata
        )
        metadata_expired = current.metadata_expired - start.metadata_expired

        # Capture duration is wall-clock time since start_stats_capture().
        # BUGFIX: the snapshot copies start_time unchanged, so the old
        # delta formula always collapsed to the full interface runtime.
        if self._capture_start_time is not None:
            duration = time.time() - self._capture_start_time
        else:
            duration = current.runtime
        duration = max(duration, 0.001)

        total_slots = slots_correlated + slots_without_meta
        correlation_rate = (
            (slots_correlated / total_slots * 100) if total_slots > 0 else 0.0
        )
        avg_packets = (
            (iq_packets_correlated / slots_correlated) if slots_correlated > 0 else 0.0
        )

        return {
            "capture_duration": duration,
            "metadata_received": metadata_received,
            "metadata_rate": metadata_received / duration,
            "iq_packets_received": iq_packets,
            "iq_packet_rate": iq_packets / duration,
            "iq_samples_received": iq_samples,
            "iq_bytes": iq_bytes,
            "iq_bandwidth_mbps": (iq_bytes / duration) / 1e6,
            "slots_correlated": slots_correlated,
            "slot_rate": slots_correlated / duration,
            "correlation_rate_pct": correlation_rate,
            "avg_packets_per_slot": avg_packets,
            "slots_without_metadata": slots_without_meta,
            "metadata_expired": metadata_expired,
        }

    def print_stats(self, capture_only: bool = False) -> None:
        """
        Print statistics.

        Args:
            capture_only: If True, print only current capture stats
        """
        if capture_only and self._capture_stats_start is not None:
            stats = self.get_capture_stats()
            print("\n=== Capture Stats ===")
        else:
            stats = self.stats.to_dict()
            print("\n=== Aggregated Stats ===")

        # Capture dicts carry 'capture_duration'; aggregate dicts 'runtime_s'.
        print(
            f"Duration: {stats.get('capture_duration', stats.get('runtime_s', 0)):.1f}s"
        )
        print(f"Correlation rate: {stats['correlation_rate_pct']:.1f}%")
        print(f"Slots without meta: {stats['slots_without_metadata']}")

        if not capture_only:
            print(f"Metadata buffer: {self._metadata_buffer.size} slots")
            print(f"IQ accumulator: {self._iq_accumulator.size} slots pending")

    @property
    def metadata_buffer_size(self) -> int:
        """Number of metadata entries currently buffered."""
        return self._metadata_buffer.size

    @property
    def latest_iq_slot(self) -> int:
        """Slot number of the most recently received IQ packet."""
        return self._latest_iq_slot

    @property
    def latest_metadata_slot(self) -> int:
        """Newest slot number seen by the metadata receiver."""
        return self._metadata_buffer.latest_slot
|
|
|
|
|
|
# =============================================================================
|
|
# Convenience Functions
|
|
# =============================================================================
|
|
|
|
|
|
def create_interface(
    iq_port: int = 5588, metadata_port: int = 5589, **kwargs
) -> IQMetadataInterface:
    """Build an IQMetadataInterface, forwarding any extra keyword options."""
    interface = IQMetadataInterface(
        iq_port=iq_port,
        metadata_port=metadata_port,
        **kwargs,
    )
    return interface
|
|
|
|
|
|
# =============================================================================
|
|
# Main
|
|
# =============================================================================
|
|
|
|
if __name__ == "__main__":
    import argparse

    # Command-line options for running the interface standalone as a
    # monitoring/test tool.
    parser = argparse.ArgumentParser(description="IQ Metadata Interface Test")
    parser.add_argument("--iq-port", type=int, default=5588)
    parser.add_argument("--metadata-port", type=int, default=5589)
    parser.add_argument("--stats-interval", type=float, default=2.0)
    args = parser.parse_args()

    print("=" * 60)
    print(" IQ Metadata Interface (with Accumulation)")
    print("=" * 60)

    interface = IQMetadataInterface(
        iq_port=args.iq_port, metadata_port=args.metadata_port
    )

    # Print a one-line summary for every correlated slot as it is emitted.
    @interface.on_correlated_data
    def handle_data(data: CorrelatedData):
        print(
            f"[SLOT {data.slot_num:5d}] "
            f"IQ: {data.iq.num_samples:6d} samples | "
            f"RBs: {data.metadata.num_allocated_rbs:3d} | "
            f"Power: {data.iq.power_db:6.1f} dB"
        )

    interface.start()

    # Periodically dump aggregate statistics until Ctrl-C.
    try:
        while True:
            time.sleep(args.stats_interval)
            print()
            interface.print_stats()
    except KeyboardInterrupt:
        print("\nShutting down...")
        interface.stop()
|