maxwelllink.sockets.aggregated module

Two-layer socket aggregation for MaxwellLink.

This module adds an opt-in transport layer on top of the existing SocketHub implementation without modifying the original hub logic.

The new design introduces two roles:

  • AggregatedSocketHub: an EM-side hub that keeps the public hub API expected by MaxwellLink solvers, but aggregates multiple molecule requests into one upstream connection per HPC node.

  • LocalSocketHubBridge: a node-local bridge process/thread that talks to AggregatedSocketHub upstream while reusing an ordinary downstream SocketHub to fan out work to multiple existing Python/socket-only drivers.

This preserves existing SocketHub behavior while enabling a two-layer communication topology:

EM solver -> AggregatedSocketHub ==TCP==> LocalSocketHubBridge
          -> local SocketHub ==TCP/UNIX==> many molecular drivers
class maxwelllink.sockets.aggregated.AggregatedBridge

Bases: object

Convenience handle for one hub-owned local bridge.

Instances of this class are returned by AggregatedSocketHub.add_bridge(). They provide a light wrapper around LocalSocketHubBridge so existing input scripts only need to:

  1. create bridge handles from the hub,

  2. attach molecules to a handle via append(), and

  3. launch downstream drivers against the handle's address property.

__init__(*, hub, group_id, bridge)
property address: str

Address string that downstream UNIX-socket drivers should use.

append(molecules)

Attach one molecule or an iterable of molecules to this bridge group.

The helper only mutates molecule.init_payload["aggregate_group"] and therefore works with existing mxl.Molecule / SocketMolecule objects without changing solver-side logic.

Return type:

None
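
As a minimal sketch of the mutation described above, the helper can be pictured as follows. StubMolecule and attach_to_group are hypothetical stand-ins rather than MaxwellLink classes or functions; only the init_payload["aggregate_group"] key is taken from the text.

```python
from collections.abc import Iterable


class StubMolecule:
    """Hypothetical stand-in exposing only an ``init_payload`` dict."""

    def __init__(self):
        self.init_payload = {}


def attach_to_group(molecules, group_id):
    """Tag one molecule or an iterable of molecules with ``group_id``."""
    # Accept a single molecule as well as any iterable of molecules.
    if not isinstance(molecules, Iterable):
        molecules = [molecules]
    for molecule in molecules:
        # Only the payload is touched; nothing else on the molecule changes.
        molecule.init_payload["aggregate_group"] = group_id


single = StubMolecule()
batch = [StubMolecule(), StubMolecule()]
attach_to_group(single, "group-0")
attach_to_group(batch, "group-0")
```

Because only the payload dictionary is written, any object carrying an init_payload mapping would work with a helper of this shape.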

property local_endpoint: dict

Return the downstream endpoint mapping for driver launch code.

start()

Start the underlying local bridge thread.

Return type:

Thread

stop(wait=2.0)

Stop the underlying local bridge.

Parameters:

wait (float)

Return type:

None

property unixsocket: str | None

Configured UNIX-socket driver address, if any.

property unixsocket_path: str | None

Resolved filesystem path for the local UNIX socket.

class maxwelllink.sockets.aggregated.AggregatedSocketHub

Bases: SocketHub

EM-side hub that aggregates multiple molecule requests into one bridge link.

This class keeps the same public methods used by MaxwellLink solvers (register_molecule_return_id, wait_until_bound, all_bound, step_barrier) while mapping many molecule IDs onto a smaller number of bridge connections.

Molecules are assigned to a bridge group through init_payload["aggregate_group"]. All molecules sharing the same group are sent together to one LocalSocketHubBridge.

__init__(host=None, port=31415, timeout=60000.0, latency=0.01)

Initialize the socket hub.

Parameters:
  • host (str or None, default: None) – Host address for AF_INET sockets. Ignored when using a UNIX socket.

  • port (int or None, default: 31415) – TCP port for AF_INET sockets. Ignored for UNIX sockets.

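  • unixsocket (str or None, default: None) – Path (or name under /tmp/socketmxl_*) for a UNIX domain socket. When provided, host and port are ignored. This entry does not appear in the signature shown above and may be carried over from the base SocketHub docstring.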

  • timeout (float, default: 60000.0) – Socket timeout (seconds) for client operations.

  • latency (float, default: 0.01) – Polling sleep (seconds) between hub sweeps; can be very small for local runs.

add_bridge(local_unixsocket)

Create, start, and return one hub-owned local UNIX-socket bridge.

This is the convenience entry point intended for minimal edits when migrating an existing single-layer SocketHub script to the new two-layer transport.

Parameters:

local_unixsocket (str)

Return type:

AggregatedBridge

all_bound(molecule_ids, require_init=True)

Check if all given molecule IDs are bound (and optionally initialized).

Parameters:
  • molecule_ids (iterable of int) – Molecule IDs to check.

  • require_init (bool, default: True) – Also require that clients completed INIT.

Returns:

True if all are bound (and initialized if requested), else False.

Return type:

bool

graceful_shutdown(reason=None, wait=2.0)

Politely ask all connected drivers to exit and wait briefly for BYE.

Parameters:
  • reason (str or None, optional) – Optional reason to log for shutdown.

  • wait (float, default: 2.0) – Seconds to wait for clean replies.

init_remote_bridges(molecules, *, molecules_per_bridge, unix_prefix='bridge_', save_file='aggregation.json')

Partition molecules across remote bridge groups and save a manifest.

This helper does not start any bridge threads locally. Instead it assigns molecule.init_payload["aggregate_group"] for each molecule and writes one JSON manifest that bridge-node scripts can consume via run_bridge_node().

Parameters:
  • molecules (molecule or iterable of molecules) – Molecules to distribute across remote bridges.

  • molecules_per_bridge (int) – Maximum number of molecules assigned to one bridge.

  • unix_prefix (str, default: "bridge_") – Prefix used to generate downstream UNIX socket names f"{unix_prefix}{idx}".

  • save_file (str, default: "aggregation.json") – Path where the bridge manifest should be written.

Returns:

The generated bridge specifications in order.

Return type:

list[RemoteBridgeSpec]
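
The partitioning and manifest step can be sketched as below. This is an illustration under stated assumptions, not the library's implementation: partition_into_bridges is a hypothetical helper, the "group_{idx}" naming and the top-level "bridges" key are guesses at the manifest layout, and only the f"{unix_prefix}{idx}" socket naming and the four field names come from the documentation.

```python
import json
import os
import tempfile


def partition_into_bridges(n_molecules, molecules_per_bridge, unix_prefix="bridge_"):
    """Split ``n_molecules`` into consecutive groups of bounded size."""
    if molecules_per_bridge < 1:
        raise ValueError("molecules_per_bridge must be at least 1")
    specs = []
    for idx, start in enumerate(range(0, n_molecules, molecules_per_bridge)):
        # The last group may be smaller than molecules_per_bridge.
        count = min(molecules_per_bridge, n_molecules - start)
        specs.append(
            {
                "idx": idx,
                "group_id": f"group_{idx}",  # assumed naming scheme
                "unixsocket": f"{unix_prefix}{idx}",  # pattern from the docs
                "n_molecules": count,
            }
        )
    return specs


specs = partition_into_bridges(n_molecules=10, molecules_per_bridge=4)

# Write the manifest to a temporary directory; the top-level "bridges" key
# is an assumption about the layout, not the documented format.
manifest_path = os.path.join(tempfile.mkdtemp(), "aggregation.json")
with open(manifest_path, "w", encoding="utf-8") as handle:
    json.dump({"bridges": specs}, handle, indent=2)

with open(manifest_path, encoding="utf-8") as handle:
    reloaded = json.load(handle)["bridges"]
```

With ten molecules and at most four per bridge, the sketch yields three groups of sizes 4, 4, and 2.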

register_molecule(molecule_id)

Reserve a slot for a given molecule ID (client may connect later).

Parameters:

molecule_id (int) – Molecule ID to register.

Raises:

ValueError – If the molecule ID is already registered.

Return type:

None

register_molecule_return_id()

Reserve a slot for a molecule and return an auto-assigned ID.

Returns:

The assigned unique molecule ID.

Return type:

int
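
The two registration paths above can be sketched with a small in-memory registry. MoleculeRegistry is a hypothetical stand-in, and the lowest-unused-integer ID policy is an assumption; the documentation only states that the returned ID is unique and that registering a duplicate raises ValueError.

```python
class MoleculeRegistry:
    """Hypothetical registry mirroring the two registration methods."""

    def __init__(self):
        self._slots = {}  # molecule_id -> bound client (None until it connects)
        self._next_id = 0

    def register_molecule(self, molecule_id):
        """Reserve a slot, rejecting IDs that are already taken."""
        if molecule_id in self._slots:
            raise ValueError(f"molecule ID {molecule_id} is already registered")
        self._slots[molecule_id] = None

    def register_molecule_return_id(self):
        """Reserve the next free integer ID and return it."""
        # Skip over IDs that were reserved explicitly (assumed policy).
        while self._next_id in self._slots:
            self._next_id += 1
        molecule_id = self._next_id
        self.register_molecule(molecule_id)
        return molecule_id


registry = MoleculeRegistry()
registry.register_molecule(0)
auto_id = registry.register_molecule_return_id()

try:
    registry.register_molecule(0)
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```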

remote_bridge_info: dict | None
remote_bridges: list[RemoteBridgeSpec]
step_barrier(requests, timeout=None)

Dispatch all requested fields group-by-group and collect grouped replies.

The caller-facing contract matches SocketHub.step_barrier: responses[molid] contains {"amp": ndarray(3,), "extra": bytes}.

Parameters:
  • requests (Dict[int, dict])

  • timeout (float | None)

Return type:

Dict[int, dict]
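
The group-by-group dispatch can be illustrated with a short sketch. grouped_step, fake_bridge, and the "efield" request key are hypothetical, and plain lists stand in for the ndarray(3,) amplitudes; the only parts taken from the documentation are the per-molecule response shape and the idea of one exchange per bridge group.

```python
from collections import defaultdict


def grouped_step(requests, group_of, send_group):
    """Dispatch ``requests`` one group at a time and merge the replies."""
    by_group = defaultdict(dict)
    for molid, request in requests.items():
        by_group[group_of[molid]][molid] = request

    responses = {}
    for group_id, group_requests in by_group.items():
        # One round trip per bridge group instead of one per molecule.
        responses.update(send_group(group_id, group_requests))
    return responses


calls = []


def fake_bridge(group_id, group_requests):
    """Hypothetical bridge: records the call and echoes a zero amplitude."""
    calls.append((group_id, sorted(group_requests)))
    return {
        molid: {"amp": [0.0, 0.0, 0.0], "extra": b""}
        for molid in group_requests
    }


requests = {
    0: {"efield": [1.0, 0.0, 0.0]},  # "efield" is an assumed request key
    1: {"efield": [0.0, 1.0, 0.0]},
    2: {"efield": [0.0, 0.0, 1.0]},
}
group_of = {0: "a", 1: "a", 2: "b"}
responses = grouped_step(requests, group_of, fake_bridge)
```

Three molecule requests are served by two bridge exchanges here, while the caller still receives one response entry per molecule ID.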

stop()

Stop the aggregate hub and clean up bridge groups coherently.

The base SocketHub.stop() assumes one client per molecule, which does not hold for aggregated bridges. This override shuts down each bridge exactly once and clears every molecule binding associated with that bridge.

wait_until_bound(init_payloads, require_init=True, timeout=None)

Wait until all requested molecules are served by initialized bridges.

Molecules are grouped through init_payload["aggregate_group"] and each group must be backed by exactly one connected bridge.

Parameters:

init_payloads (dict)
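
A rough sketch of the waiting logic follows. It assumes that init_payloads maps molecule IDs to payload dictionaries and that a timeout is reported by returning False; neither detail is confirmed by the documentation, and unbound_groups and wait_until are hypothetical helper names.

```python
import time


def unbound_groups(init_payloads, connected_groups):
    """Return the aggregate groups that no connected bridge serves yet."""
    needed = {payload["aggregate_group"] for payload in init_payloads.values()}
    return needed - set(connected_groups)


def wait_until(predicate, timeout=None, latency=0.01):
    """Poll ``predicate`` until it is true; return False if ``timeout`` elapses."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while not predicate():
        if deadline is not None and time.monotonic() >= deadline:
            return False
        time.sleep(latency)
    return True


init_payloads = {
    0: {"aggregate_group": "a"},
    1: {"aggregate_group": "a"},
    2: {"aggregate_group": "b"},
}
connected = {"a"}

still_missing = unbound_groups(init_payloads, connected)
timed_out = not wait_until(
    lambda: not unbound_groups(init_payloads, connected), timeout=0.05
)

connected.add("b")  # simulate the second bridge connecting
all_served = wait_until(
    lambda: not unbound_groups(init_payloads, connected), timeout=0.05
)
```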

class maxwelllink.sockets.aggregated.LocalSocketHubBridge

Bases: object

Bridge process/thread that fans out aggregate requests to a local SocketHub.

Upstream:

one TCP connection to AggregatedSocketHub

Downstream:

one ordinary SocketHub using either TCP or UNIX sockets, connected to many existing MaxwellLink socket drivers.

__init__(*, group_id, upstream_host, upstream_port, timeout=60.0, latency=0.01, local_host='127.0.0.1', local_port=None, local_unixsocket=None)
Parameters:
  • group_id (str)

  • upstream_host (str)

  • upstream_port (int)

  • timeout (float)

  • latency (float)

  • local_host (str)

  • local_port (int | None)

  • local_unixsocket (str | None)

property local_endpoint: dict

Return the downstream socket endpoint local drivers should connect to.

run()

Run the bridge loop until the hub sends STOP or disconnects.

Return type:

None

start()

Start the bridge loop in a daemon thread and return the thread handle.

Return type:

Thread

stop(wait=2.0)

Stop the bridge loop and close the downstream local hub.

Parameters:

wait (float)

Return type:

None
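
The run/start/stop lifecycle described above follows a common daemon-thread pattern, sketched here with the standard library only. LoopRunner is a hypothetical class; the real bridge loop exchanges socket messages, whereas this one simply idles until asked to stop.

```python
import threading


class LoopRunner:
    """Hypothetical object with the same run/start/stop shape as the bridge."""

    def __init__(self, latency=0.01):
        self._latency = latency
        self._stop_event = threading.Event()
        self._thread = None

    def run(self):
        """Loop until ``stop`` is requested."""
        while not self._stop_event.is_set():
            # A real bridge would service upstream requests here.
            self._stop_event.wait(self._latency)

    def start(self):
        """Run the loop in a daemon thread and return the thread handle."""
        self._thread = threading.Thread(target=self.run, daemon=True)
        self._thread.start()
        return self._thread

    def stop(self, wait=2.0):
        """Signal the loop to exit and wait up to ``wait`` seconds for it."""
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout=wait)


runner = LoopRunner()
thread = runner.start()
runner.stop(wait=2.0)
```

Marking the thread as a daemon means a forgotten stop() call does not keep the interpreter alive at exit, at the cost of skipping any cleanup the loop would otherwise perform.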

class maxwelllink.sockets.aggregated.RemoteBridgeSpec

Bases: object

One remote aggregate bridge entry produced by init_remote_bridges.

Parameters:
  • idx (int) – Zero-based bridge index used by run_bridge_node().

  • group_id (str) – Aggregate group identifier transmitted upstream.

  • unixsocket (str) – Downstream UNIX-socket address local drivers should connect to.

  • n_molecules (int) – Number of molecules assigned to this bridge.

__init__(idx, group_id, unixsocket, n_molecules)
Parameters:
  • idx (int)

  • group_id (str)

  • unixsocket (str)

  • n_molecules (int)

Return type:

None

classmethod from_dict(payload)

Build one bridge specification from JSON-decoded manifest data.

Parameters:

payload (Mapping)

Return type:

RemoteBridgeSpec

group_id: str
idx: int
n_molecules: int
to_dict()

Return a JSON-serializable bridge specification mapping.

Return type:

dict

unixsocket: str
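
A record with these four fields and a to_dict/from_dict pair can be sketched as a dataclass. BridgeSpecSketch is a hypothetical name, and whether the real class is a dataclass is not stated in the documentation; the field names and types are the documented ones.

```python
import json
from dataclasses import asdict, dataclass
from typing import Mapping


@dataclass
class BridgeSpecSketch:
    """Hypothetical mirror of the documented fields."""

    idx: int
    group_id: str
    unixsocket: str
    n_molecules: int

    def to_dict(self) -> dict:
        """Return a JSON-serializable mapping of the four fields."""
        return asdict(self)

    @classmethod
    def from_dict(cls, payload: Mapping) -> "BridgeSpecSketch":
        """Rebuild a spec from JSON-decoded data, coercing field types."""
        return cls(
            idx=int(payload["idx"]),
            group_id=str(payload["group_id"]),
            unixsocket=str(payload["unixsocket"]),
            n_molecules=int(payload["n_molecules"]),
        )


spec = BridgeSpecSketch(
    idx=0, group_id="group_0", unixsocket="bridge_0", n_molecules=4
)
# Round-trip through JSON text, as a manifest file would.
restored = BridgeSpecSketch.from_dict(json.loads(json.dumps(spec.to_dict())))
```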
maxwelllink.sockets.aggregated.mxl_bridge_main(argv=None)

CLI entry point for running one aggregate bridge from a manifest.

Examples

mxl_bridge --info aggregation.json --idx 0

Parameters:

argv (list[str] | None)

Return type:

int
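
Argument parsing for a command of this shape might look like the following sketch. Only the --info and --idx flags are taken from the example above; parse_bridge_args is a hypothetical helper, and the defaults are assumed to mirror those of run_bridge_node().

```python
import argparse


def parse_bridge_args(argv=None):
    """Parse the two flags shown in the documented example command."""
    parser = argparse.ArgumentParser(prog="mxl_bridge")
    parser.add_argument(
        "--info",
        default="aggregation.json",  # assumed default
        help="path to the JSON bridge manifest",
    )
    parser.add_argument(
        "--idx",
        type=int,
        default=0,  # assumed default
        help="zero-based index of the bridge entry to run",
    )
    return parser.parse_args(argv)


args = parse_bridge_args(["--info", "aggregation.json", "--idx", "0"])
other = parse_bridge_args(["--idx", "3"])
```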

maxwelllink.sockets.aggregated.run_bridge_node(info='aggregation.json', *, idx=0)

Run one bridge node from a manifest written by init_remote_bridges.

Parameters:
  • info (str or path-like, default: "aggregation.json") – JSON manifest written by AggregatedSocketHub.init_remote_bridges().

  • idx (int, default: 0) – Zero-based bridge index identifying which bridge entry in info this node should start.

Return type:

None
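
Selecting one bridge entry from a manifest can be sketched as below. The manifest layout (a top-level "bridges" list) is an assumption, select_bridge_entry is a hypothetical helper, and the sketch stops at the lookup; starting the bridge itself is left to the library.

```python
import json


def select_bridge_entry(manifest_text, idx=0):
    """Return the manifest entry whose ``idx`` field equals ``idx``."""
    entries = json.loads(manifest_text)["bridges"]  # assumed top-level key
    for entry in entries:
        if entry["idx"] == idx:
            return entry
    raise IndexError(f"no bridge with idx={idx} in manifest")


manifest_text = json.dumps(
    {
        "bridges": [
            {"idx": 0, "group_id": "group_0", "unixsocket": "bridge_0", "n_molecules": 4},
            {"idx": 1, "group_id": "group_1", "unixsocket": "bridge_1", "n_molecules": 2},
        ]
    }
)
entry = select_bridge_entry(manifest_text, idx=1)
```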