"""
Multimode cavity electrodynamics for MaxwellLink.
This module defines a lightweight cavity simulator that treats the EM field as
multiple damped harmonic oscillators coupled to MaxwellLink molecules. The simulation
runs entirely in atomic units and can operate with both socket-connected and
embedded (non-socket) molecular drivers.
"""
from __future__ import annotations
import json
import os, shutil
from typing import Callable, Dict, Iterable, List, Optional, Sequence, Union
import time
import numpy as np
from ..molecule import Molecule
from ..sockets import SocketHub, am_master
from ..units import FS_TO_AU, AU_TO_CM_INV
from .dummy_em import DummyEMUnits, MoleculeDummyWrapper, DummyEMSimulation
from ..tools.harmonic_oscillator_helper import DummyInitializer, DummyThermostat
[docs]
class MultiModeUnits(DummyEMUnits):
"""
EM units for multimode cavity simulations (1:1 to atomic units).
"""
[docs]
def __init__(self):
super().__init__()
[docs]
class MoleculeMultiModeWrapper(MoleculeDummyWrapper):
"""
Wrapper that adapts a ``Molecule`` to MultiModeSimulation, handling units, sources, and IO.
"""
[docs]
def __init__(self, molecule: Molecule, dt_au: float):
"""
Initialize the MultiMode molecule wrapper.
Parameters
----------
molecule : Molecule
The molecule to wrap.
dt_au : float
Time step in atomic units.
"""
super().__init__(molecule=molecule)
self.dt_au = float(dt_au)
if self.dt_au <= 0.0:
raise ValueError("dt_au must be positive.")
# retrieve molecule settings from the wrapped Molecule
self.mode = self.m.mode
self.hub: Optional[SocketHub] = getattr(self.m, "hub", None)
self.molecule_id: int = getattr(self.m, "molecule_id", -1)
self.init_payload: Dict = dict(self.m.init_payload)
self.last_amp: np.ndarray = np.zeros(3, dtype=float)
self.time_units_fs = 1.0 / FS_TO_AU # so that dt_em == dt_au
# to rescale dipoles if needed
self.rescaling_factor = self.m.rescaling_factor
# Refresh time-units and dt to keep init payloads consistent
self.m._refresh_time_units(self.time_units_fs)
self.m._refresh_time_step(self.dt_au)
self.init_payload["dt_au"] = self.dt_au
self.additional_data_history = self.m.additional_data_history
if self.mode == "non-socket":
if self.molecule_id < 0:
self.molecule_id = 0 # will be overwritten by simulation
self.m.molecule_id = self.molecule_id
[docs]
def append_additional_data(self, time_au: float):
"""
Store additional molecular data supplied by non-socket drivers.
Parameters
----------
time_au : float
Current simulation time in atomic units.
"""
extra = {}
if hasattr(self, "d_f"):
try:
extra = dict(self.d_f.append_additional_data() or {})
except Exception:
extra = {}
if "time_au" not in extra:
extra["time_au"] = time_au
if extra:
self.additional_data_history.append(extra)
[docs]
class FabryPerotCavity:
r"""
Damped harmonic oscillators coupled to MaxwellLink molecules.
Under the dipole gauge, the total light-matter Hamiltonian is
.. math::
H = H_{mol} + \frac{1}{2} p_{\rm c}^2 + \frac{1}{2} \sum_{k\lambda}\omega_{k\lambda, \rm c}^2 (q_{k\lambda, \rm c} + \frac{\varepsilon_{k\lambda}}{\omega_{k\lambda,\rm c}^2} \sum_i \mu_i \cdot f_{k\lambda}(r_i))^2
The cavity mode (classical harmonic oscillator) obeys
.. math::
\ddot{q}_{k\lambda, \rm c} = -\omega_{k\lambda, \rm c}^{2}\, q_{k\lambda, \rm c} \; - \; \varepsilon_{k\lambda} \sum_i \mu_i \cdot f_{k\lambda}(r_i) \;-\; \gamma_{k\lambda, \rm c} \, p_{k\lambda, \rm c} \;+\; D_{k\lambda}(t),
where the effective electric field of this cavity mode is
.. math::
E(t) = -\sum_{k\lambda} \varepsilon_{k\lambda} q_{k\lambda, \rm c}(t) - \frac{\varepsilon_{k\lambda}^2}{\omega_{k\lambda, \rm c}^2} \sum_i \mu_i(t) \cdot f_{k\lambda}(r_i),
where :math:`\varepsilon_{k\lambda}` is effective coupling strength for different photon modes, :math:`f_{k\lambda}(r_i)` is the cavity mode function evaluated at the position of the r_i. The second term in the electric field accounts for the dipole self-energy term if enabled.
All quantities are in atomic units.
"""
[docs]
def __init__(
self,
frequency_au: float = None,
coupling_strength: float = 1.0,
coupling_axis: str = "xy",
x_grid_1d: Optional[list] = None,
y_grid_1d: Optional[list] = None,
n_grid_x: int = None,
n_grid_y: int = None,
n_repeat_x: int = 1,
n_repeat_y: int = 1,
delta_omega_x_au: float = None,
delta_omega_y_au: float = None,
n_mode_x: int = 1,
n_mode_y: int = 1,
abc_cutoff: float = 0.0,
):
r"""
Parameters
----------
frequency_au : float
Cavity angular frequency :math:`\omega_{\rm c}` (a.u.).
coupling_strength : float, default: 1.0
Prefactor :math:`\varepsilon`.
coupling_axis : str, default: "xy"
Component(s) of the molecular dipole used for coupling.
x_grid_1d : list, optional
1D grid points for molecular bath coordinates along x-axis, in units of cavity length Lx. If None, defaults to [0.5] (single point at the center).
y_grid_1d : list, optional
1D grid points for molecular bath coordinates along y-axis, in units of cavity length Ly. If None, defaults to [0.5] (single point at the center).
n_grid_x : int, default: 1
Number of grid points for molecular bath coordinates along x-axis, in units of cavity length Lx. If provided together with n_grid_y, it will override x_grid_1d and generate a uniform grid along x-axis. If x_grid_1d is provided, n_grid_x will be ignored.
n_grid_y : int, default: 1
Number of grid points for molecular bath coordinates along y-axis, in units of cavity length Ly. If provided together with n_grid_x, it will override y_grid_1d and generate a uniform grid along y-axis. If y_grid_1d is provided, n_grid_y will be ignored.
n_repeat_x : int, default: 1
Number of times to repeat the grid points along x-axis.
n_repeat_y : int, default: 1
Number of times to repeat the grid points along y-axis.
delta_omega_x_au : float, default: 0.0
Frequency spacing along x-axis for cavity modes, in atomic units. The cavity mode frequencies are calculated as :math:`\omega_{k} = \sqrt{\omega_{\rm c}^2 + (l_x \Delta\omega_x)^2 + (l_y \Delta\omega_y)^2}` where :math:`l_x, l_y` are the mode indices determined by ``n_mode_x`` and ``n_mode_y``.
delta_omega_y_au : float, default: 0.0
Frequency spacing along y-axis for cavity modes, in atomic units.
n_mode_x : int, default: 1
Number of cavity modes along x-axis.
n_mode_y : int, default: 1
Number of cavity modes along y-axis.
abc_cutoff : float, default: 0.0
Absorbing boundary condition cutoff for the molecular bath grid, in units of cavity length. The cutoff is applied to both x and y axes. If 0.0, no absorbing boundary condition is applied. If > 0.0, the grid points within the cutoff distance from the boundaries will be smoothly damped to suppress unphysical reflections of the EM field at the boundaries.
"""
if frequency_au is None:
raise ValueError("frequency_au must be provided.")
self.frequency_au = float(frequency_au)
if delta_omega_x_au is None:
raise ValueError("delta_omega_x_au must be provided.")
self.delta_omega_x_au = float(delta_omega_x_au)
if delta_omega_y_au is None:
raise ValueError("delta_omega_y_au must be provided.")
self.delta_omega_y_au = float(delta_omega_y_au)
self.coupling_strength = float(coupling_strength)
self.axis = np.array([False, False, False], dtype=bool)
if "x" in coupling_axis.lower():
self.axis[0] = True
if "y" in coupling_axis.lower():
self.axis[1] = True
if "z" in coupling_axis.lower():
self.axis[2] = True
# we need True in at least one axis
if not np.any(self.axis):
raise ValueError(
"At least one coupling axis (x, y, or z) must be specified."
)
# generate 1D grid points of molecular bath coords in units of Lx, Ly
if n_grid_x is not None:
if n_grid_x <= 0:
raise ValueError("n_grid_x must be positive if provided.")
if n_repeat_x <= 0:
raise ValueError("n_repeat_x must be positive.")
if x_grid_1d is not None:
print("Warning: n_grid_x is provided and will override x_grid_1d.")
x_grid_1d = np.repeat(
np.arange(1, n_grid_x + 1) / (n_grid_x + 1), n_repeat_x
)
elif x_grid_1d is not None:
x_grid_1d = np.array(x_grid_1d, dtype=float)
else:
raise ValueError("Either n_grid_x or x_grid_1d must be provided.")
if n_grid_y is not None:
if n_grid_y <= 0:
raise ValueError("n_grid_y must be positive if provided.")
if n_repeat_y <= 0:
raise ValueError("n_repeat_y must be positive.")
if y_grid_1d is not None:
print("Warning: n_grid_y is provided and will override y_grid_1d.")
y_grid_1d = np.repeat(
np.arange(1, n_grid_y + 1) / (n_grid_y + 1), n_repeat_y
)
elif y_grid_1d is not None:
y_grid_1d = np.array(y_grid_1d, dtype=float)
else:
raise ValueError("Either n_grid_y or y_grid_1d must be provided.")
# generate 2D grid points of molecular bath coords in units of Lx, Ly
x_grid_2d, y_grid_2d = np.meshgrid(x_grid_1d, y_grid_1d)
x_grid_2d = np.reshape(x_grid_2d, -1)
y_grid_2d = np.reshape(y_grid_2d, -1)
self.n_grid = np.size(x_grid_2d)
self.grid_xy = np.column_stack((x_grid_2d, y_grid_2d))
# generate 2D grid points of kx, ky in units of 1/Lx, 1/Ly
self.n_mode_x = n_mode_x
self.n_mode_y = n_mode_y
kx_grid_1d = np.pi * np.array([i + 1.0 for i in range(n_mode_x)])
ky_grid_1d = np.pi * np.array([i + 1.0 for i in range(n_mode_y)])
kx_grid_2d, ky_grid_2d = np.meshgrid(kx_grid_1d, ky_grid_1d)
kx_grid_2d = np.reshape(kx_grid_2d, -1)
ky_grid_2d = np.reshape(ky_grid_2d, -1)
# construct cavity mode frequency array for all photon dimensions
omega_parallel = np.reshape(
(
(kx_grid_2d / np.pi * self.delta_omega_x_au) ** 2
+ (ky_grid_2d / np.pi * self.delta_omega_y_au) ** 2
)
** 0.5,
-1,
)
print("omega_parallel in cm-1", omega_parallel * AU_TO_CM_INV)
self.omega_k = (self.frequency_au**2 + omega_parallel**2) ** 0.5
print("omega_k in cm-1", self.omega_k * AU_TO_CM_INV)
# construct renormalized cavity mode function for each molecular grid point
self.n_mode = n_mode_x * n_mode_y
ftilde_k = np.zeros((self.n_mode, self.n_grid, 3), dtype=float)
for i in range(self.n_grid):
x, y = x_grid_2d[i], y_grid_2d[i]
ftilde_k[:, i, 0] = 2.0 * np.cos(kx_grid_2d * x) * np.sin(ky_grid_2d * y)
ftilde_k[:, i, 1] = 2.0 * np.sin(kx_grid_2d * x) * np.cos(ky_grid_2d * y)
self.ftilde_k = ftilde_k
self.varepsilon_k = self.coupling_strength * self.omega_k / np.min(self.omega_k)
self.x_grid_1d = x_grid_1d
self.y_grid_1d = y_grid_1d
abc_x, abc_y = None, None
self.abc_cutoff = float(abc_cutoff)
if self.abc_cutoff != 0.00:
r01 = self.abc_cutoff
r10 = 1 - self.abc_cutoff
def abc(grid_1d, r01=0.05, r10=0.95):
if len(grid_1d) < 3:
return np.ones_like(grid_1d)
grid_1d = np.array(grid_1d)
r00, r11 = grid_1d[0], grid_1d[-1]
S = np.zeros_like(grid_1d)
middle = np.where((grid_1d < r10) & (grid_1d > r01))[0]
S[middle] = 1
l_side = np.where((grid_1d < r01) & (grid_1d > r00))[0]
r_side = np.where((grid_1d < r11) & (grid_1d > r10))[0]
def smooth(x, l, r, p):
frac = (l - r) / (l - x) + (r - l) / (x - r)
if p:
return 1 / (1 + np.exp(-frac))
else:
return 1 / (1 + np.exp(frac))
l_value = np.array(
[smooth(i, l=r00, r=r01, p=False) for i in grid_1d[l_side]]
)
r_value = np.array(
[smooth(i, l=r10, r=r11, p=True) for i in grid_1d[r_side]]
)
S[l_side] = l_value
S[r_side] = r_value
return S
self.smooth_x = abc(self.x_grid_1d, r01=r01, r10=r10)
self.smooth_y = abc(self.y_grid_1d, r01=r01, r10=r10)
self.smooth_x_2d, self.smooth_y_2d = np.meshgrid(
self.smooth_x, self.smooth_y
)
self.smooth_2d = np.reshape(self.smooth_x_2d * self.smooth_y_2d, -1)
from scipy.linalg import solve, pinv
try:
F_x = self.ftilde_k[:, :, 0]
G_x = F_x @ F_x.T
B_x = (F_x * self.smooth_2d[None, :]) @ F_x.T
eps_x = 1e-8 * max(np.linalg.norm(G_x, ord=2), 1.0)
abc_x = solve(
G_x.T + eps_x * np.eye(G_x.shape[0]),
B_x.T,
assume_a="sym",
check_finite=False,
).T
except np.linalg.LinAlgError:
print("[MultiModeCavity] Warning: F_x @ F_x.T is singular. Falling back to pseudo-inverse solution.")
abc_x = self.ftilde_k[:, :, 0] @ self.smooth_2d[None, :] @ pinv(self.ftilde_k[:, :, 0])
try:
F_y = self.ftilde_k[:, :, 1]
G_y = F_y @ F_y.T
B_y = (F_y * self.smooth_2d[None, :]) @ F_y.T
eps_y = 1e-8 * max(np.linalg.norm(G_y, ord=2), 1.0)
abc_y = solve(
G_y.T + eps_y * np.eye(G_y.shape[0]),
B_y.T,
assume_a="sym",
check_finite=False,
).T
except np.linalg.LinAlgError:
print("[MultiModeCavity] Warning: F_y @ F_y.T is singular. Falling back to pseudo-inverse solution.")
abc_y = self.ftilde_k[:, :, 1] @ self.smooth_2d[None, :] @ pinv(self.ftilde_k[:, :, 1])
self.abc_x = abc_x
self.abc_y = abc_y
self.if_abc = (abc_x is not None) and (abc_y is not None)
print(
f"Applying Absorbing Boundary Condition : {self.if_abc}, cutoff: {self.abc_cutoff}"
)
[docs]
class MultiModeSimulation(DummyEMSimulation):
r"""
Mesoscale cavities coupled to MaxwellLink molecules.
All quantities are in atomic units.
"""
def __getattr__(self, name):
if self.cavity_geometry is not None and hasattr(self.cavity_geometry, name):
return getattr(self.cavity_geometry, name)
raise AttributeError(
f"'{type(self).__name__}' object has no attribute '{name}'"
)
[docs]
def __init__(
self,
dt_au: float = None,
damping_au: float = 0.0,
molecules: Optional[Iterable[Molecule]] = None,
hub: Optional[SocketHub] = None,
qc_initial: Optional[list] = None,
pc_initial: Optional[list] = None,
mu_initial: Optional[list] = None,
dmudt_initial: Optional[list] = None,
include_dse: bool = True,
molecule_half_step: bool = False,
shift_dipole_baseline: bool = False,
gauge="dipole",
cavity_geometry: Optional[object] = None,
excited_mode_list: Optional[list] = [],
photon_pulse_drive: Optional[Union[float, Callable[[float], float]]] = None,
photon_pulse_axis: str = "y",
excited_grid_list: Optional[list] = [],
molecule_pulse_drive: Optional[Union[float, Callable[[float], float]]] = None,
molecule_pulse_axis: str = "y",
initializer: Optional[object] = DummyInitializer(),
thermostat: Optional[object] = DummyThermostat(),
):
r"""
Parameters
----------
dt_au : float
Simulation time step in atomic units.
damping_au : float
Damping constant :math:`\kappa` (a.u.).
molecules : iterable of Molecule, optional
Molecules coupled to the cavity.
hub : :class:`~maxwelllink.sockets.sockets.SocketHub`, optional
Socket hub shared by all socket-mode molecules.
qc_initial : np.ndarray, default: None
Initial cavity field coordinate (a.u.).
pc_initial : np.ndarray, default: None
Initial cavity field momentum (a.u.).
mu_initial : np.ndarray, default: None
Initial total molecular dipole vector (a.u.).
dmudt_initial : np.ndarray, default: None
Initial time derivative of the total molecular dipole vector (a.u.).
include_dse : bool, default: True
Include dipole self-energy term in the simulation.
molecule_half_step : bool, default: True
Whether to further evaluate molecular info for another half time step.
shift_dipole_baseline : bool, default: False
Whether to shift all dipole values using the initial dipole value, so initial dipole value is changed to zero.
Setting this to True can facilitate simulating strong coupling systems with large permanent dipoles.
gauge : str, default: "dipole"
Gauge choice for light-matter coupling: "dipole".
cavity_geometry : object, optional
Cavity geometry object that defines the cavity mode structure and light-matter coupling. Must be an instance of a class that implements the necessary attributes and methods (e.g., ``omega_k``, ``varepsilon_k``, ``ftilde_k``, etc.) as used in the equations of motion.
excited_mode_list : list, optional
List of excited cavity modes.
photon_pulse_drive : float or callable, optional
Constant photon pulse drive term or function ``photon_pulse_drive(t_au)``.
photon_pulse_axis : str, default: "y"
pulse axis for the photon pulse.
excited_grid_list : list, optional
List of grid point indices that are excited by the molecule pulse. The excitation is applied by adding the molecule pulse drive to the effective electric field at these grid points.
molecule_pulse_drive : float or callable, optional
Constant molecule pulse drive or function ``molecule_pulse_drive(t_au)`` that determines the strength of the molecule pulse applied to the excited grid points. The callable may return a scalar, or vector-valued fields, or a per-grid array over ``excited_grid_list``.
molecule_pulse_axis : str, default: "y"
pulse axis for the molecule pulse.
initializer : object, optional
An object that provides methods to initialize cavity field coordinates and momenta, as well as molecular dipoles if needed. Must have methods ``position_initializer(omega, q)`` and ``momentum_initializer(p)`` that return initialized positions and momenta based on the cavity frequencies and/or other parameters.
Two example initializers are provided in ``maxwelllink.tools.harmonic_oscillator_helper``: ``DummyInitializer`` (returns zeros) and ``MaxwellBoltzmannInitializer`` (initializes based on Maxwell-Boltzmann distribution at a given temperature).
thermostat : object, optional
An object that provides a method to apply a thermostat to the cavity field momenta. Must have a method ``apply_kick(momentum)`` that takes the current momenta and returns the modified momenta after applying the thermostat kick. Two example thermostats are provided in ``maxwelllink.tools.harmonic_oscillator_helper``: ``DummyThermostat`` (does nothing) and a simple ``LangevinThermostat`` that applies a frictional kick based on a specified temperature and damping constant.
"""
super().__init__(hub=hub, molecules=molecules)
if dt_au is None:
raise ValueError("dt_au must be provided.")
elif dt_au <= 0.0:
raise ValueError("dt_au must be positive.")
self.dt = float(dt_au)
self.damping = float(damping_au)
self.gauge = gauge.lower()
if self.gauge not in ["dipole"]:
raise ValueError("gauge must be 'dipole'.")
molecules = list(molecules or [])
self.wrappers: List[MoleculeMultiModeWrapper] = [
MoleculeMultiModeWrapper(molecule=m, dt_au=self.dt) for m in molecules
]
self.socket_wrappers = [w for w in self.wrappers if w.mode == "socket"]
self.non_socket_wrappers = [w for w in self.wrappers if w.mode == "non-socket"]
if self.socket_wrappers:
hubs = {w.hub for w in self.socket_wrappers if w.hub is not None}
if hub is not None:
hubs.add(hub)
if len(hubs) > 1:
raise ValueError(
"All socket-mode molecules must share the same SocketHub."
)
self.hub: SocketHub = hub or self.socket_wrappers[0].hub
if self.hub is None:
raise ValueError("Socket-mode molecules require a SocketHub instance.")
else:
self.hub = None
# Assign IDs and initialize non-socket drivers
# By default, SocketHub assigns IDs starting from 0, so we start
# non-socket IDs after all socket ones.
next_id = len(self.socket_wrappers)
for wrapper in self.non_socket_wrappers:
wrapper.initialize_driver(next_id)
next_id += 1
self.time = 0.0
self.cavity_geometry = cavity_geometry
if mu_initial is None:
mu_initial = np.zeros((self.n_grid, 3), dtype=float)
else:
mu_initial = np.array(mu_initial, dtype=float).reshape((self.n_grid, 3))
if dmudt_initial is None:
dmudt_initial = np.zeros((self.n_grid, 3), dtype=float)
else:
dmudt_initial = np.array(dmudt_initial, dtype=float).reshape(
(self.n_grid, 3)
)
if qc_initial is None:
qc_initial = np.zeros((self.n_mode, 3), dtype=float)
else:
qc_initial = np.array(qc_initial, dtype=float).reshape((self.n_mode, 3))
if pc_initial is None:
pc_initial = np.zeros((self.n_mode, 3), dtype=float)
else:
pc_initial = np.array(pc_initial, dtype=float).reshape((self.n_mode, 3))
self.initializer = initializer
self.thermostat = thermostat
qc_initial = self.initializer.position_initializer(self.omega_k, qc_initial)
pc_initial = self.initializer.momentum_initializer(pc_initial)
self.qc = qc_initial * self.axis
self.pc = pc_initial * self.axis
self.dipole = mu_initial * self.axis
self.dipole_prev = self.dipole.copy()
self.dmudt = dmudt_initial * self.axis
self.dmudt_prev = self.dmudt.copy()
self.dipole_lookahead = self.dipole.copy()
self.has_dipole_lookahead = False
self.acceleration = np.zeros((self.n_mode, 3), dtype=float)
if isinstance(excited_mode_list, list):
self.excited_mode_list = excited_mode_list
else:
raise ValueError(
"excited_mode_list must be a list of integers specifying the indices of the excited cavity modes."
)
self.photon_drive = (
photon_pulse_drive if photon_pulse_drive is not None else (lambda _: 0.0)
)
if isinstance(self.photon_drive, (int, float)):
const = float(self.photon_drive)
self.photon_drive = lambda _t, c=const: c
if isinstance(excited_grid_list, list):
self.excited_grid_list = excited_grid_list
else:
raise ValueError(
"excited_grid_list must be a list of integers specifying the indices of the excited grid points for molecule pulse."
)
self.molecule_pulse = (
molecule_pulse_drive
if molecule_pulse_drive is not None
else (lambda _: 0.0)
)
if isinstance(self.molecule_pulse, (int, float)):
const = float(self.molecule_pulse)
self.molecule_pulse = lambda _t, c=const: c
self.photon_pulse_axis = np.array([False, False, False], dtype=bool)
if "x" in photon_pulse_axis.lower():
self.photon_pulse_axis[0] = True
if "y" in photon_pulse_axis.lower():
self.photon_pulse_axis[1] = True
if "z" in photon_pulse_axis.lower():
self.photon_pulse_axis[2] = True
# we need True in at least one axis
if not np.any(self.photon_pulse_axis):
raise ValueError("At least one pulse axis (x, y, or z) must be specified.")
self.molecule_pulse_axis = np.array([False, False, False], dtype=bool)
if "x" in molecule_pulse_axis.lower():
self.molecule_pulse_axis[0] = True
if "y" in molecule_pulse_axis.lower():
self.molecule_pulse_axis[1] = True
if "z" in molecule_pulse_axis.lower():
self.molecule_pulse_axis[2] = True
# we need True in at least one axis
if not np.any(self.molecule_pulse_axis):
raise ValueError("At least one pulse axis (x, y, or z) must be specified.")
self.include_dse = bool(include_dse)
self.molecule_half_step = bool(molecule_half_step)
self.shift_dipole_baseline = bool(shift_dipole_baseline)
if self.shift_dipole_baseline:
# shift all dipole values using the initial dipole value, so initial dipole value is zero
self.dipole_baseline = self.dipole.copy()
self.dipole -= self.dipole_baseline
self.dipole_prev = self.dipole.copy()
print("[MultiModeCavity] Shifted dipole baseline by:", self.dipole_baseline)
# ------------------------------------------------------------------
# Core helpers
# ------------------------------------------------------------------
def _ensure_socket_connections(self):
if not self.socket_wrappers:
return
init_payloads = {
w.molecule_id: {**w.init_payload, "molecule_id": w.molecule_id}
for w in self.socket_wrappers
}
ok = self.hub.wait_until_bound(init_payloads, require_init=True, timeout=None)
if not ok:
raise RuntimeError("Timeout waiting for socket molecules to bind.")
def _collect_socket_responses(self, efield_vec: Sequence[float]) -> Dict[int, dict]:
"""
Send requests to socket molecules and collect their responses.
Parameters
----------
efield_vec : array-like of float, shape (n_grid, 3)
Effective electric field vector in atomic units.
Returns
-------
dict of int to dict
Mapping from molecule IDs to their response payloads.
"""
requests = {
w.molecule_id: {
"efield_au": efield_vec[w.molecule_id, :],
"meta": {"time_au": self.time},
"init": {**w.init_payload, "molecule_id": w.molecule_id},
}
for w in self.socket_wrappers
}
responses = self.hub.step_barrier(requests)
while not responses:
self._ensure_socket_connections()
responses = self.hub.step_barrier(requests)
return responses
def _calc_acceleration(
self, time: float, mu: np.ndarray, qc: np.ndarray
) -> np.ndarray:
"""
Calculate the cavity mode acceleration at the given time.
Parameters
----------
time : float
Current simulation time in atomic units.
mu : np.ndarray of float, shape (n_grid, 3)
Sum of individual molecular dipole moment vectors.
qc : np.ndarray of float, shape (n_mode, 3)
Current cavity field coordinates.
Returns
-------
float
The calculated acceleration.
"""
mu_dot_f = np.einsum("ijk, jk->ik", self.ftilde_k, mu)
acceleration = -np.einsum("i,ik->ik", self.varepsilon_k, mu_dot_f) - np.einsum(
"i,ik->ik", self.omega_k**2, qc
)
if self.excited_mode_list:
acceleration[self.excited_mode_list, :] += self._eval_pulse_field(
pulse=self.photon_drive,
selected=self.excited_mode_list,
axis=self.photon_pulse_axis,
label="photon_pulse_drive",
time=time,
)
# print("In Function, Cavity acceleration:", acceleration)
acceleration = np.einsum("ik, k->ik", acceleration, self.axis)
return acceleration
def _calc_effective_efield(self, qc: np.ndarray, mu: np.ndarray) -> np.ndarray:
"""
Calculate the effective electric field vector for the cavity mode.
Parameters
----------
qc : numpy.ndarray of float, shape (n_mode, 3)
Current cavity mode coordinate vector with shape (n_mode, 3).
mu : numpy.ndarray of float, shape (n_grid, 3)
Current total molecular dipole vector with shape (n_grid, 3).
Returns
-------
numpy.ndarray of float, shape (n_grid, 3)
Effective electric field vector in atomic units.
"""
varepsilon_dot_qc = -np.einsum("i,ij->ij", self.varepsilon_k, qc)
# add dipole self-energy term for the electric field if enabled
if self.include_dse:
mu_dot_f = np.einsum("ijk, jk->ik", self.ftilde_k, mu)
varepsilon_dot_qc -= np.einsum(
"i,ik,i->ik",
self.varepsilon_k**2,
mu_dot_f,
1.0 / (self.omega_k**2),
optimize=True,
)
efield_vec = np.einsum(
"ijk,ik,k->jk", self.ftilde_k, varepsilon_dot_qc, self.axis, optimize=True
)
assert efield_vec.shape == mu.shape
return efield_vec
def _calc_photonic_energy(self, pc, qc) -> np.ndarray:
"""
Calculate the energy of the all photonic modes.
E_k = \sum_{\lambda} 0.5 * p_{k\lambda}^2 + 0.5 * omega_k^2 * q_{k\lambda}^2
Parameters
----------
pc : numpy.ndarray of float, shape (n_mode, 3)
Current cavity mode momentum with shape (n_mode,3).
qc : numpy.ndarray of float, shape (n_mode, 3)
Current cavity mode coordinate with shape (n_mode,3).
Returns
-------
photonic_energy : numpy.ndarray of float, shape (n_mode,)
Energy of each cavity mode.
"""
kinetic_energy = 0.5 * np.sum(pc**2, axis=1)
potential_energy = 0.5 * np.sum((self.omega_k[:, np.newaxis] * qc) ** 2, axis=1)
photonic_energy = kinetic_energy + potential_energy
return photonic_energy
def _calc_energy(self, pc, qc, mu) -> float:
"""
Calculate the total energy of the cavity + molecular system.
Parameters
----------
pc : numpy.ndarray of float, shape (n_mode, 3)
Current cavity mode momentum with shape (n_mode,3).
qc : numpy.ndarray of float, shape (n_mode, 3)
Current cavity mode coordinate with shape (n_mode,3).
mu : numpy.ndarray of float, shape (n_grid, 3)
Current total molecular dipole along the coupling axis with shape (n_grid,3).
Returns
-------
float
Total energy of the cavity + molecular system.
"""
kinetic_energy = 0.5 * pc**2
mu_dot_f = np.einsum("ijk, jk->ik", self.ftilde_k, mu)
potential_energy = 0.5 * np.einsum(
"i,ij->ij", self.omega_k**2, qc**2
) + np.einsum("i,ij,ij->ij", self.varepsilon_k, qc, mu_dot_f)
if self.include_dse:
potential_energy += 0.5 * np.einsum(
"i,ij->ij", self.varepsilon_k**2 / self.omega_k**2, mu_dot_f**2
)
e_molecule = sum(
wrapper.additional_data_history[-1]["energy_au"]
for wrapper in self.wrappers
)
total_energy = (
np.sum((kinetic_energy + potential_energy) * self.axis) + e_molecule
)
return total_energy
def _calc_dipole_vec(self) -> np.ndarray:
"""
Calculate the total molecular dipole vector.
Returns
-------
numpy.ndarray of float, shape (n_grid, 3)
Total molecular dipole vector in atomic units.
"""
dipole_vec = np.zeros((self.n_grid, 3), dtype=float)
for wrapper in self.wrappers:
if wrapper.additional_data_history:
latest_data = wrapper.additional_data_history[-1]
rescaling_factor = 1.0
if wrapper.rescaling_factor != None:
rescaling_factor = float(wrapper.rescaling_factor)
dipole_vec[wrapper.molecule_id, 0] = (
latest_data.get("mux_au") * rescaling_factor
)
dipole_vec[wrapper.molecule_id, 1] = (
latest_data.get("muy_au") * rescaling_factor
)
dipole_vec[wrapper.molecule_id, 2] = (
latest_data.get("muz_au") * rescaling_factor
)
if self.shift_dipole_baseline:
dipole_vec -= self.dipole_baseline
return dipole_vec
def _calc_dipole_lookahead_vec(self):
"""
Reconstruct force-time dipoles one VV drift ahead from returned JSON.
Drivers may report ``mu`` at both half a time step after force
evaluation (``mux_au``/``muy_au``/``muz_au``) and at the force
evaluation time itself (``mux_m_au``/``muy_m_au``/``muz_m_au``). For
velocity Verlet position dipoles, ``2 * mu_half - mu_force`` is the
drifted dipole at the next force-evaluation time, before the next
E-field kick is known.
"""
half_keys = ("mux_au", "muy_au", "muz_au")
force_keys = ("mux_m_au", "muy_m_au", "muz_m_au")
dipole_vec = np.zeros((self.n_grid, 3), dtype=float)
has_half_step_shift = False
if not self.wrappers:
return dipole_vec, False
for wrapper in self.wrappers:
if not wrapper.additional_data_history:
return dipole_vec, False
latest_data = wrapper.additional_data_history[-1]
if not all(k in latest_data for k in half_keys + force_keys):
return dipole_vec, False
rescaling_factor = 1.0
if wrapper.rescaling_factor is not None:
rescaling_factor = float(wrapper.rescaling_factor)
mu_half = np.array([latest_data[k] for k in half_keys], dtype=float)
mu_force = np.array([latest_data[k] for k in force_keys], dtype=float)
has_half_step_shift = has_half_step_shift or not np.array_equal(
mu_half, mu_force
)
dipole_vec[wrapper.molecule_id, :] = (
2.0 * mu_half - mu_force
) * rescaling_factor
if self.shift_dipole_baseline:
dipole_vec -= self.dipole_baseline
return dipole_vec * self.axis, has_half_step_shift
def _eval_pulse_field(
self,
pulse,
selected: Sequence[int],
axis: Sequence[bool],
label: str,
time: Optional[float] = None,
):
"""
Evaluate a pulse field under different types.
``pulse`` can return a scalar for the uniform pulse, or an array
of pulses over the selected indices.
"""
n_selected = len(selected)
value = np.asarray(pulse(self.time if time is None else time), dtype=float)
axis = np.asarray(axis, dtype=float)
# for scalar uniform pulse, we broadcast it to all selected axes
if value.ndim == 0:
return float(value) * axis[None, :]
# for array pulse, we broadcast it to the selected axes
if value.shape == (n_selected,):
return value[:, None] * axis[None, :]
# for three-dimensional array pulse, we simply use the raw values
if value.shape == (n_selected, 3):
return value
raise ValueError(
f"{label}(t) must return a scalar, "
"(len(selected),), or (len(selected), 3)."
)
def _step_molecules(self, efield_vec: Sequence[float]):
"""
Propagate all molecules for one EM step and collect their dipole moments.
Parameters
----------
efield_vec : array-like of float, shape (n_grid, 3)
Effective electric field vector in atomic units.
Returns
-------
float
Sum of molecular dipole moment along the coupling axis.
"""
# modify local e-field for molecules under excitation
if self.excited_grid_list:
efield_vec[self.excited_grid_list, :] += self._eval_pulse_field(
pulse=self.molecule_pulse,
selected=self.excited_grid_list,
axis=self.molecule_pulse_axis,
label="molecule_pulse_drive",
)
# Non-socket molecules
for wrapper in self.non_socket_wrappers:
wrapper.propagate(efield_vec[wrapper.molecule_id, :])
amp = wrapper.calc_amp_vector() * wrapper.rescaling_factor
wrapper.last_amp = amp
wrapper.append_additional_data(time_au=self.time)
# Socket molecules
if self.socket_wrappers:
self._ensure_socket_connections()
responses = self._collect_socket_responses(efield_vec)
for wrapper in self.socket_wrappers:
payload = responses.get(wrapper.molecule_id)
if not payload:
continue
amp = (
np.asarray(payload.get("amp", [0.0, 0.0, 0.0]), dtype=float)
* wrapper.rescaling_factor
)
wrapper.last_amp = amp
extra_blob = payload.get("extra", b"")
if extra_blob:
try:
data = json.loads(extra_blob.decode("utf-8"))
if isinstance(data, dict):
data.setdefault("time_au", self.time)
wrapper.additional_data_history.append(data)
except Exception:
pass
# dmu/dt
dmudt = np.zeros((self.n_grid, 3), dtype=float)
for wrapper in self.wrappers:
dmudt[wrapper.molecule_id, :] = wrapper.last_amp
# for dipole gauge we use mu directly instead of dmu/dt
dipole = self._calc_dipole_vec()
# we need to filter only the coupling axis
dipole = dipole * self.axis
dmudt = dmudt * self.axis
# print("In Function, Total Dipole vector:", dipole)
# print("In Function, Total Dipole velocity (dmu/dt):", dmudt)
return dipole, dmudt
def _step_dipole_gauge(self, savedata: bool = True, step_idx: Optional[int] = None):
"""
Advance the simulation by one time step under the dipole gauge.
"""
# 1. update momentum to half step
pc_half = self.pc + 0.5 * self.dt * self.acceleration
# 2. update position to full step
qc_prev = self.qc.copy()
self.qc += self.dt * pc_half
# updating E-field at half step using interpolated dipole
# this interpolation is not very accurate
# dipole = self.dipole + 0.5 * self.dt * (1.5 * self.dmudt - 0.5 * self.dmudt_prev)
# the following expression is accurate to the order of dt^4
if (
self.include_dse
and self.has_dipole_lookahead
and not self.molecule_half_step
):
dipole = self.dipole_lookahead.copy()
else:
dipole = self.dipole_prev + self.dt * (
9.0 / 8.0 * self.dmudt + 3.0 / 8.0 * self.dmudt_prev
)
efield_vec = self._calc_effective_efield(
qc_prev + 0.5 * self.dt * pc_half, dipole
)
# update dipole info
self.dipole_prev = self.dipole.copy()
self.dmudt_prev = self.dmudt.copy()
# the value for n+1/2 time step
self.dipole, self.dmudt = self._step_molecules(efield_vec)
self.dipole_lookahead, self.has_dipole_lookahead = (
self._calc_dipole_lookahead_vec()
)
# extrapolate to n+1 time step (ONLY NEEDED FOR VELOCITY VERLET MOLECULE PROPAGATION)
if self.molecule_half_step:
self.dipole = 2.0 * self.dipole - self.dipole_prev
self.dmudt = 2.0 * self.dmudt - self.dmudt_prev
acceleration = self._calc_acceleration(self.time, self.dipole, self.qc)
# 3. update momentum from half step to full step
# apply absorbing boundary condition to the cavity field if enabled
self.pc = pc_half + 0.5 * self.dt * acceleration
if self.if_abc:
self.pc[:, 0] = self.pc[:, 0] @ self.abc_x
self.pc[:, 1] = self.pc[:, 1] @ self.abc_y
self.pc *= np.exp(-self.damping * self.dt)
self.pc = self.thermostat.apply_kick(self.pc)
# 4. update acceleration and time
self.time += self.dt
self.acceleration = acceleration.copy()
self._record_history(savedata=savedata, step_idx=step_idx)
def _record_history(self, savedata: bool = True, step_idx: Optional[int] = None):
"""
Record the history of the simulation at the current time step.
"""
def pulse_record_value(pulse):
value = np.asarray(pulse(self.time), dtype=float)
if value.ndim == 0:
return np.array([float(value)], dtype=float)
return value.copy()
record_idx = step_idx // self.record_every_steps
if savedata and (record_idx <= self.record_max_steps):
if self.record_to_disk:
if self.file_format == "npz":
if "time" in self.record_list:
self.memmaps["time"][record_idx, 0] = self.time
if "qc" in self.record_list:
self.memmaps["qc"][record_idx, :, :] = self.qc.copy()
if "pc" in self.record_list:
self.memmaps["pc"][record_idx, :, :] = self.pc.copy()
if "photon_drive" in self.record_list:
self.memmaps["photon_drive"][record_idx, ...] = (
pulse_record_value(self.photon_drive)
)
if "molecule_pulse" in self.record_list:
self.memmaps["molecule_pulse"][record_idx, ...] = (
pulse_record_value(self.molecule_pulse)
)
if "energy" in self.record_list:
self.memmaps["energy"][record_idx, 0] = self._calc_energy(self.pc, self.qc, self.dipole)
if "photonic_energy" in self.record_list:
self.memmaps["photonic_energy"][record_idx, :] = self._calc_photonic_energy(self.pc, self.qc)
if "effective_efield" in self.record_list:
self.memmaps["effective_efield"][record_idx, :, :] = (
self._calc_effective_efield(self.qc, self.dipole)
)
if "molecule_response" in self.record_list:
self.memmaps["molecule_response"][
record_idx, :, :
] = self.dmudt.copy()
if "molecule_dipole" in self.record_list:
self.memmaps["molecule_dipole"][
record_idx, :, :
] = self.dipole.copy()
if record_idx % 1000 == 0:
for mm in self.memmaps.values():
mm.flush()
print(
f"[MultiModeCavity] Flushed history data to {self.filename} at step {step_idx}."
)
else: # h5 format
if "time" in self.record_list:
self.h5_file["time"][record_idx, 0] = self.time
if "qc" in self.record_list:
self.h5_file["qc"][record_idx, :, :] = self.qc.copy()
if "pc" in self.record_list:
self.h5_file["pc"][record_idx, :, :] = self.pc.copy()
if "photon_drive" in self.record_list:
self.h5_file["photon_drive"][record_idx, ...] = (
pulse_record_value(self.photon_drive)
)
if "molecule_pulse" in self.record_list:
self.h5_file["molecule_pulse"][record_idx, ...] = (
pulse_record_value(self.molecule_pulse)
)
if "energy" in self.record_list:
self.h5_file["energy"][record_idx, 0] = self._calc_energy(self.pc, self.qc, self.dipole)
if "photonic_energy" in self.record_list:
self.h5_file["photonic_energy"][record_idx, :] = self._calc_photonic_energy(self.pc, self.qc)
if "effective_efield" in self.record_list:
self.h5_file["effective_efield"][record_idx, :, :] = (
self._calc_effective_efield(self.qc, self.dipole)
)
if "molecule_response" in self.record_list:
self.h5_file["molecule_response"][
record_idx, :, :
] = self.dmudt.copy()
if "molecule_dipole" in self.record_list:
self.h5_file["molecule_dipole"][
record_idx, :, :
] = self.dipole.copy()
if record_idx % 1000 == 0:
self.h5_file.flush()
print(
f"[MultiModeCavity] Flushed history data to {self.filename} at step {step_idx}."
)
else:
if "time" in self.record_list:
self.time_history.append(self.time)
if "qc" in self.record_list:
self.qc_history.append(self.qc.copy())
if "pc" in self.record_list:
self.pc_history.append(self.pc.copy())
if "photon_drive" in self.record_list:
self.photon_drive_history.append(self.photon_drive(self.time))
if "molecule_pulse" in self.record_list:
self.molecule_pulse_history.append(self.molecule_pulse(self.time))
if "energy" in self.record_list:
self.energy_history.append(self._calc_energy(self.pc, self.qc, self.dipole))
if "photonic_energy" in self.record_list:
self.photonic_energy_history.append(self._calc_photonic_energy(self.pc, self.qc))
if "molecule_response" in self.record_list:
self.molecule_response_history.append(self.dmudt.copy())
if "effective_efield" in self.record_list:
self.effective_efield_history.append(
self._calc_effective_efield(self.qc, self.dipole)
)
if "molecule_dipole" in self.record_list:
self.molecule_dipole_history.append(self.dipole.copy())
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
[docs]
def step(self, savedata: bool = True, step_idx: Optional[int] = None):
"""
Advance the simulation by one time step.
"""
if self.gauge == "dipole":
self._step_dipole_gauge(savedata=savedata, step_idx=step_idx)
else:
raise ValueError("gauge must be either 'dipole' or 'velocity'.")
[docs]
def storage_initialization(
self,
steps: Optional[int] = None,
record_history: bool = True,
record_to_disk: bool = False,
disk_folder_address: Optional[str] = None,
npz_filename: Optional[str] = None,
h5_filename: Optional[str] = None,
record_max_steps: Optional[int] = None,
record_every_steps: int = 1,
record_list: Optional[list] = None,
):
"""
Initialize storage for recording simulation history.
"""
self.record_history = bool(record_history)
self.record_to_disk = bool(record_to_disk)
self.disk_folder_address = disk_folder_address
if npz_filename is not None and h5_filename is not None:
raise ValueError(
"Only one of npz_filename and h5_filename can be provided."
)
elif npz_filename is None and h5_filename is None and record_to_disk:
raise ValueError(
"Either npz_filename or h5_filename must be provided when record_to_disk is True."
)
elif npz_filename is not None:
self.filename = npz_filename
self.file_format = "npz"
elif h5_filename is not None:
self.filename = h5_filename
self.file_format = "h5"
if record_every_steps < 1 or isinstance(record_every_steps, int) == False:
raise ValueError("record_every_steps must be a positive integer.")
else:
self.record_every_steps = int(record_every_steps)
not_record = (record_list == []) or (record_list is None)
if not_record:
self.record_history = False
self.record_to_disk = False
if isinstance(record_list, list) == False and (not_record == False):
raise ValueError("record_list must be a list or None.")
if not_record:
self.record_list = []
else:
for item in record_list:
if item not in ["all", "time", "qc", "pc", "photon_drive", "molecule_pulse", "energy", "photonic_energy", "effective_efield", "molecule_response", "molecule_dipole"]:
raise ValueError(f"Invalid record_list item: {item}. Must be one of 'all', 'time', 'qc', 'pc', 'photon_drive', 'molecule_pulse', 'energy', 'photonic_energy', 'effective_efield', 'molecule_response', 'molecule_dipole'.")
if record_list == ["all"]:
self.record_list = ["time", "qc", "pc", "photon_drive", "molecule_pulse", "energy", "photonic_energy", "effective_efield", "molecule_response", "molecule_dipole"]
else :
self.record_list = record_list
if record_max_steps is None:
self.record_max_steps = steps // self.record_every_steps
else:
self.record_max_steps = int(record_max_steps)
if self.record_max_steps < 0:
raise ValueError("record_max_steps must be non-negative or None.")
if self.record_history:
if self.record_to_disk and disk_folder_address is not None:
def pulse_record_dim(pulse):
shape = np.asarray(pulse(self.time), dtype=float).shape
return 1 if shape == () else shape
photon_drive_dim = 1
molecule_pulse_dim = 1
if "photon_drive" in self.record_list:
photon_drive_dim = pulse_record_dim(self.photon_drive)
if "molecule_pulse" in self.record_list:
molecule_pulse_dim = pulse_record_dim(self.molecule_pulse)
self.dim_dict = {
"time": 1,
"qc": self.qc.shape,
"pc": self.pc.shape,
"photon_drive": photon_drive_dim,
"molecule_pulse": molecule_pulse_dim,
"energy": 1,
"photonic_energy": self.pc.shape[0],
"effective_efield": self.dmudt.shape,
"molecule_response": self.dmudt.shape,
"molecule_dipole": self.dmudt.shape}
if self.file_format == "npz":
TEMP_DIR = os.path.join(disk_folder_address, "temp_memmap")
self.temp_dir = TEMP_DIR
if os.path.exists(TEMP_DIR):
print(
f"[MultiModeCavity] Temporary directory {TEMP_DIR} already exists. Deleting and recreating it."
)
shutil.rmtree(TEMP_DIR)
os.makedirs(TEMP_DIR, exist_ok=True)
self.memmaps = {}
self.temp_files = {}
for name in self.record_list:
dim = self.dim_dict[name]
filename = os.path.join(TEMP_DIR, f"temp_{name}.bin")
self.temp_files[name] = filename
full_shape = (self.record_max_steps,) + (
dim if isinstance(dim, tuple) else (dim,)
)
memmap_obj = np.memmap(
filename, dtype=np.float64, mode="w+", shape=full_shape
)
self.memmaps[name] = memmap_obj
else: # h5 format
assert self.file_format == "h5"
import h5py
self.h5_file = h5py.File(
os.path.join(disk_folder_address, self.filename), "w"
)
self.datasets = {}
for name in self.record_list:
dim = self.dim_dict[name]
shape = (self.record_max_steps,) + (
dim if isinstance(dim, tuple) else (dim,)
)
self.datasets[name] = self.h5_file.create_dataset(
name, shape=shape, dtype=np.float64, maxshape=shape
)
elif self.record_to_disk and self.disk_folder_address is None:
raise ValueError(
"disk_folder_address must be provided when record_to_disk is True."
)
else:
if "time" in self.record_list:
self.time_history = []
if "qc" in self.record_list:
self.qc_history = []
if "pc" in self.record_list:
self.pc_history = []
if "photon_drive" in self.record_list:
self.photon_drive_history = []
if "molecule_pulse" in self.record_list:
self.molecule_pulse_history = []
if "energy" in self.record_list:
self.energy_history = []
if "photonic_energy" in self.record_list:
self.photonic_energy_history = []
if "effective_efield" in self.record_list:
self.effective_efield_history = []
if "molecule_response" in self.record_list:
self.molecule_response_history = []
if "molecule_dipole" in self.record_list:
self.molecule_dipole_history = []
print(
f"[MultiModeCavity] Recording history: {self.record_history}, to disk: {self.record_to_disk}, record_every_steps: {self.record_every_steps}, record_max_steps: {self.record_max_steps}"
)
print(
f"[MultiModeCavity] Recording fields : {self.record_list if self.record_to_disk else 'none'}"
)
print(
f"[MultiModeCavity] Recording address: {self.disk_folder_address if self.record_to_disk else 'N/A'}, filename: {self.filename if self.record_to_disk else 'N/A'}"
)
print(
f"[MultiModeCavity] Temporary folder : {self.temp_dir if self.record_to_disk and self.file_format == 'npz' else 'N/A'}"
)
[docs]
def storage_finalization(self):
"""
Finalize storage for recording simulation history.
"""
if self.record_history:
if self.record_to_disk and self.disk_folder_address is not None:
if self.file_format == "npz":
for mm in self.memmaps.values():
mm.flush()
data_for_npz = {}
for name in self.record_list:
dim = self.dim_dict[name]
full_shape = (self.record_max_steps,) + (
dim if isinstance(dim, tuple) else (dim,)
)
filename = self.temp_files[name]
mmap_ro = np.memmap(
filename, dtype=np.float64, mode="r", shape=full_shape
)
data_for_npz[name] = mmap_ro
npz_path = os.path.join(self.disk_folder_address, self.filename)
np.savez_compressed(npz_path, **data_for_npz)
print(f"[MultiModeCavity] Results saved to {npz_path}")
data_for_npz.clear()
shutil.rmtree(self.temp_dir)
print(
f"[MultiModeCavity] Temporary files at {self.temp_dir} deleted."
)
else: # h5 format
self.h5_file.close()
print(
f"[MultiModeCavity] Results saved to {os.path.join(self.disk_folder_address, self.filename)}"
)
[docs]
def run(
self,
until: Optional[float] = None,
steps: Optional[int] = None,
record_history: bool = True,
record_to_disk: bool = False,
disk_folder_address: Optional[str] = None,
npz_filename: Optional[str] = None,
h5_filename: Optional[str] = None,
record_max_steps: Optional[int] = None,
record_every_steps: int = 1,
record_list: Optional[list] = None,
):
"""
Run the simulation for a specified duration or number of steps.
Parameters
----------
until : float, optional
Total simulation time (a.u.). ``steps`` must be ``None``.
steps : int, optional
Number of steps to execute. ``until`` must be ``None``,
record_history : bool, default: True
Record time, field, velocity, drive, and molecular response histories.
record_to_disk : bool, default: False
Whether to save the history data to disk in HDF5 format. If False, the history data will be stored in memory.
disk_folder_address : str, optional
Folder path for saving history data when ``record_to_disk`` is True.
npz_filename : str, optional
Name of the .npz file to save the simulation results to when ``record_to_disk`` is True.
h5_filename : str, optional
Name of the .h5 file to save the simulation results to when ``record_to_disk`` is True.
record_max_steps : int, optional
Upper bound for the on-disk history length when ``record_to_disk`` is
True. If ``None``, the HDF5 datasets grow dynamically.
record_every_steps : int, default: 1
Number of steps to record data every ``record_every_steps`` steps.
record_list : list of str, optional
List of specific data fields to record. Possible values include "time", "qc", "pc", "drive", "energy", "effective_efield", "molecule_response", "molecule_dipole". If "all", all fields will be recorded. If None, none will be recorded.
"""
if (until is None) == (steps is None):
raise ValueError("Specify exactly one of 'until' or 'steps'.")
if until is not None:
if until < self.time:
return
steps = int(np.ceil((until - self.time) / self.dt))
self.storage_initialization(
steps=steps,
record_history=record_history,
record_to_disk=record_to_disk,
disk_folder_address=disk_folder_address,
npz_filename=npz_filename,
h5_filename=h5_filename,
record_max_steps=record_max_steps,
record_every_steps=record_every_steps,
record_list=record_list,
)
start_time = time.perf_counter()
previous_time = start_time
for idx in range(int(steps)):
if self.record_history:
if self.record_every_steps >= 2:
if idx % self.record_every_steps == 0:
self.step(savedata=True, step_idx=idx)
else:
self.step(savedata=False, step_idx=idx)
else:
self.step(savedata=True, step_idx=idx)
else:
self.step(savedata=False, step_idx=idx)
if (idx + 1) % 1000 == 0:
current_time = time.perf_counter()
avg_time_per_step = (current_time - previous_time) / 1000.0
previous_time = current_time
elapsed = current_time - start_time
remaining = (elapsed / (idx + 1)) * (steps - (idx + 1))
print(
f"[MultiModeCavity] Completed {idx + 1}/{steps} [{(idx + 1) / steps * 100:.1f}%] steps, time/step: {avg_time_per_step:.2e} seconds, remaining time: {remaining:.2f} seconds."
)
self.storage_finalization()
# close the hub
if self.hub is not None:
if am_master():
self.hub.stop()