Skip to content

amber

deepdrivewe.simulation.amber

Run Amber simulations and analyze the results using cpptraj.

AmberConfig

Bases: BaseModel

Config for an Amber simulation.

Source code in deepdrivewe/simulation/amber.py
class AmberConfig(BaseModel):
    """Config for an Amber simulation.

    Holds the executable to invoke plus the two static input files
    (the simulation input file and the prmtop topology file). Both file
    fields are passed through ``validate_and_resolve_file`` when the
    model is validated.
    """

    # Executable used to launch Amber; falls back to 'sander' when the
    # caller does not provide one.
    amber_exe: str = Field(
        default='sander',
        description='The path to the Amber executable.',
    )
    # Required: no default is declared for this field.
    input_file: Path = Field(
        description='The input file for the Amber simulation.',
    )
    # Required: no default is declared for this field.
    top_file: Path = Field(
        description='The prmtop file for the Amber simulation.',
    )

    @field_validator('input_file', 'top_file')
    @classmethod
    def validate_and_resolve_file(cls, value: Path | None) -> Path | None:
        """Validate and resolve the file path.

        Parameters
        ----------
        value : Path | None
            The value supplied for ``input_file`` or ``top_file``.

        Returns
        -------
        Path | None
            Whatever the module-level ``validate_and_resolve_file``
            helper returns for ``value``.
        """
        # The bare name below resolves to the module-level helper, not to
        # this method: class attributes are not visible from inside a
        # method body without going through ``cls``.
        # NOTE(review): the helper is defined outside this listing; its
        # exact validation rules should be confirmed there.
        return validate_and_resolve_file(value)

validate_and_resolve_file classmethod

validate_and_resolve_file(
    value: Path | None,
) -> Path | None

Validate and resolve the file path.

Source code in deepdrivewe/simulation/amber.py
@field_validator('input_file', 'top_file')
@classmethod
def validate_and_resolve_file(cls, value: Path | None) -> Path | None:
    """Validate and resolve the file path.

    Registered as the field validator for ``input_file`` and ``top_file``.

    Parameters
    ----------
    value : Path | None
        The value supplied for the field being validated.

    Returns
    -------
    Path | None
        Whatever the module-level ``validate_and_resolve_file`` helper
        returns for ``value``.
    """
    # Inside the class this bare name resolves to the module-level helper
    # of the same name, not to this method, so the call is a delegation
    # rather than a recursion.
    # NOTE(review): the helper is defined outside this listing; confirm
    # its validation rules there.
    return validate_and_resolve_file(value)

AmberSimulation

Bases: BaseModel

Run an Amber simulation.

Source code in deepdrivewe/simulation/amber.py
class AmberSimulation(BaseModel):
    """Run an Amber simulation.

    All files produced by a run (trajectory, restart, log, info and the
    captured ``stderr.log``) are placed in ``output_dir``; their paths are
    exposed as read-only properties.
    """

    amber_exe: str = Field(
        description='The path to the Amber executable.',
    )
    input_file: Path = Field(
        description='The input file for the Amber simulation.',
    )
    top_file: Path = Field(
        description='The prmtop file for the Amber simulation.',
    )
    output_dir: Path = Field(
        description='The output directory for the Amber simulation.',
    )
    checkpoint_file: Path = Field(
        description='The checkpoint file for the Amber simulation.',
    )
    copy_input_files: bool = Field(
        default=True,
        # The trailing space keeps the two sentences apart once the
        # adjacent literals are concatenated.
        description='Whether to copy the input files to the output directory. '
        'input_file and top_file will be copied by default.',
    )

    @property
    def trajectory_file(self) -> Path:
        """The trajectory file for the simulation."""
        return self.output_dir / 'seg.nc'

    @property
    def restart_file(self) -> Path:
        """The restart file for the simulation.

        Uses the same suffix as ``checkpoint_file``.
        """
        return self.output_dir / f'seg{self.checkpoint_file.suffix}'

    @property
    def parent_file(self) -> Path:
        """The copy of ``checkpoint_file`` inside the output directory.

        ``run`` copies the checkpoint here and starts the simulation
        from this copy.
        """
        return self.output_dir / f'parent{self.checkpoint_file.suffix}'

    @property
    def log_file(self) -> Path:
        """The log file for the simulation."""
        return self.output_dir / 'seg.log'

    @property
    def info_file(self) -> Path:
        """The metadata file for the simulation."""
        return self.output_dir / 'seg.nfo'

    def run(self) -> None:
        """Run the simulation.

        Equivalent to the shell command::

            $PMEMD -O -i md.in -p hmr.prmtop -c parent.ncrst
                -r seg.ncrst -x seg.nc -o seg.log -inf seg.nfo

        Side effects: creates ``output_dir``, copies the checkpoint to
        ``parent_file``, and (when ``copy_input_files`` is set) copies the
        input and topology files into ``output_dir`` and repoints
        ``input_file`` / ``top_file`` at those copies. Every occurrence of
        the literal ``RAND`` in the input file is replaced in place by a
        freshly drawn random seed.

        Raises
        ------
        subprocess.CalledProcessError
            If the Amber executable exits with a non-zero return code.
        """
        # Local import: only needed for quoting the command line below.
        import shlex

        # Create the output directory
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Copy the restart checkpoint to the output directory
        shutil.copy(self.checkpoint_file, self.parent_file)

        # Copy the static input files to the output directory
        if self.copy_input_files:
            self.input_file = Path(
                shutil.copy(self.input_file, self.output_dir),
            )
            self.top_file = Path(shutil.copy(self.top_file, self.output_dir))

        # Create stderr log file (by default, stdout is captured
        # in the log file).
        stderr = self.output_dir / 'stderr.log'

        # Set the random seed
        seed = np.random.randint(0, 2**16)

        # Populate the input_file with the random seed. This is done in
        # Python instead of `sed -i`, whose in-place flag is not portable
        # across sed implementations and whose unquoted path argument
        # broke on paths containing spaces; both failures used to be
        # swallowed, leaving the RAND placeholder in the input.
        try:
            template = self.input_file.read_text()
            if 'RAND' in template:
                self.input_file.write_text(
                    template.replace('RAND', str(seed)),
                )
        except OSError as exc:
            # Stay best-effort like the sed call this replaces: record the
            # problem and let the Amber invocation below decide the outcome.
            with open(stderr, 'a') as err:
                err.write(
                    f'Failed to set the random seed in '
                    f'{self.input_file}: {exc}\n',
                )

        # Setup the simulation. Paths are shell-quoted so that spaces or
        # other special characters survive; amber_exe is left as-is so it
        # may still carry its own arguments.
        flagged_paths = (
            ('-i', self.input_file),
            ('-o', self.log_file),
            ('-p', self.top_file),
            ('-c', self.parent_file),
            ('-r', self.restart_file),
            ('-x', self.trajectory_file),
            ('-inf', self.info_file),
        )
        arguments = ' '.join(
            f'{flag} {shlex.quote(str(path))}' for flag, path in flagged_paths
        )
        command = f'{self.amber_exe} -O {arguments}'

        # Run the simulation
        with open(stderr, 'a') as err:
            subprocess.run(
                command,
                shell=True,
                check=True,
                cwd=self.output_dir,
                stdout=err,
                stderr=err,
            )

trajectory_file property

trajectory_file: Path

The trajectory file for the simulation.

restart_file property

restart_file: Path

The restart file for the simulation.

parent_file property

parent_file: Path

The copy of the checkpoint file placed in the output directory; the simulation is started from this copy.

log_file property

log_file: Path

The log file for the simulation.

info_file property

info_file: Path

The metadata file for the simulation.

run

run() -> None

Run the simulation.

Implementation of the following bash command: $PMEMD -O -i md.in -p hmr.prmtop -c parent.ncrst -r seg.ncrst -x seg.nc -o seg.log -inf seg.nfo

Source code in deepdrivewe/simulation/amber.py
def run(self) -> None:
    """Run the simulation.

    Equivalent to the shell command::

        $PMEMD -O -i md.in -p hmr.prmtop -c parent.ncrst
            -r seg.ncrst -x seg.nc -o seg.log -inf seg.nfo

    Side effects: creates ``output_dir``, copies the checkpoint to
    ``parent_file``, and (when ``copy_input_files`` is set) copies the
    input and topology files into ``output_dir`` and repoints
    ``input_file`` / ``top_file`` at those copies. Every occurrence of
    the literal ``RAND`` in the input file is replaced in place by a
    freshly drawn random seed.

    Raises
    ------
    subprocess.CalledProcessError
        If the Amber executable exits with a non-zero return code.
    """
    # Local import: only needed for quoting the command line below.
    import shlex

    # Create the output directory
    self.output_dir.mkdir(parents=True, exist_ok=True)

    # Copy the restart checkpoint to the output directory
    shutil.copy(self.checkpoint_file, self.parent_file)

    # Copy the static input files to the output directory
    if self.copy_input_files:
        self.input_file = Path(
            shutil.copy(self.input_file, self.output_dir),
        )
        self.top_file = Path(shutil.copy(self.top_file, self.output_dir))

    # Create stderr log file (by default, stdout is captured
    # in the log file).
    stderr = self.output_dir / 'stderr.log'

    # Set the random seed
    seed = np.random.randint(0, 2**16)

    # Populate the input_file with the random seed. This is done in
    # Python instead of `sed -i`, whose in-place flag is not portable
    # across sed implementations and whose unquoted path argument broke
    # on paths containing spaces; both failures used to be swallowed,
    # leaving the RAND placeholder in the input.
    try:
        template = self.input_file.read_text()
        if 'RAND' in template:
            self.input_file.write_text(template.replace('RAND', str(seed)))
    except OSError as exc:
        # Stay best-effort like the sed call this replaces: record the
        # problem and let the Amber invocation below decide the outcome.
        with open(stderr, 'a') as err:
            err.write(
                f'Failed to set the random seed in '
                f'{self.input_file}: {exc}\n',
            )

    # Setup the simulation. Paths are shell-quoted so that spaces or
    # other special characters survive; amber_exe is left as-is so it
    # may still carry its own arguments.
    flagged_paths = (
        ('-i', self.input_file),
        ('-o', self.log_file),
        ('-p', self.top_file),
        ('-c', self.parent_file),
        ('-r', self.restart_file),
        ('-x', self.trajectory_file),
        ('-inf', self.info_file),
    )
    arguments = ' '.join(
        f'{flag} {shlex.quote(str(path))}' for flag, path in flagged_paths
    )
    command = f'{self.amber_exe} -O {arguments}'

    # Run the simulation
    with open(stderr, 'a') as err:
        subprocess.run(
            command,
            shell=True,
            check=True,
            cwd=self.output_dir,
            stdout=err,
            stderr=err,
        )

AmberTrajAnalyzer

Bases: BaseModel, ABC

Strategy for analyzing Amber trajectories.

Source code in deepdrivewe/simulation/amber.py
class AmberTrajAnalyzer(BaseModel, ABC):
    """Base strategy for analyzing Amber trajectories.

    Subclasses supply ``get_pcoords``; ``get_coords`` is shared and
    returns the coordinates of the trajectory after superposing it onto
    the reference structure.
    """

    reference_file: Path = Field(
        description='The reference PDB file for the cpptraj analysis.',
    )

    @abstractmethod
    def get_pcoords(self, sim: AmberSimulation) -> np.ndarray:
        """Compute the progress coordinate for a finished simulation.

        Parameters
        ----------
        sim : AmberSimulation
            The Amber simulation to analyze.

        Returns
        -------
        np.ndarray
            The progress coordinate from the aligned trajectory.
        """
        ...

    def get_coords(self, sim: AmberSimulation) -> np.ndarray:
        """Return the coordinates of the reference-aligned trajectory.

        Parameters
        ----------
        sim : AmberSimulation
            The Amber simulation whose trajectory and topology files
            are loaded.

        Returns
        -------
        np.ndarray
            The ``xyz`` array of the trajectory after it has been
            superposed onto the reference structure.
        """
        topology = sim.top_file
        print(
            f'Analyzing simulation traj file {sim.trajectory_file} and'
            f' top file {topology}',
        )

        # The segment trajectory is loaded before the reference, both
        # against the simulation topology.
        segment = mdtraj.load(sim.trajectory_file, top=topology)
        reference = mdtraj.load(self.reference_file, top=topology)

        # Superpose onto the reference and hand back the coordinates.
        return segment.superpose(reference).xyz

get_pcoords abstractmethod

get_pcoords(sim: AmberSimulation) -> np.ndarray

Get the progress coordinate from the aligned trajectory.

Parameters:

Name Type Description Default
sim AmberSimulation

The Amber simulation to analyze.

required

Returns:

Type Description
ndarray

The progress coordinate from the aligned trajectory.

Source code in deepdrivewe/simulation/amber.py
@abstractmethod
def get_pcoords(self, sim: AmberSimulation) -> np.ndarray:
    """Get the progress coordinate from the aligned trajectory.

    Abstract: the body is empty here, so each concrete analyzer must
    provide its own implementation.

    Parameters
    ----------
    sim : AmberSimulation
        The Amber simulation to analyze.

    Returns
    -------
    np.ndarray
        The progress coordinate from the aligned trajectory.
    """
    ...

get_coords

get_coords(sim: AmberSimulation) -> np.ndarray

Get the atomic coordinates from the aligned trajectory.

Parameters:

Name Type Description Default
sim AmberSimulation

The Amber simulation to analyze.

required

Returns:

Type Description
ndarray

The atomic coordinates from the aligned trajectory.

Source code in deepdrivewe/simulation/amber.py
def get_coords(self, sim: AmberSimulation) -> np.ndarray:
    """Return the coordinates of the reference-aligned trajectory.

    Parameters
    ----------
    sim : AmberSimulation
        The Amber simulation whose trajectory and topology files are
        loaded.

    Returns
    -------
    np.ndarray
        The ``xyz`` array of the trajectory after it has been superposed
        onto the reference structure.
    """
    topology = sim.top_file
    print(
        f'Analyzing simulation traj file {sim.trajectory_file} and'
        f' top file {topology}',
    )

    # The segment trajectory is loaded before the reference, both against
    # the simulation topology.
    segment = mdtraj.load(sim.trajectory_file, top=topology)
    reference = mdtraj.load(self.reference_file, top=topology)

    # Superpose onto the reference and hand back the coordinates.
    return segment.superpose(reference).xyz

run_cpptraj

run_cpptraj(
    command: str, verbose: bool = False
) -> list[float]

Run cpptraj with the command and return the progress coordinate.

Parameters:

Name Type Description Default
command str

The cpptraj command instructions to run (these get written to a cpptraj input file).

required
verbose bool

Whether to print the stdout and stderr of the cpptraj command (by default False).

False

Returns:

Type Description
list[float]

The progress coordinate from the cpptraj output.

Source code in deepdrivewe/simulation/amber.py
def run_cpptraj(command: str, verbose: bool = False) -> list[float]:
    """Run cpptraj with the command and return the progress coordinate.

    The instructions are written to a temporary input file and fed to
    ``cpptraj`` on standard input. The data file named by the
    ``{output_file}`` placeholder is then parsed: its first line is
    treated as a header and the second whitespace-separated column of
    every remaining non-blank line is read as a float.

    Parameters
    ----------
    command : str
        The cpptraj command instructions to run (these get written to a
        cpptraj input file). It is passed through ``str.format`` with an
        ``output_file`` keyword, so it should reference ``{output_file}``
        wherever cpptraj must write its data.
    verbose : bool
        Whether to print the stdout and stderr of the cpptraj command
        (by default False). They are always printed when cpptraj exits
        with a non-zero return code.

    Returns
    -------
    list[float]
        The progress coordinate from the cpptraj output.

    Raises
    ------
    FileNotFoundError
        If the ``cpptraj`` executable cannot be found, or if cpptraj did
        not produce the output file.
    """
    # Make a temporary directory to store the cpptraj inputs and outputs
    with tempfile.TemporaryDirectory() as tmp:
        workdir = Path(tmp)
        output_file = workdir / 'cpptraj.dat'
        input_file = workdir / 'cpptraj.in'
        stdout = workdir / 'stdout.log'
        stderr = workdir / 'stderr.log'

        # Fill in the output location and write the cpptraj input file
        input_file.write_text(command.format(output_file=output_file))

        # Feed the input file to cpptraj on stdin directly instead of via
        # a `cat ... | cpptraj` shell pipeline: no shell is involved, so
        # the temporary path needs no quoting.
        with (
            open(input_file) as inp,
            open(stdout, 'a') as out,
            open(stderr, 'a') as err,
        ):
            result = subprocess.run(
                ['cpptraj'],
                # Do not raise an exception on a non-zero return code
                check=False,
                stdin=inp,
                stdout=out,
                stderr=err,
            )

        # Check the return code
        if result.returncode != 0:
            print(
                f'Command: cpptraj < {input_file}\nfailed '
                f'with return code {result.returncode}.',
            )

        # Print the stdout and stderr
        if verbose or result.returncode != 0:
            print(f'{stdout.read_text()}\n\n{stderr.read_text()}')

        # Parse the cpptraj output file (first line is a header)
        lines = output_file.read_text().splitlines()[1:]
        # The second column is the progress coordinate. Splitting first
        # and testing the resulting fields skips whitespace-only lines as
        # well as empty ones, which previously raised IndexError.
        pcoord = [
            float(fields[1]) for fields in map(str.split, lines) if fields
        ]

    return pcoord