# ase_md.py
# Standard library imports
import subprocess
################################################################
# Third party imports
import ase.units
import numpy as np
from ase import Atoms
from ase.md import MDLogger
from ase.md.langevin import Langevin
from ase.md.nose_hoover_chain import NoseHooverChainNVT
from ase.md.velocitydistribution import MaxwellBoltzmannDistribution
from ase.md.verlet import VelocityVerlet
################################################################
# Local imports
from sparc.src.utils.logger import SparcLog
from sparc.src.utils.utils import (
check_physical_limits,
load_checkpoint,
log_md_setup,
save_checkpoint,
save_xyz,
)
################################################################
# ===================================================================================================#
# Helper Functions
# ===================================================================================================#
def initialize_dynamics(atoms, dyn_class, timestep, temperature, restart, **kwargs):
"""
Initialize MD dynamics, optionally restarting from a checkpoint.
Parameters
----------
atoms : ase.Atoms
The ASE Atoms object representing the system.
dyn_class : type
The MD dynamics class to use (e.g., NoseHooverChainNVT, Langevin).
timestep : float
The simulation time step in femtoseconds.
temperature : float
The target temperature in Kelvin.
restart : bool
If True, the simulation will be restarted from a checkpoint.
**kwargs : dict
Additional keyword arguments. For example, 'checkpoint_file' (default is 'md_checkpoint.pkl').
Returns
-------
dyn : instance of dyn_class
The initialized dynamics object.
"""
checkpoint_file = kwargs.get("checkpoint_file", "md_checkpoint.pkl")
if restart:
atoms, mdstep = load_checkpoint(atoms, checkpoint_file)
dyn = dyn_class(atoms, timestep=timestep, temperature_K=temperature, **kwargs)
dyn.nsteps = mdstep
else:
MaxwellBoltzmannDistribution(atoms, temperature_K=temperature, force_temp=True)
dyn = dyn_class(atoms, timestep=timestep, temperature_K=temperature, **kwargs)
return dyn
# ===================================================================================================#
# Thermostat Functions
# ===================================================================================================#
[docs]
def NoseNVT(atoms, timestep=1, temperature=300, tdamp=10, restart=False):
"""
Set up a Nose-Hoover chain NVT thermostat for MD simulation.
Parameters
----------
atoms : ase.Atoms
The ASE Atoms object representing the system.
timestep : float, optional
The simulation time step in femtoseconds (default is 1 fs).
temperature : float, optional
The target temperature in Kelvin (default is 300 K).
tdamp : float, optional
The damping time for the thermostat in femtoseconds (default is 10 fs).
restart : bool, optional
If True, the simulation will be restarted from a checkpoint.
Returns
-------
dynamics : NoseHooverChainNVT
The initialized dynamics object using the Nose-Hoover chain thermostat.
"""
return initialize_dynamics(
atoms,
NoseHooverChainNVT,
timestep * ase.units.fs,
temperature,
restart,
tdamp=tdamp * ase.units.fs,
)
[docs]
def LangevinNVT(atoms, timestep=1, temperature=300, friction=0.01, restart=False):
"""
Set up a Langevin thermostat for NVT MD simulation.
Parameters
----------
atoms : ase.Atoms
The ASE Atoms object representing the system.
timestep : float, optional
The simulation time step in femtoseconds (default is 1 fs).
temperature : float, optional
The target temperature in Kelvin (default is 300 K).
friction : float, optional
The friction coefficient for the Langevin thermostat (default is 0.01 fs^-1).
restart : bool, optional
If True, the simulation will be restarted from a checkpoint.
Returns
-------
dynamics : Langevin
The initialized dynamics object using the Langevin thermostat.
"""
return initialize_dynamics(
atoms,
Langevin,
timestep * ase.units.fs,
temperature,
restart,
friction=friction / ase.units.fs,
)
# ===================================================================================================#
# MD Execution Functions
# ===================================================================================================#
[docs]
def ExecuteAbInitioDynamics(
system, dyn, steps, pace, log_filename, trajfile, dir_name, name
):
"""
Run an ab initio MD simulation.
Parameters
----------
system : ase.Atoms
The ASE Atoms object representing the system.
dyn : dynamics object
The initialized MD dynamics object.
steps : int
The number of MD steps to run.
pace : int
The interval (in steps) at which to log data and save checkpoints.
log_filename : str
The filename for the MD log file.
trajfile : str
The filename for the trajectory file.
dir_name : str
The directory where log and trajectory files will be saved.
name : str
A label for the simulation (e.g., the thermostat type).
Returns
-------
None
"""
if steps <= 0:
return
SparcLog("=" * 72 + "\n")
SparcLog(f"Starting AIMD Simulation [{name}]".center(72) + "\n")
SparcLog("=" * 72 + "\n")
dyn.attach(lambda: save_checkpoint(dyn, system), interval=pace)
dyn.attach(lambda: log_md_setup(dyn, system, dir_name), interval=pace)
dyn.attach(lambda: save_xyz(system, trajfile, "a", dir_name), interval=pace)
logger = MDLogger(
dyn=dyn,
atoms=system,
logfile=f"{dir_name}/{log_filename}",
header=True,
stress=False,
peratom=False,
mode="a",
)
dyn.attach(logger, interval=pace)
dyn.run(steps)
def ExecuteMlpDynamics(
system,
dyn,
steps,
pace,
log_filename,
trajfile,
dir_name,
distance_metrics,
name,
epot_threshold,
):
"""
Run a Deep Potential MD simulation.
Parameters
----------
system : ase.Atoms
The ASE Atoms object representing the system.
dyn : dynamics object
The initialized MD dynamics object.
steps : int
The number of MD steps to run.
pace : int
The interval (in steps) at which to log data.
log_filename : str
The filename for the MD log file.
trajfile : str
The filename for the trajectory file.
dir_name : str
The directory where log and trajectory files will be saved.
distance_metrics : dict or list
Metrics used to check physical limits during the simulation.
name : str
A label for the simulation (e.g., the thermostat type).
Returns
-------
None
"""
SparcLog("=" * 72 + "\n")
SparcLog(f"Initializing DeepPotential MD Simulation [{name}]".center(72) + "\n")
SparcLog("=" * 72 + "\n")
dyn.attach(lambda: log_md_setup(dyn, system, dir_name), interval=pace)
dyn.attach(lambda: save_xyz(system, trajfile, "a", dir_name), interval=pace)
logger = MDLogger(
dyn=dyn,
atoms=system,
logfile=f"{dir_name}/{log_filename}",
header=True,
stress=False,
peratom=False,
mode="a",
)
dyn.attach(logger, interval=pace)
# Store reference energy for comparison
epot_ref = None
for step in range(steps):
dyn.run(1)
# Check if physical limits are exceeded
if distance_metrics and check_physical_limits(system, distance_metrics):
SparcLog(
"Physical limits exceeded. Stopping MLMD simulation!!!", level="WARNING"
)
break
# Check if the Potential Energy becomes undefined
epot = np.array(system.get_potential_energy())
if np.isnan(epot):
SparcLog(
"Potential Energy is Nan! || Stopping MLMD simulation !!!\n",
level="ERROR",
)
break
# Store reference energy in the first step
if epot_ref is None:
epot_ref = epot
SparcLog(f"Reference Potential Energy//STEP:0 -> {epot_ref}")
# Check if the Potential Energy is too high
Llim = epot_ref - epot_threshold
Ulim = epot_ref + epot_threshold
if epot > Ulim or epot < Llim:
SparcLog(f"{f'[Iteration: {dir_name}]':^72}", level="ERROR")
SparcLog("Potential Energy Exceeded Limit:", level="ERROR")
SparcLog(f"Reference Energy : {float(epot_ref): .2f} eV", level="ERROR")
SparcLog(
f"Threshold Energy : {float(epot_threshold): .2f} eV", level="ERROR"
)
SparcLog(f"Lower_limit : {float(Llim): .2f} eV", level="ERROR")
SparcLog(f"Upper_limit : {float(Ulim): .2f} eV", level="ERROR")
SparcLog(f"Current Energy : {float(epot): .2f} eV", level="ERROR")
SparcLog("Stopping ML/MD Simulation!!!", level="ERROR")
SparcLog("-" * 72, level="ERROR")
break
def CalculateDFTEnergy(
idx, header, system, timestep, log_filename, dir_name, trajfile, pace=1
):
"""
Calculate the DFT energy and forces for a candidate structure.
Parameters
----------
idx : int
An identifier index for the candidate.
header : bool
If True, include a header in the log.
system : ase.Atoms
The ASE Atoms object representing the candidate structure.
timestep : float
The simulation time step in femtoseconds.
log_filename : str
The filename for the energy log.
dir_name : str
The directory where log and trajectory files will be saved.
trajfile : str
The filename for the trajectory file.
pace : int, optional
The logging interval (default is 1).
Returns
-------
None
"""
dyn = VelocityVerlet(system, timestep, trajectory=None)
dyn.attach(lambda: save_xyz(system, trajfile, "a", dir_name), interval=pace)
epot = system.get_potential_energy()
epot = epot if not isinstance(epot, (list, np.ndarray)) else epot[0]
SparcLog(f"Candidate: {idx:5d} | Epot: {epot:10.6f} [eV]\n")
log = MDLogger(
dyn=dyn,
atoms=system,
logfile=f"{dir_name}/{log_filename}",
header=header,
stress=False,
peratom=False,
mode="a",
)
dyn.attach(log, interval=pace)
dyn.run(0)
# ===================================================================================================#
# LAMMPS MD Execution
# ===================================================================================================#
def lammps_md(system, model_path, model_name):
"""
Run a LAMMPS MD simulation.
Parameters
----------
system : ase.Atoms
The ASE Atoms object representing the system (not used directly in the call).
model_path : str
The path to the model files required by LAMMPS.
model_name : str
The name of the model to be used in the simulation.
Returns
-------
None
"""
SparcLog("\n" + "=" * 72)
SparcLog("Starting LAMMPS MD Simulation".center(72))
SparcLog("=" * 72)
run_command = ["lmp", "-i", "in.lammps"]
try:
subprocess.run(run_command, check=True)
SparcLog("\n" + "=" * 72)
SparcLog("LAMMPS MD Simulation Completed Successfully".center(72))
SparcLog("=" * 72)
except subprocess.CalledProcessError as e:
SparcLog("\n" + "=" * 72)
SparcLog("Error in LAMMPS MD Simulation".center(72))
SparcLog(str(e).center(72))
SparcLog("=" * 72)
# ===================================================================================================#
# Standalone Demonstration
# ===================================================================================================#
if __name__ == "__main__":
"""
Standalone demonstration of the ase_md module.
This example creates a simple H2 molecule, sets up a Nose-Hoover NVT simulation,
and runs a short MD simulation for demonstration purposes. The MD log and trajectory
files are saved in the current directory.
"""
from ase import Atoms
# Create a simple diatomic molecule (Hâ‚‚)
atoms = Atoms("H2", positions=[[0, 0, 0], [0, 0, 0.74]])
atoms.set_pbc([False, False, False])
# Simulation parameters
timestep = 1 * ase.units.fs
temperature = 300 # Kelvin
steps = 10
pace = 1
# Display a message to the user
SparcLog("Running MD simulation with Nose-Hoover Thermostat...\n")
# Initialize dynamics using Nose-Hoover thermostat
dyn_test = NoseNVT(
atoms,
timestep=timestep,
temperature=temperature,
tdamp=10 * ase.units.fs,
restart=False,
)
# Run the simulation (log file: demo_md.log, trajectory: demo_traj.xyz, saved in current directory)
ExecuteAbInitioDynamics(
system=atoms,
dyn=dyn_test,
steps=steps,
pace=pace,
log_filename="md.log",
trajfile="traj.xyz",
dir_name=".",
name="Nose",
)
SparcLog("Simulation completed.\n")