import argparse import socket import time import numpy as np import zmq def send_0mq(args): udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) udp_socket.bind(("127.0.0.1", 5588)) # ZMQ setup with high-performance options context = zmq.Context() zmq_socket = context.socket(zmq.PUB) zmq_socket.setsockopt(zmq.SNDHWM, 0) # Remove send buffer limit zmq_socket.setsockopt(zmq.LINGER, 0) # Don't wait on close zmq_socket.bind(f"tcp://127.0.0.1:{args.publisher}") # Buffer for storing received samples total_samples = args.n_vectors * args.vector_length complex_values = np.empty(total_samples, dtype=np.complex64) # Buffer for handling partial packets leftover = None start_time = time.perf_counter() while True: received = 0 # Process leftover data from previous iteration if leftover is not None: take = min(len(leftover), total_samples) complex_values[:take] = leftover[:take] received = take # Update leftover if take < len(leftover): leftover = leftover[take:] else: leftover = None # Receive new packets while received < total_samples: try: # Calculate how much more we need bytes_needed = (total_samples - received) * 8 data, _ = udp_socket.recvfrom(bytes_needed) # Convert to complex array num_bytes = len(data) num_samples = num_bytes // 8 if num_samples == 0: continue # Convert to complex64 array new_samples = np.frombuffer(data, dtype=np.float32).view(np.complex64) # Calculate how many to copy take = min(len(new_samples), total_samples - received) # Copy to main buffer complex_values[received: received + take] = new_samples[:take] received += take # Save any leftover samples if take < len(new_samples): if leftover is None: leftover = new_samples[take:] else: leftover = np.concatenate((leftover, new_samples[take:])) except socket.error as e: print(f"Socket error: {e}") break # Send only if we have a full buffer if received == total_samples: zmq_socket.send(complex_values.tobytes(), copy=False) # Zero-copy send end_time = time.perf_counter() print(f"Publish time: {end_time-start_time:.4f}s") start_time = end_time if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--vector_length", "-v", type=int, default=1024) parser.add_argument("--publisher", "-p", type=int, default=5556) parser.add_argument("--n_vectors", "-n", type=int, default=512) args = parser.parse_args() send_0mq(args)