maxwelllink.sockets package¶
- class maxwelllink.sockets.AggregatedSocketHub[source]¶
Bases:
SocketHubEM-side hub that aggregates multiple molecule requests into one bridge link.
This class keeps the same public methods used by MaxwellLink solvers (
register_molecule_return_id,wait_until_bound,all_bound,step_barrier) while mapping many molecule IDs onto a smaller number of bridge connections.Molecules are assigned to a bridge group through
init_payload["aggregate_group"]. All molecules sharing the same group are sent together to oneLocalSocketHubBridge.- __init__(host=None, port=31415, timeout=60000.0, latency=0.01)[source]¶
Initialize the socket hub.
- Parameters:
host (str or None, default: None) – Host address for AF_INET sockets. Ignored when using a UNIX socket.
port (int or None, default: 31415) – TCP port for AF_INET sockets. Ignored for UNIX sockets.
unixsocket (str or None, default: None) – Path (or name under
/tmp/socketmxl_*) for a UNIX domain socket. When provided,hostandportare ignored.timeout (float, default: 60000.0) – Socket timeout (seconds) for client operations.
latency (float, default: 0.01) – Polling sleep (seconds) between hub sweeps; can be very small for local runs.
- add_bridge(local_unixsocket)[source]¶
Create, start, and return one hub-owned local UNIX-socket bridge.
This is the convenience entry point intended for minimal edits when migrating an existing single-layer
SocketHubscript to the new two-layer transport.- Parameters:
local_unixsocket (str)
- Return type:
- all_bound(molecule_ids, require_init=True)¶
Check if all given molecule IDs are bound (and optionally initialized).
- Parameters:
molecule_ids (iterable of int) – Molecule IDs to check.
require_init (bool, default: True) – Also require that clients completed INIT.
- Returns:
Trueif all are bound (and initialized if requested), elseFalse.- Return type:
bool
- graceful_shutdown(reason=None, wait=2.0)¶
Politely ask all connected drivers to exit and wait briefly for
BYE.- Parameters:
reason (str or None, optional) – Optional reason to log for shutdown.
wait (float, default: 2.0) – Seconds to wait for clean replies.
- init_remote_bridges(molecules, *, molecules_per_bridge, unix_prefix='bridge_', save_file='aggregation.json')[source]¶
Partition molecules across remote bridge groups and save a manifest.
This helper does not start any bridge threads locally. Instead it assigns
molecule.init_payload["aggregate_group"]for each molecule and writes one JSON manifest that bridge-node scripts can consume viarun_bridge_node().- Parameters:
molecules (molecule or iterable of molecules) – Molecules to distribute across remote bridges.
molecules_per_bridge (int) – Maximum number of molecules assigned to one bridge.
unix_prefix (str, default:
"bridge_") – Prefix used to generate downstream UNIX socket namesf"{unix_prefix}{idx}".save_file (str, default:
"aggregation.json") – Path where the bridge manifest should be written.
- Returns:
The generated bridge specifications in order.
- Return type:
list[RemoteBridgeSpec]
- register_molecule(molecule_id)¶
Reserve a slot for a given molecule ID (client may connect later).
- Parameters:
molecule_id (int) – Molecule ID to register.
- Raises:
ValueError – If the molecule ID is already registered.
- Return type:
None
- register_molecule_return_id()¶
Reserve a slot for a molecule and return an auto-assigned ID.
- Returns:
The assigned unique molecule ID.
- Return type:
int
- step_barrier(requests, timeout=None)[source]¶
Dispatch all requested fields group-by-group and collect grouped replies.
The caller-facing contract matches
SocketHub.step_barrier:responses[molid]contains{"amp": ndarray(3,), "extra": bytes}.- Parameters:
requests (Dict[int, dict])
timeout (float | None)
- Return type:
Dict[int, dict]
- stop()[source]¶
Stop the aggregate hub and clean up bridge groups coherently.
The base
SocketHub.stop()assumes one client per molecule, which is not true here. This override shuts down each bridge once and clears all molecule bindings associated with that bridge.
- wait_until_bound(init_payloads, require_init=True, timeout=None)[source]¶
Wait until all requested molecules are served by initialized bridges.
Molecules are grouped through
init_payload["aggregate_group"]and each group must be backed by exactly one connected bridge.- Parameters:
init_payloads (dict)
- class maxwelllink.sockets.LocalSocketHubBridge[source]¶
Bases:
objectBridge process/thread that fans out aggregate requests to a local SocketHub.
- Upstream:
one TCP connection to
AggregatedSocketHub- Downstream:
one ordinary
SocketHubusing either TCP or UNIX sockets, connected to many existing MaxwellLink socket drivers.
- __init__(*, group_id, upstream_host, upstream_port, timeout=60.0, latency=0.01, local_host='127.0.0.1', local_port=None, local_unixsocket=None)[source]¶
- Parameters:
group_id (str)
upstream_host (str)
upstream_port (int)
timeout (float)
latency (float)
local_host (str)
local_port (int | None)
local_unixsocket (str | None)
- property local_endpoint: dict¶
Return the downstream socket endpoint local drivers should connect to.
- class maxwelllink.sockets.RemoteBridgeSpec[source]¶
Bases:
objectOne remote aggregate bridge entry produced by
init_remote_bridges.- Parameters:
idx (int) – Zero-based bridge index used by
run_bridge_node().group_id (str) – Aggregate group identifier transmitted upstream.
unixsocket (str) – Downstream UNIX-socket address local drivers should connect to.
n_molecules (int) – Number of molecules assigned to this bridge.
- __init__(idx, group_id, unixsocket, n_molecules)¶
- Parameters:
idx (int)
group_id (str)
unixsocket (str)
n_molecules (int)
- Return type:
None
- classmethod from_dict(payload)[source]¶
Build one bridge specification from JSON-decoded manifest data.
- Parameters:
payload (Mapping)
- Return type:
- group_id: str¶
- idx: int¶
- n_molecules: int¶
- unixsocket: str¶
- class maxwelllink.sockets.SocketHub[source]¶
Bases:
objectSocket server coordinating multiple driver connections with an FDTD engine.
This server:
Accepts and tracks many driver connections.
Handles initialization handshakes, field dispatch, and result collection.
Provides a barrier-style step to send fields and receive source amplitudes from all registered molecules.
- __init__(host=None, port=31415, unixsocket=None, timeout=60000.0, latency=0.01)[source]¶
Initialize the socket hub.
- Parameters:
host (str or None, default: None) – Host address for AF_INET sockets. Ignored when using a UNIX socket.
port (int or None, default: 31415) – TCP port for AF_INET sockets. Ignored for UNIX sockets.
unixsocket (str or None, default: None) – Path (or name under
/tmp/socketmxl_*) for a UNIX domain socket. When provided,hostandportare ignored.timeout (float, default: 60000.0) – Socket timeout (seconds) for client operations.
latency (float, default: 0.01) – Polling sleep (seconds) between hub sweeps; can be very small for local runs.
- all_bound(molecule_ids, require_init=True)[source]¶
Check if all given molecule IDs are bound (and optionally initialized).
- Parameters:
molecule_ids (iterable of int) – Molecule IDs to check.
require_init (bool, default: True) – Also require that clients completed INIT.
- Returns:
Trueif all are bound (and initialized if requested), elseFalse.- Return type:
bool
- graceful_shutdown(reason=None, wait=2.0)[source]¶
Politely ask all connected drivers to exit and wait briefly for
BYE.- Parameters:
reason (str or None, optional) – Optional reason to log for shutdown.
wait (float, default: 2.0) – Seconds to wait for clean replies.
- register_molecule(molecule_id)[source]¶
Reserve a slot for a given molecule ID (client may connect later).
- Parameters:
molecule_id (int) – Molecule ID to register.
- Raises:
ValueError – If the molecule ID is already registered.
- Return type:
None
- register_molecule_return_id()[source]¶
Reserve a slot for a molecule and return an auto-assigned ID.
- Returns:
The assigned unique molecule ID.
- Return type:
int
- step_barrier(requests, timeout=None)[source]¶
Barrier step: dispatch fields and collect source amplitudes from all clients.
Coordinates sending fields, waiting for results, and jointly committing the results once every requested molecule is ready. A frozen barrier is reused if a disconnect occurs mid-step.
- Parameters:
requests (dict[int, dict]) – Mapping from molecule ID to request dict with keys: -
"efield_au": array-like(3,)field vector in a.u. -"meta": dict, optional metadata per send. -"init": dict, optional INIT payload for first bind.timeout (float, optional) – Maximum time (seconds) to wait for the barrier to complete. Defaults to the hub’s
timeoutsetting.
- Returns:
Mapping
molid -> {"amp": ndarray(3,), "extra": bytes}. Returns{}when paused, on abort, or if the barrier is incomplete.- Return type:
dict[int, dict]
- stop()[source]¶
Stop accepting new connections, request clients to exit, and close sockets.
Also removes the UNIX socket path if one was created.
- wait_until_bound(init_payloads, require_init=True, timeout=None)[source]¶
Block until all requested molecule IDs are bound (and optionally initialized).
- Parameters:
init_payloads (dict[int, dict]) – Mapping from molecule ID to INIT payload to use on bind.
require_init (bool, default: True) – Also require that clients completed INIT.
timeout (float or None, optional) – Maximum time to wait (seconds). Uses hub default if
None.
- Returns:
Trueif all requested IDs became bound within the time limit, elseFalse.- Return type:
bool
- maxwelllink.sockets.am_master()[source]¶
Return True if this process is the MPI master rank (rank 0), otherwise False.
Notes
Attempts to import
mpi4pyand queryCOMM_WORLD. If unavailable, returnsTrueby treating the single process as rank 0.
- maxwelllink.sockets.get_available_host_port(localhost=True, save_to_file=None)[source]¶
Ask the OS for an available localhost TCP port.
- Parameters:
localhost (bool, default: True) – If True, bind to the localhost interface (“127.0.0.1”). If False, bind to all interfaces (“0.0.0.0”).
save_to_file (str or None, default: None) – If provided, save the selected host and port to the given file with filename provided by save_to_file. The first line contains the host, and the second line contains the port.
- Returns:
(host, port)pair, e.g.,("127.0.0.1", 34567).- Return type:
tuple
- maxwelllink.sockets.mpi_bcast_from_master(value)[source]¶
Broadcast a Python value from the master rank to all ranks via MPI.
- Parameters:
value (any) – The value to broadcast.
- Returns:
The broadcast value (unchanged when MPI is unavailable).
- Return type:
any
- maxwelllink.sockets.run_bridge_node(info='aggregation.json', *, idx=0)[source]¶
Run one bridge node from a manifest written by
init_remote_bridges.- Parameters:
info (str or path-like, default:
"aggregation.json") – JSON manifest written byAggregatedSocketHub.init_remote_bridges().idx (int, default: 0) – Zero-based bridge index identifying which bridge entry in
infothis node should start.
- Return type:
None
Submodules¶
- maxwelllink.sockets.aggregated module
AggregatedBridgeAggregatedSocketHubAggregatedSocketHub.__init__()AggregatedSocketHub.add_bridge()AggregatedSocketHub.all_bound()AggregatedSocketHub.graceful_shutdown()AggregatedSocketHub.init_remote_bridges()AggregatedSocketHub.register_molecule()AggregatedSocketHub.register_molecule_return_id()AggregatedSocketHub.remote_bridge_infoAggregatedSocketHub.remote_bridgesAggregatedSocketHub.step_barrier()AggregatedSocketHub.stop()AggregatedSocketHub.wait_until_bound()
LocalSocketHubBridgeRemoteBridgeSpecmxl_bridge_main()run_bridge_node()
- maxwelllink.sockets.sockets module