maxwelllink.sockets package

class maxwelllink.sockets.AggregatedSocketHub[source]

Bases: SocketHub

EM-side hub that aggregates multiple molecule requests into one bridge link.

This class keeps the same public methods used by MaxwellLink solvers (register_molecule_return_id, wait_until_bound, all_bound, step_barrier) while mapping many molecule IDs onto a smaller number of bridge connections.

Molecules are assigned to a bridge group through init_payload["aggregate_group"]. All molecules sharing the same group are sent together to one LocalSocketHubBridge.
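
The grouping rule above can be illustrated with plain dictionaries. This is a minimal sketch, not the hub's own code: the helper name group_by_aggregate_group and the group names "bridge_0" and "bridge_1" are hypothetical, and treating a missing "aggregate_group" key as an error is an assumption.

```python
from collections import defaultdict


def group_by_aggregate_group(init_payloads):
    """Map each aggregate group to the sorted molecule IDs assigned to it."""
    groups = defaultdict(list)
    for molecule_id, payload in init_payloads.items():
        # "aggregate_group" is the documented key; raising KeyError when it
        # is missing is an assumption of this sketch.
        groups[payload["aggregate_group"]].append(molecule_id)
    return {group: sorted(ids) for group, ids in groups.items()}


# Hypothetical INIT payloads for three molecules spread over two groups.
init_payloads = {
    0: {"aggregate_group": "bridge_0"},
    1: {"aggregate_group": "bridge_0"},
    2: {"aggregate_group": "bridge_1"},
}
groups = group_by_aggregate_group(init_payloads)
```

Here molecules 0 and 1 would travel together over one bridge link, while molecule 2 uses a second one.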

__init__(host=None, port=31415, timeout=60000.0, latency=0.01)[source]

Initialize the socket hub.

Parameters:
  • host (str or None, default: None) – Host address for AF_INET sockets. Ignored when using a UNIX socket.

  • port (int or None, default: 31415) – TCP port for AF_INET sockets. Ignored for UNIX sockets.

  • unixsocket (str or None, default: None) – Path (or name under /tmp/socketmxl_*) for a UNIX domain socket. When provided, host and port are ignored.

  • timeout (float, default: 60000.0) – Socket timeout (seconds) for client operations.

  • latency (float, default: 0.01) – Polling sleep (seconds) between hub sweeps; can be very small for local runs.

add_bridge(local_unixsocket)[source]

Create, start, and return one hub-owned local UNIX-socket bridge.

This is the convenience entry point intended for minimal edits when migrating an existing single-layer SocketHub script to the new two-layer transport.

Parameters:

local_unixsocket (str)

Return type:

AggregatedBridge
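
A migration along these lines might look like the sketch below. It uses only the signatures listed on this page; the function name build_hub_with_bridge and the socket name "mxl_bridge_0" are hypothetical, and how molecules are then attached to the bridge is not shown because this page does not specify it.

```python
def build_hub_with_bridge(local_unixsocket="mxl_bridge_0"):
    """Create an aggregated hub plus one hub-owned local bridge (sketch)."""
    # Imported lazily so the sketch can be loaded without MaxwellLink installed.
    from maxwelllink.sockets import AggregatedSocketHub, get_available_host_port

    # Ask the OS for a free localhost port, as documented for
    # get_available_host_port().
    host, port = get_available_host_port(localhost=True)
    hub = AggregatedSocketHub(host=host, port=port)

    # add_bridge() creates, starts, and returns the bridge; the socket name
    # passed here is a placeholder chosen for this example.
    bridge = hub.add_bridge(local_unixsocket)
    return hub, bridge
```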

all_bound(molecule_ids, require_init=True)

Check if all given molecule IDs are bound (and optionally initialized).

Parameters:
  • molecule_ids (iterable of int) – Molecule IDs to check.

  • require_init (bool, default: True) – Also require that clients completed INIT.

Returns:

True if all are bound (and initialized if requested), else False.

Return type:

bool

graceful_shutdown(reason=None, wait=2.0)

Politely ask all connected drivers to exit and wait briefly for BYE.

Parameters:
  • reason (str or None, optional) – Optional reason to log for shutdown.

  • wait (float, default: 2.0) – Seconds to wait for clean replies.

init_remote_bridges(molecules, *, molecules_per_bridge, unix_prefix='bridge_', save_file='aggregation.json')[source]

Partition molecules across remote bridge groups and save a manifest.

This helper does not start any bridge threads locally. Instead it assigns molecule.init_payload["aggregate_group"] for each molecule and writes one JSON manifest that bridge-node scripts can consume via run_bridge_node().

Parameters:
  • molecules (molecule or iterable of molecules) – Molecules to distribute across remote bridges.

  • molecules_per_bridge (int) – Maximum number of molecules assigned to one bridge.

  • unix_prefix (str, default: "bridge_") – Prefix used to generate downstream UNIX socket names f"{unix_prefix}{idx}".

  • save_file (str, default: "aggregation.json") – Path where the bridge manifest should be written.

Returns:

The generated bridge specifications in order.

Return type:

list[RemoteBridgeSpec]
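
The partitioning described above can be sketched with the standard library. This is an illustration under stated assumptions, not the library's implementation: contiguous chunking, the "group_{idx}" naming, and the {"bridges": [...]} manifest layout are all hypothetical; only the f"{unix_prefix}{idx}" socket naming and the four spec fields come from this page.

```python
import json
import os
import tempfile


def partition_into_bridges(init_payloads, molecules_per_bridge, unix_prefix="bridge_"):
    """Assign an aggregate group to each payload and return bridge spec dicts."""
    if molecules_per_bridge < 1:
        raise ValueError("molecules_per_bridge must be at least 1")
    specs = []
    for start in range(0, len(init_payloads), molecules_per_bridge):
        idx = len(specs)
        chunk = init_payloads[start:start + molecules_per_bridge]
        group_id = f"group_{idx}"  # hypothetical naming scheme
        for payload in chunk:
            payload["aggregate_group"] = group_id
        specs.append({
            "idx": idx,
            "group_id": group_id,
            "unixsocket": f"{unix_prefix}{idx}",  # documented naming pattern
            "n_molecules": len(chunk),
        })
    return specs


# Five molecules, at most two per bridge: three bridges are needed.
payloads = [{} for _ in range(5)]
specs = partition_into_bridges(payloads, molecules_per_bridge=2)

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "aggregation.json")
    with open(path, "w") as fh:
        json.dump({"bridges": specs}, fh)  # manifest layout is an assumption
    with open(path) as fh:
        reloaded = json.load(fh)["bridges"]
```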

register_molecule(molecule_id)

Reserve a slot for a given molecule ID (client may connect later).

Parameters:

molecule_id (int) – Molecule ID to register.

Raises:

ValueError – If the molecule ID is already registered.

Return type:

None

register_molecule_return_id()

Reserve a slot for a molecule and return an auto-assigned ID.

Returns:

The assigned unique molecule ID.

Return type:

int

step_barrier(requests, timeout=None)[source]

Dispatch all requested fields group-by-group and collect grouped replies.

The caller-facing contract matches SocketHub.step_barrier: responses[molid] contains {"amp": ndarray(3,), "extra": bytes}.

Parameters:
  • requests (Dict[int, dict])

  • timeout (float | None)

Return type:

Dict[int, dict]

stop()[source]

Stop the aggregate hub and clean up bridge groups coherently.

The base SocketHub.stop() assumes one client per molecule, which is not true here. This override shuts down each bridge once and clears all molecule bindings associated with that bridge.

wait_until_bound(init_payloads, require_init=True, timeout=None)[source]

Wait until all requested molecules are served by initialized bridges.

Molecules are grouped through init_payload["aggregate_group"] and each group must be backed by exactly one connected bridge.

Parameters:

init_payloads (dict)

class maxwelllink.sockets.LocalSocketHubBridge[source]

Bases: object

Bridge process/thread that fans out aggregate requests to a local SocketHub.

Upstream:

one TCP connection to AggregatedSocketHub

Downstream:

one ordinary SocketHub using either TCP or UNIX sockets, connected to many existing MaxwellLink socket drivers.

__init__(*, group_id, upstream_host, upstream_port, timeout=60.0, latency=0.01, local_host='127.0.0.1', local_port=None, local_unixsocket=None)[source]

Parameters:
  • group_id (str)

  • upstream_host (str)

  • upstream_port (int)

  • timeout (float)

  • latency (float)

  • local_host (str)

  • local_port (int | None)

  • local_unixsocket (str | None)

property local_endpoint: dict

Return the downstream socket endpoint local drivers should connect to.

run()[source]

Run the bridge loop until the hub sends STOP or disconnects.

Return type:

None

start()[source]

Start the bridge loop in a daemon thread and return the thread handle.

Return type:

Thread

stop(wait=2.0)[source]

Stop the bridge loop and close the downstream local hub.

Parameters:

wait (float)

Return type:

None

class maxwelllink.sockets.RemoteBridgeSpec[source]

Bases: object

One remote aggregate bridge entry produced by init_remote_bridges.

Parameters:
  • idx (int) – Zero-based bridge index used by run_bridge_node().

  • group_id (str) – Aggregate group identifier transmitted upstream.

  • unixsocket (str) – Downstream UNIX-socket address local drivers should connect to.

  • n_molecules (int) – Number of molecules assigned to this bridge.

__init__(idx, group_id, unixsocket, n_molecules)

Parameters:
  • idx (int)

  • group_id (str)

  • unixsocket (str)

  • n_molecules (int)

Return type:

None

classmethod from_dict(payload)[source]

Build one bridge specification from JSON-decoded manifest data.

Parameters:

payload (Mapping)

Return type:

RemoteBridgeSpec

group_id: str

idx: int

n_molecules: int

to_dict()[source]

Return a JSON-serializable bridge specification mapping.

Return type:

dict
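
The to_dict()/from_dict() pair suggests a JSON round trip. The stand-in below mirrors the four documented fields with a dataclass; the class name BridgeSpecSketch is hypothetical, and whether RemoteBridgeSpec is itself a dataclass, or which keys its dictionaries use, is an assumption.

```python
import json
from dataclasses import asdict, dataclass
from typing import Mapping


@dataclass
class BridgeSpecSketch:
    """Stand-in with the same four fields as RemoteBridgeSpec."""

    idx: int
    group_id: str
    unixsocket: str
    n_molecules: int

    def to_dict(self) -> dict:
        # All fields are plain ints and strings, so the result is JSON-safe.
        return asdict(self)

    @classmethod
    def from_dict(cls, payload: Mapping) -> "BridgeSpecSketch":
        # Coerce explicitly so malformed manifest values fail early.
        return cls(
            idx=int(payload["idx"]),
            group_id=str(payload["group_id"]),
            unixsocket=str(payload["unixsocket"]),
            n_molecules=int(payload["n_molecules"]),
        )


spec = BridgeSpecSketch(idx=0, group_id="group_0", unixsocket="bridge_0", n_molecules=4)
encoded = json.dumps(spec.to_dict())
round_tripped = BridgeSpecSketch.from_dict(json.loads(encoded))
```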

unixsocket: str

class maxwelllink.sockets.SocketHub[source]

Bases: object

Socket server coordinating multiple driver connections with an FDTD engine.

This server:

  • Accepts and tracks many driver connections.

  • Handles initialization handshakes, field dispatch, and result collection.

  • Provides a barrier-style step to send fields and receive source amplitudes from all registered molecules.

__init__(host=None, port=31415, unixsocket=None, timeout=60000.0, latency=0.01)[source]

Initialize the socket hub.

Parameters:
  • host (str or None, default: None) – Host address for AF_INET sockets. Ignored when using a UNIX socket.

  • port (int or None, default: 31415) – TCP port for AF_INET sockets. Ignored for UNIX sockets.

  • unixsocket (str or None, default: None) – Path (or name under /tmp/socketmxl_*) for a UNIX domain socket. When provided, host and port are ignored.

  • timeout (float, default: 60000.0) – Socket timeout (seconds) for client operations.

  • latency (float, default: 0.01) – Polling sleep (seconds) between hub sweeps; can be very small for local runs.

all_bound(molecule_ids, require_init=True)[source]

Check if all given molecule IDs are bound (and optionally initialized).

Parameters:
  • molecule_ids (iterable of int) – Molecule IDs to check.

  • require_init (bool, default: True) – Also require that clients completed INIT.

Returns:

True if all are bound (and initialized if requested), else False.

Return type:

bool

graceful_shutdown(reason=None, wait=2.0)[source]

Politely ask all connected drivers to exit and wait briefly for BYE.

Parameters:
  • reason (str or None, optional) – Optional reason to log for shutdown.

  • wait (float, default: 2.0) – Seconds to wait for clean replies.

register_molecule(molecule_id)[source]

Reserve a slot for a given molecule ID (client may connect later).

Parameters:

molecule_id (int) – Molecule ID to register.

Raises:

ValueError – If the molecule ID is already registered.

Return type:

None

register_molecule_return_id()[source]

Reserve a slot for a molecule and return an auto-assigned ID.

Returns:

The assigned unique molecule ID.

Return type:

int

step_barrier(requests, timeout=None)[source]

Barrier step: dispatch fields and collect source amplitudes from all clients.

Coordinates sending fields, waiting for results, and jointly committing the results once every requested molecule is ready. A frozen barrier is reused if a disconnect occurs mid-step.

Parameters:
  • requests (dict[int, dict]) – Mapping from molecule ID to a request dict with keys:

      - "efield_au": array-like (3,), field vector in a.u.
      - "meta": dict, optional metadata per send.
      - "init": dict, optional INIT payload for first bind.

  • timeout (float, optional) – Maximum time (seconds) to wait for the barrier to complete. Defaults to the hub’s timeout setting.

Returns:

Mapping molid -> {"amp": ndarray(3,), "extra": bytes}. Returns {} when paused, on abort, or if the barrier is incomplete.

Return type:

dict[int, dict]
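
The request and response shapes above can be handled with two small helpers. These are sketches only: build_requests and amplitudes_or_none are hypothetical names, the "t" metadata key is invented for the example, and the fake response stands in for what a real hub would return.

```python
def build_requests(efields_by_molecule, meta=None):
    """Build a step_barrier-style request mapping from 3-component fields."""
    requests = {}
    for molecule_id, efield in efields_by_molecule.items():
        vector = [float(x) for x in efield]
        if len(vector) != 3:
            raise ValueError("efield_au must have exactly three components")
        request = {"efield_au": vector}
        if meta is not None:
            request["meta"] = dict(meta)  # optional per-send metadata
        requests[molecule_id] = request
    return requests


def amplitudes_or_none(responses, molecule_ids):
    """Return {molid: amp}, or None if the barrier did not complete."""
    # The documented return value is {} when paused, aborted, or incomplete.
    if not responses or any(m not in responses for m in molecule_ids):
        return None
    return {m: responses[m]["amp"] for m in molecule_ids}


requests = build_requests({0: (0.0, 0.0, 1e-3), 1: (0.0, 0.0, 2e-3)}, meta={"t": 0.5})

# Fake reply in the documented shape, used here instead of a live hub.
fake_responses = {
    0: {"amp": [0.0, 0.0, 0.1], "extra": b""},
    1: {"amp": [0.0, 0.0, 0.2], "extra": b""},
}
amps = amplitudes_or_none(fake_responses, [0, 1])
```

Checking for an empty result before indexing matters because a caller that assumes every molecule ID is present would raise KeyError on a paused or aborted step.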

stop()[source]

Stop accepting new connections, request clients to exit, and close sockets.

Also removes the UNIX socket path if one was created.

wait_until_bound(init_payloads, require_init=True, timeout=None)[source]

Block until all requested molecule IDs are bound (and optionally initialized).

Parameters:
  • init_payloads (dict[int, dict]) – Mapping from molecule ID to INIT payload to use on bind.

  • require_init (bool, default: True) – Also require that clients completed INIT.

  • timeout (float or None, optional) – Maximum time to wait (seconds). Uses hub default if None.

Returns:

True if all requested IDs became bound within the time limit, else False.

Return type:

bool
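
The blocking behavior and its True/False result resemble a generic poll-with-deadline loop. The sketch below is not the hub's implementation; wait_until is a hypothetical helper, and the mapping of its timeout and latency arguments onto the hub parameters of the same names is an assumption.

```python
import time


def wait_until(predicate, timeout, latency=0.01):
    """Poll predicate() until it is true or timeout seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(latency)  # polling sleep between sweeps


# A predicate that becomes true on its third call, standing in for
# "all requested molecule IDs are bound".
calls = {"n": 0}


def bound_after_three_checks():
    calls["n"] += 1
    return calls["n"] >= 3


became_bound = wait_until(bound_after_three_checks, timeout=1.0, latency=0.001)
timed_out = wait_until(lambda: False, timeout=0.05, latency=0.01)
```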

maxwelllink.sockets.am_master()[source]

Return True if this process is the MPI master rank (rank 0), otherwise False.

Notes

Attempts to import mpi4py and query COMM_WORLD. If unavailable, returns True by treating the single process as rank 0.
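
The fallback described in the note can be sketched as follows. This is an illustration of the pattern, not the library's source; am_master_sketch is a hypothetical name, and catching only ImportError is an assumption.

```python
def am_master_sketch():
    """Return True on MPI rank 0, or when mpi4py cannot be imported."""
    try:
        from mpi4py import MPI
    except ImportError:
        # No MPI bindings: treat the single process as rank 0.
        return True
    return MPI.COMM_WORLD.Get_rank() == 0


is_master = am_master_sketch()
```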

maxwelllink.sockets.get_available_host_port(localhost=True, save_to_file=None)[source]

Ask the OS for an available TCP port, on localhost by default.

Parameters:
  • localhost (bool, default: True) – If True, bind to the localhost interface (“127.0.0.1”). If False, bind to all interfaces (“0.0.0.0”).

  • save_to_file (str or None, default: None) – If provided, path of a file to which the selected host and port are written. The first line contains the host, and the second line contains the port.

Returns:

(host, port) pair, e.g., ("127.0.0.1", 34567).

Return type:

tuple
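
A common way to obtain such a port is to bind to port 0 and read back the port the OS picked. The sketch below shows that technique with the standard library; find_free_port is a hypothetical name, and whether the library uses exactly this approach is an assumption. Only the two-line file layout comes from this page.

```python
import os
import socket
import tempfile


def find_free_port(localhost=True, save_to_file=None):
    """Bind to port 0 so the OS picks a free TCP port, then release it."""
    host = "127.0.0.1" if localhost else "0.0.0.0"
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        port = sock.getsockname()[1]
    if save_to_file is not None:
        # First line: host. Second line: port.
        with open(save_to_file, "w") as fh:
            fh.write(f"{host}\n{port}\n")
    return host, port


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "host_port.txt")
    host, port = find_free_port(save_to_file=path)
    with open(path) as fh:
        saved_host, saved_port = fh.read().split()
```

In this sketch the port is released as soon as the probing socket closes, so another process could claim it before the hub binds; the result is best used promptly.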

maxwelllink.sockets.mpi_bcast_from_master(value)[source]

Broadcast a Python value from the master rank to all ranks via MPI.

Parameters:

value (any) – The value to broadcast.

Returns:

The broadcast value (unchanged when MPI is unavailable).

Return type:

any
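
The "unchanged when MPI is unavailable" behavior can be sketched with mpi4py's pickle-based bcast. This is an illustration, not the library's source; bcast_from_master_sketch is a hypothetical name, and the example payload is invented.

```python
def bcast_from_master_sketch(value):
    """Broadcast value from rank 0, or return it unchanged without mpi4py."""
    try:
        from mpi4py import MPI
    except ImportError:
        # Single-process run: there is nothing to broadcast.
        return value
    # Lowercase bcast pickles arbitrary Python objects; every rank receives
    # the value held by root.
    return MPI.COMM_WORLD.bcast(value, root=0)


# Hypothetical use: every rank ends up with the endpoint chosen on rank 0.
endpoint = bcast_from_master_sketch({"host": "127.0.0.1", "port": 31415})
```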

maxwelllink.sockets.run_bridge_node(info='aggregation.json', *, idx=0)[source]

Run one bridge node from a manifest written by init_remote_bridges.

Parameters:
  • info (str or path-like, default: "aggregation.json") – JSON manifest written by AggregatedSocketHub.init_remote_bridges().

  • idx (int, default: 0) – Zero-based bridge index identifying which bridge entry in info this node should start.

Return type:

None
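
A bridge-node script built on this function might be a thin command-line wrapper like the sketch below. The --info and --idx flags and the function names are hypothetical; only the run_bridge_node(info, idx=...) call follows the signature above.

```python
import argparse


def build_parser():
    """Command-line options for a hypothetical bridge-node launcher."""
    parser = argparse.ArgumentParser(description="Start one MaxwellLink bridge node.")
    parser.add_argument("--info", default="aggregation.json",
                        help="manifest written by init_remote_bridges()")
    parser.add_argument("--idx", type=int, default=0,
                        help="zero-based bridge index in the manifest")
    return parser


def main(argv=None):
    args = build_parser().parse_args(argv)
    # Imported lazily so the parser can be used without MaxwellLink installed.
    from maxwelllink.sockets import run_bridge_node

    run_bridge_node(args.info, idx=args.idx)


# Parsing only; main() is not called here because it needs a real manifest.
example_args = build_parser().parse_args(["--idx", "3"])
```

Each bridge node would run the same script with a different --idx value, for example one per compute node.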

Submodules