Added file for sending samples out to web viewer
This commit is contained in:
parent
b890983ac4
commit
fdbd7a741e
90
scripts/iqzmq.py
Normal file
90
scripts/iqzmq.py
Normal file
|
|
@ -0,0 +1,90 @@
|
||||||
|
import socket
|
||||||
|
import struct
|
||||||
|
import zmq
|
||||||
|
import numpy as np
|
||||||
|
import argparse
|
||||||
|
import time
|
||||||
|
|
||||||
|
def send_0mq(args):
|
||||||
|
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
udp_socket.bind(('127.0.0.1', 5588))
|
||||||
|
|
||||||
|
# ZMQ setup with high-performance options
|
||||||
|
context = zmq.Context()
|
||||||
|
zmq_socket = context.socket(zmq.PUB)
|
||||||
|
zmq_socket.setsockopt(zmq.SNDHWM, 0) # Remove send buffer limit
|
||||||
|
zmq_socket.setsockopt(zmq.LINGER, 0) # Don't wait on close
|
||||||
|
zmq_socket.bind(f"tcp://127.0.0.1:{args.publisher}")
|
||||||
|
|
||||||
|
# Buffer for storing received samples
|
||||||
|
total_samples = args.n_vectors * args.vector_length
|
||||||
|
complex_values = np.empty(total_samples, dtype=np.complex64)
|
||||||
|
|
||||||
|
# Buffer for handling partial packets
|
||||||
|
leftover = None
|
||||||
|
|
||||||
|
start_time = time.perf_counter()
|
||||||
|
while True:
|
||||||
|
received = 0
|
||||||
|
|
||||||
|
# Process leftover data from previous iteration
|
||||||
|
if leftover is not None:
|
||||||
|
take = min(len(leftover), total_samples)
|
||||||
|
complex_values[:take] = leftover[:take]
|
||||||
|
received = take
|
||||||
|
|
||||||
|
# Update leftover
|
||||||
|
if take < len(leftover):
|
||||||
|
leftover = leftover[take:]
|
||||||
|
else:
|
||||||
|
leftover = None
|
||||||
|
|
||||||
|
# Receive new packets
|
||||||
|
while received < total_samples:
|
||||||
|
try:
|
||||||
|
# Calculate how much more we need
|
||||||
|
bytes_needed = (total_samples - received) * 8
|
||||||
|
data, _ = udp_socket.recvfrom(bytes_needed)
|
||||||
|
|
||||||
|
# Convert to complex array
|
||||||
|
num_bytes = len(data)
|
||||||
|
num_samples = num_bytes // 8
|
||||||
|
if num_samples == 0:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Convert to complex64 array
|
||||||
|
new_samples = np.frombuffer(data, dtype=np.float32).view(np.complex64)
|
||||||
|
|
||||||
|
# Calculate how many to copy
|
||||||
|
take = min(len(new_samples), total_samples - received)
|
||||||
|
|
||||||
|
# Copy to main buffer
|
||||||
|
complex_values[received:received+take] = new_samples[:take]
|
||||||
|
received += take
|
||||||
|
|
||||||
|
# Save any leftover samples
|
||||||
|
if take < len(new_samples):
|
||||||
|
if leftover is None:
|
||||||
|
leftover = new_samples[take:]
|
||||||
|
else:
|
||||||
|
leftover = np.concatenate((leftover, new_samples[take:]))
|
||||||
|
|
||||||
|
except socket.error as e:
|
||||||
|
print(f"Socket error: {e}")
|
||||||
|
break
|
||||||
|
|
||||||
|
# Send only if we have a full buffer
|
||||||
|
if received == total_samples:
|
||||||
|
zmq_socket.send(complex_values.tobytes(), copy=False) # Zero-copy send
|
||||||
|
|
||||||
|
end_time = time.perf_counter()
|
||||||
|
print(f"Publish time: {end_time-start_time:.4f}s")
|
||||||
|
start_time = end_time
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument("--vector_length", "-v", type=int, default=1024)
|
||||||
|
parser.add_argument("--publisher", "-p", type=int, default=5556)
|
||||||
|
parser.add_argument("--n_vectors", "-n", type=int, default=512)
|
||||||
|
args = parser.parse_args()
|
||||||
|
send_0mq(args)
|
||||||
Loading…
Reference in New Issue
Block a user