maxwelllink.sockets.aggregated module¶
Two-layer socket aggregation for MaxwellLink.
This module adds an opt-in transport layer on top of the existing
SocketHub implementation without modifying the original hub logic.
The new design introduces two roles:
AggregatedSocketHub: an EM-side hub that keeps the public hub API expected by MaxwellLink solvers, but aggregates multiple molecule requests into one upstream connection per HPC node.
LocalSocketHubBridge: a node-local bridge process/thread that talks to AggregatedSocketHub upstream while reusing an ordinary downstream SocketHub to fan out work to multiple existing Python/socket-only drivers.
This preserves existing SocketHub behavior while enabling a two-layer
communication topology:
EM solver -> AggregatedSocketHub ==TCP==> LocalSocketHubBridge
-> local SocketHub ==TCP/UNIX==> many molecular drivers
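To illustrate why the two-layer topology reduces the number of upstream connections, the minimal sketch below groups molecule IDs by node. It does not use MaxwellLink itself; the function name `group_by_node` and the node labels are hypothetical and chosen only for illustration.

```python
from collections import defaultdict


def group_by_node(assignments):
    """Map each node label to the sorted molecule IDs hosted on that node.

    `assignments` is a hypothetical {molecule_id: node_label} mapping.
    """
    groups = defaultdict(list)
    for molecule_id, node in assignments.items():
        groups[node].append(molecule_id)
    return {node: sorted(ids) for node, ids in groups.items()}


# Six molecules spread over two HPC nodes (illustrative labels).
assignments = {0: "node_a", 1: "node_a", 2: "node_a",
               3: "node_b", 4: "node_b", 5: "node_b"}
groups = group_by_node(assignments)

# One upstream link per node instead of one per molecule.
upstream_links = len(groups)
```

In this toy case the EM side would hold two upstream links rather than six, which is the reduction the aggregated topology aims for.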
- class maxwelllink.sockets.aggregated.AggregatedBridge[source]¶
Bases:
object
Convenience handle for one hub-owned local bridge.
Instances of this class are returned by AggregatedSocketHub.add_bridge(). They provide a light wrapper around LocalSocketHubBridge so existing input scripts only need to:
create bridge handles from the hub,
attach molecules to a handle via append(), and
launch downstream drivers against address.
- __init__(*, hub, group_id, bridge)[source]¶
- Parameters:
hub (AggregatedSocketHub)
group_id (str)
bridge (LocalSocketHubBridge)
- property address: str¶
Address string downstream UNIX-socket drivers should use.
- append(molecules)[source]¶
Attach one molecule or an iterable of molecules to this bridge group.
The helper only mutates molecule.init_payload["aggregate_group"] and therefore works with existing mxl.Molecule / SocketMolecule objects without changing solver-side logic.
- Return type:
None
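The tagging behavior described for append() can be sketched without MaxwellLink installed. `FakeMolecule` and `tag_molecules` below are hypothetical stand-ins; the only detail taken from the documentation is that the "aggregate_group" key of `init_payload` is set, for one molecule or for an iterable of them.

```python
class FakeMolecule:
    """Hypothetical stand-in exposing only an `init_payload` dict."""

    def __init__(self):
        self.init_payload = {}


def tag_molecules(molecules, group_id):
    """Set init_payload["aggregate_group"] on one molecule or an iterable."""
    # Treat a single object with an init_payload as a one-element batch.
    if hasattr(molecules, "init_payload"):
        molecules = [molecules]
    for molecule in molecules:
        molecule.init_payload["aggregate_group"] = group_id


single = FakeMolecule()
batch = [FakeMolecule(), FakeMolecule()]
tag_molecules(single, "bridge_0")
tag_molecules(batch, "bridge_1")
```

Because only the payload dictionary is touched, the molecule objects themselves need no new attributes or methods.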
- property local_endpoint: dict¶
Return the downstream endpoint mapping for driver launch code.
- stop(wait=2.0)[source]¶
Stop the underlying local bridge.
- Parameters:
wait (float)
- Return type:
None
- property unixsocket: str | None¶
Configured UNIX-socket driver address, if any.
- property unixsocket_path: str | None¶
Resolved filesystem path for the local UNIX socket.
- class maxwelllink.sockets.aggregated.AggregatedSocketHub[source]¶
Bases:
SocketHub
EM-side hub that aggregates multiple molecule requests into one bridge link.
This class keeps the same public methods used by MaxwellLink solvers (register_molecule_return_id, wait_until_bound, all_bound, step_barrier) while mapping many molecule IDs onto a smaller number of bridge connections.
Molecules are assigned to a bridge group through init_payload["aggregate_group"]. All molecules sharing the same group are sent together to one LocalSocketHubBridge.
- __init__(host=None, port=31415, timeout=60000.0, latency=0.01)[source]¶
Initialize the socket hub.
- Parameters:
host (str or None, default: None) – Host address for AF_INET sockets. Ignored when using a UNIX socket.
port (int or None, default: 31415) – TCP port for AF_INET sockets. Ignored for UNIX sockets.
unixsocket (str or None, default: None) – Path (or name under /tmp/socketmxl_*) for a UNIX domain socket. When provided, host and port are ignored.
timeout (float, default: 60000.0) – Socket timeout (seconds) for client operations.
latency (float, default: 0.01) – Polling sleep (seconds) between hub sweeps; can be very small for local runs.
- add_bridge(local_unixsocket)[source]¶
Create, start, and return one hub-owned local UNIX-socket bridge.
This is the convenience entry point intended for minimal edits when migrating an existing single-layer SocketHub script to the new two-layer transport.
- Parameters:
local_unixsocket (str)
- Return type:
AggregatedBridge
- all_bound(molecule_ids, require_init=True)¶
Check if all given molecule IDs are bound (and optionally initialized).
- Parameters:
molecule_ids (iterable of int) – Molecule IDs to check.
require_init (bool, default: True) – Also require that clients completed INIT.
- Returns:
True if all are bound (and initialized if requested), else False.
- Return type:
bool
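The semantics of all_bound with and without require_init can be sketched as follows. The `states` dictionary and its "bound" / "initialized" keys are hypothetical bookkeeping invented for this example and do not reflect the hub's internal data structures.

```python
def all_bound_sketch(states, molecule_ids, require_init=True):
    """Return True only if every requested ID is bound (and initialized if asked)."""
    for molid in molecule_ids:
        state = states.get(molid)
        # Unknown or unbound molecules fail the check immediately.
        if state is None or not state["bound"]:
            return False
        if require_init and not state["initialized"]:
            return False
    return True


states = {
    0: {"bound": True, "initialized": True},
    1: {"bound": True, "initialized": False},
}
strict = all_bound_sketch(states, [0, 1])
relaxed = all_bound_sketch(states, [0, 1], require_init=False)
unknown = all_bound_sketch(states, [0, 7], require_init=False)
```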
- graceful_shutdown(reason=None, wait=2.0)¶
Politely ask all connected drivers to exit and wait briefly for BYE.
- Parameters:
reason (str or None, optional) – Optional reason to log for shutdown.
wait (float, default: 2.0) – Seconds to wait for clean replies.
- init_remote_bridges(molecules, *, molecules_per_bridge, unix_prefix='bridge_', save_file='aggregation.json')[source]¶
Partition molecules across remote bridge groups and save a manifest.
This helper does not start any bridge threads locally. Instead it assigns molecule.init_payload["aggregate_group"] for each molecule and writes one JSON manifest that bridge-node scripts can consume via run_bridge_node().
- Parameters:
molecules (molecule or iterable of molecules) – Molecules to distribute across remote bridges.
molecules_per_bridge (int) – Maximum number of molecules assigned to one bridge.
unix_prefix (str, default: "bridge_") – Prefix used to generate downstream UNIX socket names f"{unix_prefix}{idx}".
save_file (str, default: "aggregation.json") – Path where the bridge manifest should be written.
- Returns:
The generated bridge specifications in order.
- Return type:
list[RemoteBridgeSpec]
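A minimal sketch of the partitioning step, under stated assumptions: molecules are split into consecutive chunks, the group identifier reuses the generated socket name, and the manifest stores the entries under a top-level "bridges" key. None of these details are confirmed by the documentation above; only the four field names and the f"{unix_prefix}{idx}" naming come from it.

```python
import json
import os
import tempfile


def partition(n_molecules, molecules_per_bridge, unix_prefix="bridge_"):
    """Split n_molecules into consecutive chunks of at most molecules_per_bridge."""
    if molecules_per_bridge < 1:
        raise ValueError("molecules_per_bridge must be positive")
    specs = []
    for idx, start in enumerate(range(0, n_molecules, molecules_per_bridge)):
        count = min(molecules_per_bridge, n_molecules - start)
        specs.append({
            "idx": idx,
            "group_id": f"{unix_prefix}{idx}",  # assumed: group reuses the socket name
            "unixsocket": f"{unix_prefix}{idx}",
            "n_molecules": count,
        })
    return specs


specs = partition(5, 2)

# Write the hypothetical manifest layout to a temporary file and read it back.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "aggregation.json")
    with open(path, "w") as handle:
        json.dump({"bridges": specs}, handle)
    with open(path) as handle:
        loaded = json.load(handle)
```

With five molecules and at most two per bridge, the sketch yields three bridge entries, the last of which holds the single leftover molecule.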
- register_molecule(molecule_id)¶
Reserve a slot for a given molecule ID (client may connect later).
- Parameters:
molecule_id (int) – Molecule ID to register.
- Raises:
ValueError – If the molecule ID is already registered.
- Return type:
None
- register_molecule_return_id()¶
Reserve a slot for a molecule and return an auto-assigned ID.
- Returns:
The assigned unique molecule ID.
- Return type:
int
- remote_bridge_info: dict | None¶
- remote_bridges: list[RemoteBridgeSpec]¶
- step_barrier(requests, timeout=None)[source]¶
Dispatch all requested fields group-by-group and collect grouped replies.
The caller-facing contract matches SocketHub.step_barrier: responses[molid] contains {"amp": ndarray(3,), "extra": bytes}.
- Parameters:
requests (Dict[int, dict])
timeout (float | None)
- Return type:
Dict[int, dict]
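The group-by-group dispatch can be sketched as a scatter/gather over per-group batches. This is an illustration under assumptions, not the hub's implementation: `grouped_barrier`, `group_of`, `send_group`, and the "efield" request key are hypothetical, and a plain tuple stands in for the documented ndarray(3,) to keep the example dependency-free.

```python
from collections import defaultdict


def grouped_barrier(requests, group_of, send_group):
    """Send one batch per group and return replies keyed by molecule ID.

    requests   : {molecule_id: request dict}
    group_of   : {molecule_id: group label} (hypothetical bookkeeping)
    send_group : callable(group, batch) -> {molecule_id: reply dict}
    """
    batches = defaultdict(dict)
    for molid, request in requests.items():
        batches[group_of[molid]][molid] = request

    responses = {}
    for group, batch in batches.items():
        replies = send_group(group, batch)
        missing = set(batch) - set(replies)
        if missing:
            raise RuntimeError(f"group {group!r} did not answer for {sorted(missing)}")
        responses.update(replies)
    return responses


calls = []


def fake_send(group, batch):
    """Pretend bridge: record the call and echo a zero amplitude per molecule."""
    calls.append(group)
    return {molid: {"amp": (0.0, 0.0, 0.0), "extra": b""} for molid in batch}


requests = {0: {"efield": (1.0, 0.0, 0.0)}, 1: {"efield": (1.0, 0.0, 0.0)},
            2: {"efield": (0.0, 1.0, 0.0)}}
group_of = {0: "g0", 1: "g0", 2: "g1"}
responses = grouped_barrier(requests, group_of, fake_send)
```

Three molecule requests result in only two bridge calls here, while the caller still receives one reply per molecule ID.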
- stop()[source]¶
Stop the aggregate hub and clean up bridge groups coherently.
The base SocketHub.stop() assumes one client per molecule, which is not true here. This override shuts down each bridge once and clears all molecule bindings associated with that bridge.
- wait_until_bound(init_payloads, require_init=True, timeout=None)[source]¶
Wait until all requested molecules are served by initialized bridges.
Molecules are grouped through init_payload["aggregate_group"] and each group must be backed by exactly one connected bridge.
- Parameters:
init_payloads (dict)
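The "exactly one connected bridge per group" condition can be sketched as a simple check. The function `unbacked_groups` and the list-of-labels representation of connected bridges are hypothetical and only illustrate the rule stated above.

```python
def unbacked_groups(init_payloads, connected_bridges):
    """Return groups that are not backed by exactly one connected bridge.

    init_payloads     : {molecule_id: payload dict containing "aggregate_group"}
    connected_bridges : list of group labels, one entry per connected bridge
    """
    needed = {payload["aggregate_group"] for payload in init_payloads.values()}
    return sorted(g for g in needed if connected_bridges.count(g) != 1)


payloads = {
    0: {"aggregate_group": "g0"},
    1: {"aggregate_group": "g0"},
    2: {"aggregate_group": "g1"},
}
pending = unbacked_groups(payloads, ["g0"])
ready = unbacked_groups(payloads, ["g0", "g1"])
```

A waiting loop built on such a check would keep polling until the returned list is empty or the timeout expires.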
- class maxwelllink.sockets.aggregated.LocalSocketHubBridge[source]¶
Bases:
object
Bridge process/thread that fans out aggregate requests to a local SocketHub.
- Upstream:
one TCP connection to AggregatedSocketHub
- Downstream:
one ordinary SocketHub using either TCP or UNIX sockets, connected to many existing MaxwellLink socket drivers.
- __init__(*, group_id, upstream_host, upstream_port, timeout=60.0, latency=0.01, local_host='127.0.0.1', local_port=None, local_unixsocket=None)[source]¶
- Parameters:
group_id (str)
upstream_host (str)
upstream_port (int)
timeout (float)
latency (float)
local_host (str)
local_port (int | None)
local_unixsocket (str | None)
- property local_endpoint: dict¶
Return the downstream socket endpoint local drivers should connect to.
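As a rough sketch of what an endpoint mapping could look like, the helper below chooses between a UNIX socket and TCP from the same three local_* constructor arguments. The dictionary keys, the precedence of the UNIX socket over TCP, and the error raised when neither is given are assumptions made for this example; the actual keys returned by local_endpoint are not specified above.

```python
def describe_endpoint(local_host="127.0.0.1", local_port=None, local_unixsocket=None):
    """Build a hypothetical endpoint mapping for downstream drivers."""
    # Assumed precedence: a UNIX socket, when configured, wins over TCP.
    if local_unixsocket is not None:
        return {"unixsocket": local_unixsocket}
    if local_port is None:
        raise ValueError("this sketch needs either local_unixsocket or local_port")
    return {"host": local_host, "port": local_port}


unix_ep = describe_endpoint(local_unixsocket="bridge_0")
tcp_ep = describe_endpoint(local_port=40000)
```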
- class maxwelllink.sockets.aggregated.RemoteBridgeSpec[source]¶
Bases:
object
One remote aggregate bridge entry produced by init_remote_bridges.
- Parameters:
idx (int) – Zero-based bridge index used by run_bridge_node().
group_id (str) – Aggregate group identifier transmitted upstream.
unixsocket (str) – Downstream UNIX-socket address local drivers should connect to.
n_molecules (int) – Number of molecules assigned to this bridge.
- __init__(idx, group_id, unixsocket, n_molecules)¶
- Parameters:
idx (int)
group_id (str)
unixsocket (str)
n_molecules (int)
- Return type:
None
- classmethod from_dict(payload)[source]¶
Build one bridge specification from JSON-decoded manifest data.
- Parameters:
payload (Mapping)
- Return type:
RemoteBridgeSpec
- group_id: str¶
- idx: int¶
- n_molecules: int¶
- unixsocket: str¶
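The four documented fields and the from_dict constructor map naturally onto a dataclass. The class below is a hypothetical mirror named `BridgeSpecSketch`, not the library's RemoteBridgeSpec; the explicit type coercion is an assumption about how JSON-decoded values might be normalized.

```python
from dataclasses import asdict, dataclass
from typing import Mapping


@dataclass
class BridgeSpecSketch:
    """Hypothetical mirror of the four documented RemoteBridgeSpec fields."""

    idx: int
    group_id: str
    unixsocket: str
    n_molecules: int

    @classmethod
    def from_dict(cls, payload: Mapping) -> "BridgeSpecSketch":
        # Coerce types so JSON-decoded values end up as the documented types.
        return cls(
            idx=int(payload["idx"]),
            group_id=str(payload["group_id"]),
            unixsocket=str(payload["unixsocket"]),
            n_molecules=int(payload["n_molecules"]),
        )


spec = BridgeSpecSketch.from_dict(
    {"idx": 0, "group_id": "bridge_0", "unixsocket": "bridge_0", "n_molecules": 4}
)
round_trip = BridgeSpecSketch.from_dict(asdict(spec))
```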
- maxwelllink.sockets.aggregated.mxl_bridge_main(argv=None)[source]¶
CLI entry point for running one aggregate bridge from a manifest.
Examples
mxl_bridge --info aggregation.json --idx 0
- Parameters:
argv (list[str] | None)
- Return type:
int
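The two flags shown in the example could be parsed with argparse roughly as follows. Only the flag names --info and --idx come from the documentation; the defaults, help strings, and the function name `parse_bridge_args` are assumptions, and the real entry point may define further options.

```python
import argparse


def parse_bridge_args(argv=None):
    """Parse the two documented flags; defaults here are assumptions."""
    parser = argparse.ArgumentParser(prog="mxl_bridge")
    parser.add_argument("--info", default="aggregation.json",
                        help="path to the JSON manifest")
    parser.add_argument("--idx", type=int, default=0,
                        help="zero-based bridge index")
    return parser.parse_args(argv)


args = parse_bridge_args(["--info", "aggregation.json", "--idx", "3"])
```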
- maxwelllink.sockets.aggregated.run_bridge_node(info='aggregation.json', *, idx=0)[source]¶
Run one bridge node from a manifest written by init_remote_bridges.
- Parameters:
info (str or path-like, default: "aggregation.json") – JSON manifest written by AggregatedSocketHub.init_remote_bridges().
idx (int, default: 0) – Zero-based bridge index identifying which bridge entry in info this node should start.
- Return type:
None
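Selecting one bridge entry by its zero-based index might look like the sketch below. The manifest layout (a top-level "bridges" list of entries carrying an "idx" field) is an assumption made for illustration; the actual file format written by init_remote_bridges is not documented above.

```python
import json


def select_bridge(manifest_text, idx=0):
    """Return the manifest entry whose "idx" field equals idx."""
    entries = json.loads(manifest_text)["bridges"]  # assumed top-level key
    for entry in entries:
        if entry["idx"] == idx:
            return entry
    raise IndexError(f"no bridge with idx={idx} in manifest")


manifest_text = json.dumps({"bridges": [
    {"idx": 0, "group_id": "bridge_0", "unixsocket": "bridge_0", "n_molecules": 2},
    {"idx": 1, "group_id": "bridge_1", "unixsocket": "bridge_1", "n_molecules": 1},
]})
entry = select_bridge(manifest_text, idx=1)
```

On a multi-node run, each node would call such a lookup with its own index so that every bridge entry is started exactly once.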