diff --git a/scripts/iqzmq.py b/scripts/iqzmq.py new file mode 100644 index 0000000..0c79b08 --- /dev/null +++ b/scripts/iqzmq.py @@ -0,0 +1,90 @@ +import socket +import struct +import zmq +import numpy as np +import argparse +import time + +def send_0mq(args): + udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + udp_socket.bind(('127.0.0.1', 5588)) + + # ZMQ setup with high-performance options + context = zmq.Context() + zmq_socket = context.socket(zmq.PUB) + zmq_socket.setsockopt(zmq.SNDHWM, 0) # Remove send buffer limit + zmq_socket.setsockopt(zmq.LINGER, 0) # Don't wait on close + zmq_socket.bind(f"tcp://127.0.0.1:{args.publisher}") + + # Buffer for storing received samples + total_samples = args.n_vectors * args.vector_length + complex_values = np.empty(total_samples, dtype=np.complex64) + + # Buffer for handling partial packets + leftover = None + + start_time = time.perf_counter() + while True: + received = 0 + + # Process leftover data from previous iteration + if leftover is not None: + take = min(len(leftover), total_samples) + complex_values[:take] = leftover[:take] + received = take + + # Update leftover + if take < len(leftover): + leftover = leftover[take:] + else: + leftover = None + + # Receive new packets + while received < total_samples: + try: + # Calculate how much more we need + bytes_needed = (total_samples - received) * 8 + data, _ = udp_socket.recvfrom(bytes_needed) + + # Convert to complex array + num_bytes = len(data) + num_samples = num_bytes // 8 + if num_samples == 0: + continue + + # Convert to complex64 array + new_samples = np.frombuffer(data, dtype=np.float32).view(np.complex64) + + # Calculate how many to copy + take = min(len(new_samples), total_samples - received) + + # Copy to main buffer + complex_values[received:received+take] = new_samples[:take] + received += take + + # Save any leftover samples + if take < len(new_samples): + if leftover is None: + leftover = new_samples[take:] + else: + leftover = np.concatenate((leftover, new_samples[take:])) + + except socket.error as e: + print(f"Socket error: {e}") + break + + # Send only if we have a full buffer + if received == total_samples: + zmq_socket.send(complex_values.tobytes(), copy=False) # Zero-copy send + + end_time = time.perf_counter() + print(f"Publish time: {end_time-start_time:.4f}s") + start_time = end_time + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--vector_length", "-v", type=int, default=1024) + parser.add_argument("--publisher", "-p", type=int, default=5556) + parser.add_argument("--n_vectors", "-n", type=int, default=512) + args = parser.parse_args() + send_0mq(args)