gain-viz/gain_viz/iq_metadata_interface.py

1067 lines
35 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
IQ & Metadata Streaming Interface (IQ-Prioritized with Accumulation)
Handles multiple IQ packets per slot by accumulating them before correlation.
"""
import logging
import queue
import socket
import struct
import threading
import time
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple
import numpy as np
# NOTE: basicConfig() runs as a side effect of importing this module and sets
# the root logger level to INFO (it is a no-op if the root logger already has
# handlers configured by the application).
logging.basicConfig(level=logging.INFO)
# Module-wide logger used by every class and thread in this file.
logger = logging.getLogger("IQInterface")
# =============================================================================
# Data Classes
# =============================================================================
@dataclass
class Metadata:
    """Resource-block (RB) allocation metadata reported by the scheduler for one slot.

    Attributes:
        slot_num: Slot number the allocation belongs to.
        rb_bitmap: Packed allocation bitmap. Bits are read MSB-first, so the
            most significant bit of byte 0 is RB index 0.
        dl_flag: Direction flag taken from the metadata packet
            (presumably True = downlink; confirm against the sender).
        timestamp: ``time.time()`` at object creation.
    """

    slot_num: int
    rb_bitmap: bytes
    dl_flag: bool = True
    timestamp: float = field(default_factory=time.time)

    @property
    def rb_bitmap_binary(self) -> str:
        """Bitmap rendered as a string of '0'/'1', eight characters per byte."""
        return "".join(f"{octet:08b}" for octet in self.rb_bitmap)

    @property
    def rb_bitmap_array(self) -> np.ndarray:
        """Bitmap expanded to a uint8 array holding one 0/1 entry per RB (MSB-first)."""
        octets = np.fromiter(self.rb_bitmap, dtype=np.uint8)
        # unpackbits defaults to big-endian bit order, i.e. MSB of each byte first.
        return np.unpackbits(octets)

    @property
    def allocated_rbs(self) -> List[int]:
        """Indices of the RBs whose bit is set, in ascending order."""
        return np.flatnonzero(self.rb_bitmap_array).tolist()

    @property
    def num_allocated_rbs(self) -> int:
        """Number of set bits in the bitmap."""
        # Count the ones of the whole bitmap viewed as one big integer; the
        # "0b" prefix produced by bin() contains no '1' characters.
        return bin(int.from_bytes(self.rb_bitmap, "big")).count("1")

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict summary of this metadata entry."""
        summary: Dict[str, Any] = {"slot_num": self.slot_num}
        summary["rb_bitmap_hex"] = self.rb_bitmap.hex()
        summary["rb_bitmap_binary"] = self.rb_bitmap_binary
        summary["num_allocated_rbs"] = self.num_allocated_rbs
        summary["timestamp"] = self.timestamp
        return summary
@dataclass
class IQSamples:
    """IQ samples received from the radio for one slot (or one packet of it).

    Attributes:
        slot_num: Slot number the samples belong to.
        samples: Array of complex IQ samples (the packet parser in this file
            produces ``complex64``).
        timestamp: ``time.time()`` at object creation.
    """

    slot_num: int
    samples: np.ndarray
    timestamp: float = field(default_factory=time.time)

    @property
    def num_samples(self) -> int:
        """Number of complex samples held."""
        return len(self.samples)

    @property
    def i_samples(self) -> np.ndarray:
        """In-phase component (real part) of every sample."""
        in_phase = np.real(self.samples)
        return in_phase

    @property
    def q_samples(self) -> np.ndarray:
        """Quadrature component (imaginary part) of every sample."""
        quadrature = np.imag(self.samples)
        return quadrature

    @property
    def magnitude(self) -> np.ndarray:
        """Per-sample magnitude."""
        return np.abs(self.samples)

    @property
    def phase(self) -> np.ndarray:
        """Per-sample phase angle in radians."""
        return np.angle(self.samples)

    @property
    def power_db(self) -> float:
        """Mean sample power expressed in dB."""
        mean_power = np.mean(np.abs(self.samples) ** 2)
        # The 1e-10 offset keeps log10 finite when the mean power is zero.
        return 10 * np.log10(mean_power + 1e-10)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict summary (sample count and power, not the samples)."""
        return dict(
            slot_num=self.slot_num,
            num_samples=self.num_samples,
            power_db=self.power_db,
            timestamp=self.timestamp,
        )
@dataclass
class CorrelatedData:
    """One metadata entry paired with the IQ samples of the same slot.

    Attributes:
        slot_num: Slot number shared by ``metadata`` and ``iq``.
        metadata: RB allocation metadata for the slot.
        iq: IQ samples for the slot.
        correlation_timestamp: ``time.time()`` at the moment the pair was built.
    """

    slot_num: int
    metadata: Metadata
    iq: IQSamples
    correlation_timestamp: float = field(default_factory=time.time)

    @property
    def samples(self) -> np.ndarray:
        """Shortcut for ``iq.samples``."""
        iq_part = self.iq
        return iq_part.samples

    @property
    def rb_bitmap(self) -> bytes:
        """Shortcut for ``metadata.rb_bitmap``."""
        meta_part = self.metadata
        return meta_part.rb_bitmap

    def to_dict(self) -> Dict[str, Any]:
        """Return a nested plain-dict summary built from both halves' ``to_dict()``."""
        return dict(
            slot_num=self.slot_num,
            metadata=self.metadata.to_dict(),
            iq=self.iq.to_dict(),
            correlation_timestamp=self.correlation_timestamp,
        )
# =============================================================================
# Statistics
# =============================================================================
@dataclass
class InterfaceStats:
    """Running counters for the interface plus rates derived from them.

    All rates are computed against ``runtime`` (seconds since ``start_time``),
    clamped to at least 1 ms to avoid division by zero.
    """

    start_time: float = field(default_factory=time.time)
    # Reception counts
    metadata_received: int = 0
    metadata_bytes: int = 0
    iq_packets_received: int = 0
    iq_samples_received: int = 0
    iq_bytes: int = 0
    # Correlation stats
    slots_correlated: int = 0
    iq_packets_correlated: int = 0
    slots_without_metadata: int = 0
    metadata_expired: int = 0
    # Buffer stats
    metadata_buffer_hits: int = 0
    metadata_buffer_misses: int = 0
    # Callback stats
    callbacks_invoked: int = 0
    callback_errors: int = 0

    def reset(self) -> None:
        """Zero every counter and restart the runtime clock."""
        self.start_time = time.time()
        for counter_name in self.__dataclass_fields__:
            if counter_name != "start_time":
                setattr(self, counter_name, 0)

    def snapshot(self) -> "InterfaceStats":
        """Return an independent copy of the current values (``start_time`` included)."""
        current_values = {
            counter_name: getattr(self, counter_name)
            for counter_name in self.__dataclass_fields__
        }
        return InterfaceStats(**current_values)

    def _per_second(self, count: float) -> float:
        """Divide ``count`` by the runtime, clamped to a minimum of 1 ms."""
        return count / max(self.runtime, 0.001)

    @property
    def runtime(self) -> float:
        """Seconds elapsed since ``start_time``."""
        return time.time() - self.start_time

    @property
    def metadata_rate(self) -> float:
        """Metadata packets received per second."""
        return self._per_second(self.metadata_received)

    @property
    def iq_packet_rate(self) -> float:
        """IQ packets received per second."""
        return self._per_second(self.iq_packets_received)

    @property
    def slot_rate(self) -> float:
        """Correlated slots per second."""
        return self._per_second(self.slots_correlated)

    @property
    def iq_bandwidth_mbps(self) -> float:
        """IQ bytes received per second, divided by 1e6."""
        return self._per_second(self.iq_bytes) / 1e6

    @property
    def correlation_rate(self) -> float:
        """Percentage of completed slots that found metadata"""
        completed = self.slots_correlated + self.slots_without_metadata
        if completed:
            return self.slots_correlated / completed * 100
        return 0.0

    @property
    def avg_packets_per_slot(self) -> float:
        """Average IQ packets per correlated slot"""
        if self.slots_correlated:
            return self.iq_packets_correlated / self.slots_correlated
        return 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Return the headline counters and derived rates as a plain dict."""
        return dict(
            runtime_s=self.runtime,
            metadata_received=self.metadata_received,
            metadata_rate=self.metadata_rate,
            iq_packets_received=self.iq_packets_received,
            iq_packet_rate=self.iq_packet_rate,
            iq_bandwidth_mbps=self.iq_bandwidth_mbps,
            slots_correlated=self.slots_correlated,
            slot_rate=self.slot_rate,
            correlation_rate_pct=self.correlation_rate,
            avg_packets_per_slot=self.avg_packets_per_slot,
            slots_without_metadata=self.slots_without_metadata,
            metadata_expired=self.metadata_expired,
        )

    def __str__(self) -> str:
        """One-line human-readable summary."""
        sections = [
            f"Runtime: {self.runtime:.1f}s",
            f"Meta: {self.metadata_received} ({self.metadata_rate:.1f}/s)",
            f"IQ pkts: {self.iq_packets_received} ({self.iq_packet_rate:.1f}/s)",
            f"Slots: {self.slots_correlated} ({self.correlation_rate:.1f}%)",
            f"Avg pkts/slot: {self.avg_packets_per_slot:.1f}",
        ]
        return " | ".join(sections)
# =============================================================================
# Metadata Buffer
# =============================================================================
class MetadataBuffer:
    """Thread-safe, bounded buffer of metadata entries indexed by slot number.

    Entries are keyed by ``(slot_num, dl_flag)`` so that flexible slots can
    store both a DL and a UL metadata entry under the same slot number.
    Slot numbers are treated as 16-bit counters that wrap around; ordering
    uses the half-range rule (a forward distance below 0x8000 means "newer").

    Args:
        max_slots: Maximum number of entries kept; the oldest inserted entries
            are evicted first once the limit is exceeded.
        slot_window: Entries more than this many slots behind the slot passed
            to :meth:`cleanup_old` are expired.
    """

    def __init__(self, max_slots: int = 1000, slot_window: int = 500):
        # Key is (slot_num, dl_flag) so that flexible slots can store both
        # a DL and a UL metadata entry under the same slot number.
        self._buffer: OrderedDict[Tuple[int, bool], Metadata] = OrderedDict()
        self._max_slots = max_slots
        self._slot_window = slot_window
        self._lock = threading.Lock()
        # None until the first entry is stored. Comparing the first slot
        # against a literal 0 would reject any first slot >= 0x8000 as "older".
        self._latest_slot: Optional[int] = None

    def store(self, metadata: Metadata) -> None:
        """Store ``metadata``, replacing any entry with the same slot and direction."""
        with self._lock:
            self._buffer[(metadata.slot_num, metadata.dl_flag)] = metadata
            if self._latest_slot is None or self._is_newer_slot(
                metadata.slot_num, self._latest_slot
            ):
                self._latest_slot = metadata.slot_num
            # Enforce the size bound by evicting the oldest insertions.
            while len(self._buffer) > self._max_slots:
                self._buffer.popitem(last=False)

    def lookup(self, slot_num: int) -> Optional[Metadata]:
        """Look up first metadata entry for a slot (does NOT remove it)"""
        with self._lock:
            for (stored_slot, _), meta in self._buffer.items():
                if stored_slot == slot_num:
                    return meta
            return None

    def remove(self, slot_num: int) -> Optional[Metadata]:
        """Remove first metadata entry for a slot (backward compat)"""
        with self._lock:
            for key in self._buffer:
                if key[0] == slot_num:
                    # Returning immediately, so mutating during iteration is safe.
                    return self._buffer.pop(key)
            return None

    def remove_all(self, slot_num: int) -> List[Metadata]:
        """Remove ALL metadata entries for a slot (handles flexible slots)"""
        with self._lock:
            keys = [k for k in self._buffer if k[0] == slot_num]
            return [self._buffer.pop(k) for k in keys]

    def cleanup_old(
        self, current_slot: int, expired_callback: Optional[Callable] = None
    ) -> int:
        """Expire entries that lag ``current_slot`` by more than ``slot_window``.

        Entries that are *ahead* of ``current_slot`` (metadata that arrived
        before its IQ) are kept.

        Args:
            current_slot: Reference slot number (16-bit, wraps around).
            expired_callback: Optional callable invoked once per expired
                entry. It is called after the internal lock is released, so
                it may safely call back into this buffer.

        Returns:
            Number of entries removed.
        """
        with self._lock:
            threshold = (current_slot - self._slot_window) & 0xFFFF
            expired_keys = [
                k for k in self._buffer
                if self._is_older_slot(k[0], threshold, current_slot)
            ]
            expired = [self._buffer.pop(k) for k in expired_keys]
        if expired_callback:
            for meta in expired:
                expired_callback(meta)
        return len(expired)

    def _is_newer_slot(self, slot_a: int, slot_b: int) -> bool:
        """True if ``slot_a`` is at or ahead of ``slot_b`` under 16-bit wraparound."""
        diff = (slot_a - slot_b) & 0xFFFF
        return diff < 0x8000

    def _is_older_slot(self, slot: int, threshold: int, current: int) -> bool:
        """True if ``slot`` lies behind ``threshold`` as seen from ``current``.

        Only distances below 0x8000 count as "behind". A slot slightly ahead
        of ``current`` wraps to a distance close to 0xFFFF and must not be
        mistaken for a very old slot.
        """
        dist_to_current = (current - slot) & 0xFFFF
        dist_to_threshold = (current - threshold) & 0xFFFF
        return dist_to_threshold < dist_to_current < 0x8000

    @property
    def size(self) -> int:
        """Number of entries currently buffered."""
        with self._lock:
            return len(self._buffer)

    @property
    def latest_slot(self) -> int:
        """Most recent slot number stored so far (0 if nothing was stored yet)."""
        latest = self._latest_slot
        return 0 if latest is None else latest
# =============================================================================
# IQ Accumulator (Collects multiple IQ packets per slot)
# =============================================================================
@dataclass
class SlotAccumulator:
    """Collects the IQ packets that belong to one slot.

    Attributes:
        slot_num: Slot number being accumulated.
        packets: Sample arrays in arrival order.
        first_packet_time: ``time.time()`` when the accumulator was created.
        packet_count: Number of packets added via :meth:`add_packet`.
    """

    slot_num: int
    packets: List[np.ndarray] = field(default_factory=list)
    first_packet_time: float = field(default_factory=time.time)
    packet_count: int = 0

    def add_packet(self, samples: np.ndarray) -> None:
        """Append one packet's samples and bump the packet counter."""
        self.packet_count = self.packet_count + 1
        self.packets.append(samples)

    def get_combined_samples(self) -> np.ndarray:
        """Concatenate all packets in arrival order (empty complex64 array if none)."""
        if self.packets:
            return np.concatenate(self.packets)
        return np.array([], dtype=np.complex64)

    @property
    def total_samples(self) -> int:
        """Total number of samples across all packets."""
        return sum(map(len, self.packets))

    @property
    def age(self) -> float:
        """Seconds elapsed since the accumulator was created."""
        now = time.time()
        return now - self.first_packet_time
class IQAccumulatorBuffer:
    """
    Groups incoming IQ packets by slot and hands back slots once they are done.

    A slot is emitted when:
    - a packet for a newer slot arrives (the older slot is considered complete),
    - it has been pending longer than ``slot_timeout`` seconds, or
    - the buffer holds more than ``max_slots`` slots (oldest insertion first).

    Timeouts are only evaluated when a packet is added. Slot numbers are
    compared as 16-bit counters with wraparound.
    """

    def __init__(
        self,
        max_slots: int = 100,
        slot_timeout: float = 0.1,  # seconds
    ):
        self._buffer: OrderedDict[int, SlotAccumulator] = OrderedDict()
        self._max_slots = max_slots
        self._slot_timeout = slot_timeout
        self._lock = threading.Lock()
        # -1 means "no packet seen yet".
        self._latest_slot = -1

    def add_packet(self, slot_num: int, samples: np.ndarray) -> List[SlotAccumulator]:
        """
        Add one IQ packet and return the slots that became complete as a result.

        Args:
            slot_num: Slot number carried by the packet.
            samples: Samples carried by the packet.

        Returns:
            Completed accumulators: slots older than ``slot_num``, then
            timed-out slots, then slots evicted by the size limit.
        """
        finished: List[SlotAccumulator] = []
        with self._lock:
            # A packet for a different slot closes every slot older than it.
            seen_any = self._latest_slot >= 0
            if seen_any and slot_num != self._latest_slot:
                finished += self._flush_older_slots(slot_num)

            # File the packet under its slot, creating the accumulator on demand.
            target = self._buffer.get(slot_num)
            if target is None:
                target = SlotAccumulator(slot_num=slot_num)
                self._buffer[slot_num] = target
            target.add_packet(samples)

            if self._is_newer_slot(slot_num, self._latest_slot):
                self._latest_slot = slot_num

            finished += self._flush_timed_out()

            # Enforce the size bound, evicting the oldest insertions first.
            while len(self._buffer) > self._max_slots:
                _, evicted = self._buffer.popitem(last=False)
                finished.append(evicted)
        return finished

    def _pop_slots(self, slot_numbers: List[int]) -> List[SlotAccumulator]:
        """Remove the given slots from the buffer and return their accumulators."""
        return [self._buffer.pop(number) for number in slot_numbers]

    def _flush_older_slots(self, current_slot: int) -> List[SlotAccumulator]:
        """Remove and return every buffered slot that is older than ``current_slot``."""
        stale = [
            number for number in self._buffer
            if self._is_older_slot(number, current_slot)
        ]
        return self._pop_slots(stale)

    def _flush_timed_out(self) -> List[SlotAccumulator]:
        """Remove and return every buffered slot whose age exceeds the timeout."""
        overdue = [
            number for number, pending in self._buffer.items()
            if pending.age > self._slot_timeout
        ]
        return self._pop_slots(overdue)

    def flush_all(self) -> List[SlotAccumulator]:
        """Remove and return every pending slot, in insertion order."""
        with self._lock:
            drained = list(self._buffer.values())
            self._buffer.clear()
            return drained

    def _is_newer_slot(self, slot_a: int, slot_b: int) -> bool:
        """True if ``slot_a`` is at or ahead of ``slot_b``; any slot beats a negative ``slot_b``."""
        if slot_b < 0:
            return True
        forward = (slot_a - slot_b) & 0xFFFF
        return forward < 0x8000

    def _is_older_slot(self, slot_a: int, slot_b: int) -> bool:
        """True if ``slot_a`` is strictly behind ``slot_b`` under 16-bit wraparound."""
        behind = (slot_b - slot_a) & 0xFFFF
        return 0 < behind < 0x8000

    @property
    def size(self) -> int:
        """Number of slots currently pending."""
        with self._lock:
            return len(self._buffer)
# =============================================================================
# Main Interface Class
# =============================================================================
class IQMetadataInterface:
    """
    IQ-prioritized UDP receiver that correlates IQ samples with scheduler metadata.

    :meth:`start` launches two daemon threads. One stores incoming metadata
    packets in a :class:`MetadataBuffer`; the other accumulates IQ packets per
    slot in an :class:`IQAccumulatorBuffer`. When a slot completes, every
    buffered metadata entry for that slot is paired with the combined IQ
    samples and delivered as :class:`CorrelatedData` through registered
    callbacks, the output queue (:meth:`get`) and iteration.

    The object is also a context manager (``with IQMetadataInterface() as i:``)
    and an iterator over correlated data.
    """

    def __init__(
        self,
        iq_port: int = 5588,
        metadata_port: int = 5589,
        bind_address: str = "127.0.0.1",
        metadata_buffer_size: int = 1000,
        metadata_slot_window: int = 500,
        iq_accumulator_size: int = 100,
        iq_slot_timeout: float = 0.05,  # 50ms timeout per slot
        output_queue_size: int = 1000,
        iq_socket_buffer: int = 8 * 1024 * 1024,
        metadata_socket_buffer: int = 1024 * 1024,
        cleanup_interval: int = 100,
    ):
        """
        Args:
            iq_port: UDP port on which IQ packets are received.
            metadata_port: UDP port on which metadata packets are received.
            bind_address: Local address both sockets bind to.
            metadata_buffer_size: Maximum number of buffered metadata entries.
            metadata_slot_window: Metadata lagging the current IQ slot by more
                than this many slots is expired during periodic cleanup.
            iq_accumulator_size: Maximum number of slots pending in the
                IQ accumulator.
            iq_slot_timeout: Seconds after which a pending slot is emitted.
            output_queue_size: Capacity of the output queue; when full, the
                oldest queued item is dropped to make room.
            iq_socket_buffer: Requested SO_RCVBUF for the IQ socket (bytes).
            metadata_socket_buffer: Requested SO_RCVBUF for the metadata
                socket (bytes).
            cleanup_interval: Run metadata cleanup every this many IQ packets;
                a value <= 0 disables periodic cleanup.
        """
        self.iq_port = iq_port
        self.metadata_port = metadata_port
        self.bind_address = bind_address
        self._cleanup_interval = cleanup_interval
        # Metadata buffer (stores metadata until IQ slot completes)
        self._metadata_buffer = MetadataBuffer(
            max_slots=metadata_buffer_size, slot_window=metadata_slot_window
        )
        # IQ accumulator (collects multiple packets per slot)
        self._iq_accumulator = IQAccumulatorBuffer(
            max_slots=iq_accumulator_size, slot_timeout=iq_slot_timeout
        )
        # Output queue
        self._output_queue: queue.Queue[CorrelatedData] = queue.Queue(
            maxsize=output_queue_size
        )
        # Callbacks
        self._correlated_callbacks: List[Callable[[CorrelatedData], None]] = []
        self._iq_callbacks: List[Callable[[IQSamples], None]] = []
        self._metadata_callbacks: List[Callable[[Metadata], None]] = []
        # Thread control
        self._running = False
        self._threads: List[threading.Thread] = []
        self._stop_event = threading.Event()
        # Statistics
        self.stats = InterfaceStats()
        # Socket configuration
        self._iq_socket_buffer = iq_socket_buffer
        self._metadata_socket_buffer = metadata_socket_buffer
        # Tracking
        self._latest_iq_slot = 0
        # Baseline for per-capture stats; its start_time is the capture start.
        self._capture_stats_start: Optional[InterfaceStats] = None
        logger.info(
            f"IQMetadataInterface initialized (IQ:{iq_port}, Meta:{metadata_port})"
        )
        logger.info(f" Metadata buffer: {metadata_buffer_size} slots")
        logger.info(f" IQ accumulator: timeout={iq_slot_timeout*1000:.0f}ms")

    # -------------------------------------------------------------------------
    # Callback Registration
    # -------------------------------------------------------------------------
    def on_correlated_data(
        self, callback: Callable[[CorrelatedData], None]
    ) -> Callable:
        """Register a callback for correlated slots; usable as a decorator."""
        self._correlated_callbacks.append(callback)
        return callback

    def on_iq_data(self, callback: Callable[[IQSamples], None]) -> Callable:
        """Register a callback invoked for every raw IQ packet; usable as a decorator."""
        self._iq_callbacks.append(callback)
        return callback

    def on_metadata(self, callback: Callable[[Metadata], None]) -> Callable:
        """Register a callback invoked for every metadata packet; usable as a decorator."""
        self._metadata_callbacks.append(callback)
        return callback

    def remove_callback(self, callback: Callable) -> bool:
        """Unregister ``callback`` from the first list containing it.

        Returns:
            True if the callback was found and removed, False otherwise.
        """
        for cb_list in (
            self._correlated_callbacks,
            self._iq_callbacks,
            self._metadata_callbacks,
        ):
            if callback in cb_list:
                cb_list.remove(callback)
                return True
        return False

    # -------------------------------------------------------------------------
    # Queue-based Access
    # -------------------------------------------------------------------------
    def get(
        self, block: bool = True, timeout: Optional[float] = None
    ) -> Optional[CorrelatedData]:
        """Pop the next correlated item from the output queue.

        Returns:
            The item, or None if the queue stayed empty (non-blocking call or
            timeout elapsed).
        """
        try:
            return self._output_queue.get(block=block, timeout=timeout)
        except queue.Empty:
            return None

    def get_nowait(self) -> Optional[CorrelatedData]:
        """Non-blocking :meth:`get`; returns None when the queue is empty."""
        return self.get(block=False)

    def get_all(self, max_items: int = 100) -> List[CorrelatedData]:
        """Drain up to ``max_items`` queued items without blocking."""
        items: List[CorrelatedData] = []
        while len(items) < max_items:
            item = self.get_nowait()
            if item is None:
                break
            items.append(item)
        return items

    @property
    def queue_size(self) -> int:
        """Approximate number of items waiting in the output queue."""
        return self._output_queue.qsize()

    # -------------------------------------------------------------------------
    # Iterator Interface
    # -------------------------------------------------------------------------
    def __iter__(self) -> Iterator[CorrelatedData]:
        return self

    def __next__(self) -> CorrelatedData:
        """Block until the next correlated item; stop once the interface is stopped."""
        # Poll with a timeout so stop() is noticed. A loop is used instead of
        # recursion so that long idle periods cannot hit the recursion limit.
        while self._running:
            data = self.get(block=True, timeout=1.0)
            if data is not None:
                return data
        raise StopIteration

    # -------------------------------------------------------------------------
    # Lifecycle Management
    # -------------------------------------------------------------------------
    def start(self) -> "IQMetadataInterface":
        """Start the receive threads with fresh statistics; returns ``self``."""
        if self._running:
            logger.warning("Interface already running")
            return self
        self._running = True
        self._stop_event.clear()
        self.stats = InterfaceStats()
        # Any capture baseline referred to the stats object just replaced.
        self._capture_stats_start = None
        self._threads = [
            threading.Thread(
                target=self._receive_metadata, name="MetadataRx", daemon=True
            ),
            threading.Thread(target=self._receive_iq, name="IQRx", daemon=True),
        ]
        for t in self._threads:
            t.start()
            logger.info(f"Started thread: {t.name}")
        logger.info("IQMetadataInterface started (IQ-prioritized with accumulation)")
        return self

    def stop(self) -> None:
        """Stop the receive threads, then emit whatever IQ is still accumulated."""
        if not self._running:
            return
        self._running = False
        self._stop_event.set()
        # Join first so the IQ thread is no longer feeding the accumulator
        # (or emitting slots) while the remainder is flushed below.
        current = threading.current_thread()
        for t in self._threads:
            # stop() may be called from a callback running on a receive
            # thread; a thread cannot join itself.
            if t is not current:
                t.join(timeout=2.0)
        self._threads.clear()
        # Flush remaining accumulated IQ
        for slot_acc in self._iq_accumulator.flush_all():
            self._process_completed_slot(slot_acc)
        logger.info("IQMetadataInterface stopped")

    def __enter__(self) -> "IQMetadataInterface":
        return self.start()

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.stop()

    @property
    def is_running(self) -> bool:
        """True between :meth:`start` and :meth:`stop`."""
        return self._running

    # -------------------------------------------------------------------------
    # Internal: Packet Parsing
    # -------------------------------------------------------------------------
    def _parse_metadata_packet(self, data: bytes) -> Optional[Metadata]:
        """Parse a metadata packet; returns None if it is too short."""
        # Packet layout: [slot_num 2B][dl_flag 1B][rb_bitmap NB]
        if len(data) < 3:
            return None
        slot_num = struct.unpack(">H", data[0:2])[0]
        dl_flag = bool(data[2])
        rb_bitmap = data[3:]
        return Metadata(slot_num=slot_num, rb_bitmap=rb_bitmap, dl_flag=dl_flag)

    def _parse_iq_packet(self, data: bytes) -> Optional[Tuple[int, np.ndarray]]:
        """Parse an IQ packet into ``(slot_num, samples)``.

        The packet is a 2-byte big-endian slot number followed by interleaved
        float32 I/Q pairs, which are reinterpreted as complex64.

        Returns:
            The tuple, or None if the packet holds less than one sample or the
            payload is not a whole number of 8-byte complex samples.
        """
        if len(data) < 10:
            return None
        payload = data[2:]
        # A truncated payload would make frombuffer()/view() raise; treat it
        # as a malformed packet instead.
        if len(payload) % 8:
            return None
        slot_num = struct.unpack(">H", data[0:2])[0]
        samples = np.frombuffer(payload, dtype=np.float32).view(np.complex64)
        return (slot_num, samples)

    # -------------------------------------------------------------------------
    # Internal: Slot Processing
    # -------------------------------------------------------------------------
    def _process_completed_slot(self, slot_acc: SlotAccumulator) -> None:
        """Process a completed slot (all IQ packets received)"""
        slot_num = slot_acc.slot_num
        # Remove ALL metadata entries for this slot (1 for normal slots,
        # 2 for flexible slots that carry both DL and UL bitmaps).
        metadatas = self._metadata_buffer.remove_all(slot_num)
        if not metadatas:
            # No metadata found
            self.stats.metadata_buffer_misses += 1
            self.stats.slots_without_metadata += 1
            logger.debug(
                f"Slot {slot_num}: no metadata found ({slot_acc.packet_count} IQ packets)"
            )
            return
        self.stats.metadata_buffer_hits += 1
        self.stats.iq_packets_correlated += slot_acc.packet_count
        combined_samples = slot_acc.get_combined_samples()
        # The same IQSamples object is shared by every emitted pair.
        iq = IQSamples(slot_num=slot_num, samples=combined_samples)
        for metadata in metadatas:
            self.stats.slots_correlated += 1
            correlated = CorrelatedData(slot_num=slot_num, metadata=metadata, iq=iq)
            self._emit_correlated(correlated)

    def _emit_correlated(self, data: CorrelatedData) -> None:
        """Emit correlated data to callbacks and queue"""
        try:
            self._output_queue.put_nowait(data)
        except queue.Full:
            # Drop the oldest queued item to make room for the newest one.
            try:
                self._output_queue.get_nowait()
                self._output_queue.put_nowait(data)
            except (queue.Empty, queue.Full):
                # Another thread drained or refilled the queue in between;
                # the item is still delivered to the callbacks below.
                pass
        for callback in self._correlated_callbacks:
            try:
                callback(data)
                self.stats.callbacks_invoked += 1
            except Exception as e:
                self.stats.callback_errors += 1
                logger.error(f"Callback error: {e}")

    def _on_metadata_expired(self, metadata: Metadata) -> None:
        """Count one metadata entry that expired without being correlated."""
        self.stats.metadata_expired += 1

    # -------------------------------------------------------------------------
    # Internal: Receive Threads
    # -------------------------------------------------------------------------
    def _flush_socket(self, sock: socket.socket) -> int:
        """Discard datagrams already queued on ``sock``; return how many were dropped.

        The socket is left in blocking mode with a 1 second timeout.
        """
        sock.setblocking(False)
        count = 0
        try:
            while True:
                sock.recv(65535)
                count += 1
        except BlockingIOError:
            pass
        finally:
            sock.settimeout(1.0)
        return count

    def _receive_metadata(self) -> None:
        """Thread: receive metadata and store in buffer"""
        # The context manager closes the socket on every exit path.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.setsockopt(
                socket.SOL_SOCKET, socket.SO_RCVBUF, self._metadata_socket_buffer
            )
            try:
                sock.bind((self.bind_address, self.metadata_port))
            except OSError as e:
                logger.error(
                    f"Metadata socket bind failed on "
                    f"{self.bind_address}:{self.metadata_port}: {e}"
                )
                return
            flushed = self._flush_socket(sock)
            if flushed > 0:
                logger.debug(f"Flushed {flushed} old metadata packets")
            while not self._stop_event.is_set():
                try:
                    data, _ = sock.recvfrom(65535)
                    if len(data) == 0:
                        continue
                    metadata = self._parse_metadata_packet(data)
                    if metadata is None:
                        continue
                    self.stats.metadata_received += 1
                    self.stats.metadata_bytes += len(data)
                    for callback in self._metadata_callbacks:
                        try:
                            callback(metadata)
                        except Exception as e:
                            logger.error(f"Metadata callback error: {e}")
                    # Store in buffer
                    self._metadata_buffer.store(metadata)
                except socket.timeout:
                    # Timeout only exists so the stop event is re-checked.
                    continue
                except Exception as e:
                    if self._running:
                        logger.error(f"Metadata receive error: {e}")

    def _receive_iq(self) -> None:
        """Thread: receive IQ packets, accumulate, correlate when slot complete"""
        # The context manager closes the socket on every exit path.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.setsockopt(
                socket.SOL_SOCKET, socket.SO_RCVBUF, self._iq_socket_buffer
            )
            try:
                sock.bind((self.bind_address, self.iq_port))
            except OSError as e:
                logger.error(
                    f"IQ socket bind failed on "
                    f"{self.bind_address}:{self.iq_port}: {e}"
                )
                return
            flushed = self._flush_socket(sock)
            if flushed > 0:
                logger.debug(f"Flushed {flushed} old IQ packets")
            packet_count = 0
            while not self._stop_event.is_set():
                try:
                    data, _ = sock.recvfrom(65535)
                    if len(data) == 0:
                        continue
                    parsed = self._parse_iq_packet(data)
                    if parsed is None:
                        continue
                    slot_num, samples = parsed
                    # Update stats
                    self.stats.iq_packets_received += 1
                    self.stats.iq_samples_received += len(samples)
                    self.stats.iq_bytes += len(data)
                    self._latest_iq_slot = slot_num
                    packet_count += 1
                    # Raw IQ callback (per packet)
                    for callback in self._iq_callbacks:
                        try:
                            iq = IQSamples(slot_num=slot_num, samples=samples)
                            callback(iq)
                        except Exception as e:
                            logger.error(f"IQ callback error: {e}")
                    # Add to accumulator - returns completed slots
                    completed_slots = self._iq_accumulator.add_packet(
                        slot_num, samples
                    )
                    # Process any completed slots
                    for slot_acc in completed_slots:
                        self._process_completed_slot(slot_acc)
                    # Periodic cleanup (guarded so an interval of 0 disables
                    # it instead of raising on the modulo).
                    if (
                        self._cleanup_interval > 0
                        and packet_count % self._cleanup_interval == 0
                    ):
                        self._metadata_buffer.cleanup_old(
                            slot_num, expired_callback=self._on_metadata_expired
                        )
                except socket.timeout:
                    # Timeout only exists so the stop event is re-checked.
                    continue
                except Exception as e:
                    if self._running:
                        logger.error(f"IQ receive error: {e}")

    # -------------------------------------------------------------------------
    # Utility Methods
    # -------------------------------------------------------------------------
    def reset_stats(self) -> None:
        """Reset all statistics to zero"""
        self.stats.reset()
        self._capture_stats_start = None
        logger.info("Statistics reset")

    def get_stats(self) -> InterfaceStats:
        """Get current aggregated statistics"""
        return self.stats

    def start_stats_capture(self) -> None:
        """Mark the start of a stats capture period"""
        baseline = self.stats.snapshot()
        # snapshot() copies start_time verbatim; re-stamp it so the baseline
        # records when the capture window began rather than when the
        # interface was started.
        baseline.start_time = time.time()
        self._capture_stats_start = baseline

    def get_capture_stats(self) -> Dict[str, Any]:
        """
        Get statistics for current capture period only.

        Returns stats since last start_stats_capture() call. If no capture
        was started, the aggregated totals (``stats.to_dict()``) are returned.
        """
        start = self._capture_stats_start
        if start is None:
            # No capture started, return current totals
            return self.stats.to_dict()
        current = self.stats
        # Length of the capture window, clamped to avoid division by zero.
        duration = max(time.time() - start.start_time, 0.001)
        # Counter deltas accumulated during the capture window.
        metadata_received = current.metadata_received - start.metadata_received
        iq_packets = current.iq_packets_received - start.iq_packets_received
        iq_samples = current.iq_samples_received - start.iq_samples_received
        iq_bytes = current.iq_bytes - start.iq_bytes
        slots_correlated = current.slots_correlated - start.slots_correlated
        iq_packets_correlated = (
            current.iq_packets_correlated - start.iq_packets_correlated
        )
        slots_without_meta = (
            current.slots_without_metadata - start.slots_without_metadata
        )
        metadata_expired = current.metadata_expired - start.metadata_expired
        total_slots = slots_correlated + slots_without_meta
        correlation_rate = (
            (slots_correlated / total_slots * 100) if total_slots > 0 else 0.0
        )
        avg_packets = (
            (iq_packets_correlated / slots_correlated) if slots_correlated > 0 else 0.0
        )
        return {
            "capture_duration": duration,
            "metadata_received": metadata_received,
            "metadata_rate": metadata_received / duration,
            "iq_packets_received": iq_packets,
            "iq_packet_rate": iq_packets / duration,
            "iq_samples_received": iq_samples,
            "iq_bytes": iq_bytes,
            "iq_bandwidth_mbps": (iq_bytes / duration) / 1e6,
            "slots_correlated": slots_correlated,
            "slot_rate": slots_correlated / duration,
            "correlation_rate_pct": correlation_rate,
            "avg_packets_per_slot": avg_packets,
            "slots_without_metadata": slots_without_meta,
            "metadata_expired": metadata_expired,
        }

    def print_stats(self, capture_only: bool = False) -> None:
        """
        Print statistics.

        Args:
            capture_only: If True, print only current capture stats
                (falls back to aggregated stats when no capture is active).
        """
        if capture_only and self._capture_stats_start is not None:
            stats = self.get_capture_stats()
            print("\n=== Capture Stats ===")
        else:
            stats = self.stats.to_dict()
            print("\n=== Aggregated Stats ===")
        # Capture stats expose "capture_duration"; aggregated stats "runtime_s".
        print(
            f"Duration: {stats.get('capture_duration', stats.get('runtime_s', 0)):.1f}s"
        )
        print(f"Correlation rate: {stats['correlation_rate_pct']:.1f}%")
        print(f"Slots without meta: {stats['slots_without_metadata']}")
        if not capture_only:
            print(f"Metadata buffer: {self._metadata_buffer.size} slots")
            print(f"IQ accumulator: {self._iq_accumulator.size} slots pending")

    @property
    def metadata_buffer_size(self) -> int:
        """Number of metadata entries currently buffered."""
        return self._metadata_buffer.size

    @property
    def latest_iq_slot(self) -> int:
        """Slot number of the most recently received IQ packet."""
        return self._latest_iq_slot

    @property
    def latest_metadata_slot(self) -> int:
        """Latest slot number reported by the metadata buffer."""
        return self._metadata_buffer.latest_slot
# =============================================================================
# Convenience Functions
# =============================================================================
def create_interface(
    iq_port: int = 5588, metadata_port: int = 5589, **kwargs
) -> IQMetadataInterface:
    """Build an :class:`IQMetadataInterface` (not started).

    Args:
        iq_port: UDP port for IQ packets.
        metadata_port: UDP port for metadata packets.
        **kwargs: Forwarded unchanged to the ``IQMetadataInterface`` constructor.
    """
    options = dict(kwargs, iq_port=iq_port, metadata_port=metadata_port)
    return IQMetadataInterface(**options)
# =============================================================================
# Main
# =============================================================================
if __name__ == "__main__":
    # Stand-alone smoke test: print one line per correlated slot and a
    # periodic statistics summary until interrupted with Ctrl-C.
    import argparse

    parser = argparse.ArgumentParser(description="IQ Metadata Interface Test")
    parser.add_argument("--iq-port", type=int, default=5588)
    parser.add_argument("--metadata-port", type=int, default=5589)
    parser.add_argument("--stats-interval", type=float, default=2.0)
    args = parser.parse_args()

    print("=" * 60)
    print(" IQ Metadata Interface (with Accumulation)")
    print("=" * 60)

    interface = IQMetadataInterface(
        iq_port=args.iq_port, metadata_port=args.metadata_port
    )

    @interface.on_correlated_data
    def handle_data(data: CorrelatedData):
        """Print a one-line summary of each correlated slot."""
        print(
            f"[SLOT {data.slot_num:5d}] "
            f"IQ: {data.iq.num_samples:6d} samples | "
            f"RBs: {data.metadata.num_allocated_rbs:3d} | "
            f"Power: {data.iq.power_db:6.1f} dB"
        )

    interface.start()
    try:
        while True:
            time.sleep(args.stats_interval)
            print()
            interface.print_stats()
    except KeyboardInterrupt:
        print("\nShutting down...")
    finally:
        # Stop on every exit path, not only Ctrl-C, so the receive threads
        # are joined and pending slots are flushed.
        interface.stop()